Source code for ehtim.survey

# survey.py
# a parameter survey class
#
#    Copyright (C) 2018 Andrew Chael
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os

import ehtim as eh
import paramsurvey
import paramsurvey.params

import warnings
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)


##################################################################################################
# ParameterSet object
##################################################################################################

[docs]class ParameterSet: """ Attributes: paramset (dict): dict containing single parameter set. Each key in dict becomes its own attribute params_fixed (dict): dict containing non-varying parameters. Each key in dict becomes its own attribute outfile (str): base of outfile with 7 digit index of pset added to it obs (Obsdata): observation object from input uvfits file, generated by load_data() zbl_tot (float): flux on ALMA-APEX baseline, generated in preimcal() obs_sc_init (Obsdata): original observation with systematic noise added, reverse taper applied, and extended flux removed obs_sc (Obsdata): obs_sc_init after all rounds of self-calibration initimg (Image): prior/initial image im_out (Image): final reconstructed image im_addcmp (Image): final reconstructed image with extended flux added back in obs_sc_addcmp (Obsdata): original observation self-calibrated with im_addcmp caltab (Caltable): final calibration table associated with im_out """ def __init__(self, paramset, params_fixed={}): """ An object for one parameter set Args: paramset (dict): A dict containing single parameter set params_fixed (dict): A dict containing non-varying parameters Returns: a ParameterSet object """ # set each item in paramset dict to an attribute for param in paramset: setattr(self, param, paramset[param]) if len(params_fixed) > 0: for param in params_fixed: setattr(self, param, params_fixed[param]) os.makedirs(self.outpath, exist_ok=True) self.paramset = paramset self.params_fixed = params_fixed self.outfile = '%s_%0.7i' % (self.outfile_base, self.i) self.fov *= eh.RADPERUAS self.reverse_taper_uas *= eh.RADPERUAS self.prior_fwhm *= eh.RADPERUAS
[docs] def load_data(self): """ Loads in uvfits file from self.infile into eht-imaging obs object and averages using self.avg_time Creates self.obs Args: Returns: """ # load the uvfits file self.obs = eh.obsdata.load_uvfits(self.infile) # identify the scans (times of continuous observation) in the data self.obs.add_scans() # coherently average if self.avg_time == 'scan': self.obs = self.obs.avg_coherent(0., scan_avg=True) else: self.obs = self.obs.avg_coherent(self.avg_time)
[docs] def preimcal(self): """ Applies pre-imaging calibration to self.obs. This includes flagging sites with no measurements, rescaling short baselines so only compact flux is being imaged, applying a u-v taper, and adding extra sys noise. Creates self.obs_sc which will have self-cal applied in future steps and self.obs_sc_init which is a copy of the initial preimcal observation Args: Returns: """ # handle site name change of APEX between 2017 (AP) to 2018 (AX) sites = list(self.obs.tkey.keys()) alma_options = ['ALMA', 'AA'] for opt in alma_options: if opt in sites: alma = opt apex_options = ['APEX', 'AP', 'AX'] for opt in apex_options: if opt in sites: apex = opt self.zbl_tot = np.median(self.obs.unpack_bl(alma, apex, 'amp')['amp']) # Flag out sites in the obs.tarr table with no measurements allsites = set(self.obs.unpack(['t1'])['t1']) | set(self.obs.unpack(['t2'])['t2']) self.obs.tarr = self.obs.tarr[[o in allsites for o in self.obs.tarr['site']]] self.obs = eh.obsdata.Obsdata(self.obs.ra, self.obs.dec, self.obs.rf, self.obs.bw, self.obs.data, self.obs.tarr, source=self.obs.source, mjd=self.obs.mjd, ampcal=self.obs.ampcal, phasecal=self.obs.phasecal) self.obs_orig = self.obs.copy() # Rescale short baselines to excise contributions from extended flux. # setting zbl < zbl_tot assumes there is an extended constant flux component of zbl_tot-zbl Jy if self.zbl != self.zbl_tot: for j in range(len(self.obs.data)): if (self.obs.data['u'][j] ** 2 + self.obs.data['v'][j] ** 2) ** 0.5 >= self.uv_zblcut: continue for field in ['vis', 'qvis', 'uvis', 'vvis', 'sigma', 'qsigma', 'usigma', 'vsigma']: self.obs.data[field][j] *= self.zbl / self.zbl_tot self.obs.reorder_tarr_snr() self.obs_sc = self.obs.copy() # Reverse taper the observation: this enforces a maximum resolution on reconstructed features if self.reverse_taper_uas > 0: self.obs_sc = self.obs_sc.reverse_taper(self.reverse_taper_uas) # Add non-closing systematic noise to the observation self.obs_sc = self.obs_sc.add_fractional_noise(self.sys_noise) # Make a copy of the initial data (before any self-calibration but after the taper) self.obs_sc_init = self.obs_sc.copy()
[docs] def init_img(self): """ Creates initial/prior image. Only gaussian prior option at present, but creates prior attritubute self.initimg using self.zbl and self.prior_fwhm Args: Returns: """ # create guassian prior/inital image emptyprior = eh.image.make_square(self.obs_sc, self.npixels, self.fov) gaussprior = emptyprior.add_gauss(self.zbl, (self.prior_fwhm, self.prior_fwhm, 0, 0, 0)) # To avoid gradient singularities in the first step, add an additional small Gaussian gaussprior = gaussprior.add_gauss(self.zbl * 1e-3, (self.prior_fwhm, self.prior_fwhm, 0, self.prior_fwhm, self.prior_fwhm)) self.initimg = gaussprior.copy()
[docs] def make_img(self): """ Reconstructs image with specified parameters (data weights, amount of self-cal, etc) described in paramset dict Creates attributes self.im_out containing the final image and self.caltab containing a corresponding calibration table object for the final image Args: Returns: """ # specify data terms data_term = {} if hasattr(self, 'vis') and self.vis != 0.: data_term['vis'] = self.vis if hasattr(self, 'amp') and self.amp != 0.: data_term['amp'] = self.amp if hasattr(self, 'diag_closure') and self.diag_closure is True: if hasattr(self, 'logcamp_diag') and self.logcamp_diag != 0.: data_term['logcamp_diag'] = self.logcamp if hasattr(self, 'cphase_diag') and self.cphase_diag != 0.: data_term['cphase_diag'] = self.cphase else: if hasattr(self, 'logcamp') and self.logcamp != 0.: data_term['logcamp'] = self.logcamp if hasattr(self, 'cphase') and self.cphase != 0.: data_term['cphase'] = self.cphase # specify regularizer terms reg_term = {} if hasattr(self, 'simple') and self.simple != 0.: reg_term['simple'] = self.simple if hasattr(self, 'tv2') and self.tv2 != 0.: reg_term['tv2'] = self.tv2 if hasattr(self, 'tv') and self.tv != 0.: reg_term['tv'] = self.tv if hasattr(self, 'l1') and self.l1 != 0.: reg_term['l1'] = self.l1 if hasattr(self, 'flux') and self.flux != 0.: reg_term['flux'] = self.flux if hasattr(self, 'rgauss') and self.rgauss != 0.: reg_term['rgauss'] = self.rgauss ### How to make this more general? ### # Add systematic noise tolerance for amplitude a-priori calibration errors # Start with the SEFD noise (but need sqrt) # then rescale to ensure that final results respect the stated error budget systematic_noise = self.SEFD_error_budget.copy() for key in systematic_noise.keys(): systematic_noise[key] = ((1.0 + systematic_noise[key]) ** 0.5 - 1.0) * 0.25 # set up imager imgr = eh.imager.Imager(self.obs_sc, self.initimg, prior_im=self.initimg, flux=self.zbl, data_term=data_term, maxit=self.maxit, norm_reg=True, systematic_noise=systematic_noise, reg_term=reg_term, ttype=self.ttype, cp_uv_min=self.uv_zblcut, stop=self.stop) res = self.obs.res() imgr.make_image_I(show_updates=False, niter=self.niter_static, blur_frac=self.blurfrac) if self.selfcal: # Self-calibrate to the previous model (phase-only); # The solution_interval is 0 to align phases from high and low bands if needed self.obs_sc = eh.selfcal(self.obs_sc, imgr.out_last(), method='phase', ttype=self.ttype, solution_interval=0.0, processes=-1) sc_p_idx = 0 dterms = data_term.keys() while sc_p_idx < self.sc_phase: # Blur the previous reconstruction to the intrinsic resolution init = imgr.out_last().blur_circ(res) # Increase the data weights and reinitialize imaging if sc_p_idx == 0: for key in dterms: data_term[key] *= self.xdw_phase # set up imager imgr = eh.imager.Imager(self.obs_sc, init, prior_im=self.initimg, flux=self.zbl, data_term=data_term, maxit=self.maxit, norm_reg=True, systematic_noise=systematic_noise, reg_term=reg_term, ttype=self.ttype, cp_uv_min=self.uv_zblcut, stop=self.stop) # Imaging imgr.make_image_I(show_updates=False, niter=self.niter_static, blur_frac=self.blurfrac) # apply self-calibration to original calibrated data self.obs_sc = eh.selfcal(self.obs_sc_init, imgr.out_last(), method='phase', ttype=self.ttype) sc_p_idx += 1 # repeat amp+phase self-calibration sc_ap_idx = 0 while sc_ap_idx < self.sc_ap: # Blur the previous reconstruction to the intrinsic resolution init = imgr.out_last().blur_circ(res) # Increase the data weights and reinitialize imaging if sc_p_idx == 0: for key in dterms: data_term[key] *= self.xdw_ap # set up imager imgr = eh.imager.Imager(self.obs_sc, init, prior_im=self.initimg, flux=self.zbl, data_term=data_term, maxit=self.maxit, norm_reg=True, systematic_noise=systematic_noise, reg_term=reg_term, ttype=self.ttype, cp_uv_min=self.uv_zblcut, stop=self.stop) # Imaging imgr.make_image_I(show_updates=False, niter=self.niter_static, blur_frac=self.blurfrac) caltab = eh.selfcal(self.obs_sc_init, imgr.out_last(), method='both', ttype=self.ttype, gain_tol=self.gaintol, caltable=True, processes=-1) self.obs_sc = caltab.applycal(self.obs_sc_init, interp='nearest', extrapolate=True) sc_ap_idx += 1 self.im_out = imgr.out_last().copy() # if no self-cal, no caltabs will be saved if self.sc_phase == 0 and self.sc_ap == 0: self.save_caltab = False else: self.caltab = caltab
[docs] def output_results(self): """ Outputs all requested files pertaining to final image Args: Returns: """ # Add a large gaussian component to account for the missing flux # so the final image can be compared with the original data self.im_addcmp = self.im_out.add_zblterm(self.obs_orig, self.uv_zblcut, debias=True) self.obs_sc_addcmp = eh.selfcal(self.obs_orig, self.im_addcmp, method='both', ttype=self.ttype) # If an inverse taper was used, restore the final image # to be consistent with the original data if self.reverse_taper_uas > 0.0: self.im_out = self.im_out.blur_circ(self.reverse_taper_uas) # Save the final image outfits = os.path.join(self.outpath, '%s.fits' % (self.outfile)) self.im_out.save_fits(outfits) # Save caltab if hasattr(self, 'save_caltab') and self.save_caltab == True: outcal = os.path.join(self.outpath, '%s/' % (self.outfile)) eh.caltable.save_caltable(self.caltab, self.obs_sc_init, outcal) # Save self-calibrated uvfits if self.save_uvfits: outuvfits = os.path.join(self.outpath, '%s.uvfits' % (self.outfile)) self.obs_sc_addcmp.save_uvfits(outuvfits) # Save pdf of final image if self.save_pdf: outpdf = os.path.join(self.outpath, '%s.pdf' % (self.outfile)) self.im_out.display(cbar_unit=['Tb'], label_type='scale', export_pdf=outpdf) # Save pdf of image summary if self.save_imgsums: # Save an image summary sheet plt.close('all') outimgsum = os.path.join(self.outpath, '%s_imgsum.pdf' % (self.outfile)) eh.imgsum(self.im_addcmp, self.obs_sc_addcmp, self.obs_orig, outimgsum, cp_uv_min=self.uv_zblcut, processes=-1)
[docs] def save_statistics(self): """ Saves a csv file with the following statistics: chi^2 closure phase, logcamp, vis wrt the original observation chi^2 vis wrt to original observation with self-cal to final image applied chi^2 closure phase, logcamp, vis wrt the original observation with sys noise and self-cal applied Args: Returns: """ stats_dict = {} stats_dict['i'] = [self.i] outstats = os.path.join(self.outpath, '%s_stats.csv' % (self.outfile)) # if ground truth image available, compute nxcorr if self.ground_truth_img != 'None': gt_im = eh.image.load_fits(self.ground_truth_img) fov_ = 200 * eh.RADPERUAS psize_ = fov_ / 256 nxcorr_, _, _ = gt_im.compare_images(self.im_addcmp, metric='nxcorr', target_fov=fov_, psize=psize_) nxcorr = nxcorr_[0] stats_dict['nxcorr'] = [nxcorr] # chi^2 for closure phase (cp) and log camp (lc) # original uv data obs_ref = self.obs_orig chi2_cp_ref = obs_ref.chisq(self.im_addcmp, dtype='cphase', ttype=self.ttype, systematic_noise=0., systematic_cphase_noise=0, maxset=False, cp_uv_min=self.uv_zblcut) chi2_lc_ref = obs_ref.chisq(self.im_addcmp, dtype='logcamp', ttype=self.ttype, systematic_noise=0., snrcut=1.0, maxset=False, cp_uv_min=self.uv_zblcut) # snrcut to remove large chi2 point chi2_vis_ref = obs_ref.chisq(self.im_addcmp, dtype='vis', ttype=self.ttype, systematic_noise=0., snrcut=1.0, maxset=False, cp_uv_min=self.uv_zblcut) # snrcut to remove large chi2 point stats_dict['chi2_cp_ref'] = [chi2_cp_ref] stats_dict['chi2_lc_ref'] = [chi2_lc_ref] stats_dict['chi2_vis_ref'] = [chi2_vis_ref] # orig data self-cal to final image obs_sub = self.obs_sc_addcmp chi2_vis_sub = obs_sub.chisq(self.im_addcmp, dtype='vis', ttype=self.ttype, systematic_noise=0., snrcut=1.0, maxset=False, cp_uv_min=self.uv_zblcut) # snrcut to remove large chi2 point stats_dict['chi2_vis_sub'] = [chi2_vis_sub] # orig data with sys noise added + self-cal to final image self.obs_sc_addcmp_sys = eh.selfcal(self.obs_sc_init, self.im_addcmp, method='both', ttype=self.ttype) obs_sys = self.obs_sc_addcmp_sys chi2_cp_sys = obs_sys.chisq(self.im_addcmp, dtype='cphase', ttype=self.ttype, systematic_noise=0., systematic_cphase_noise=0, maxset=False, cp_uv_min=self.uv_zblcut) chi2_lc_sys = obs_sys.chisq(self.im_addcmp, dtype='logcamp', ttype=self.ttype, systematic_noise=0., snrcut=1.0, maxset=False, cp_uv_min=self.uv_zblcut) # snrcut to remove large chi2 point chi2_vis_sys = obs_sys.chisq(self.im_addcmp, dtype='vis', ttype=self.ttype, systematic_noise=0., snrcut=1.0, maxset=False, cp_uv_min=self.uv_zblcut) # snrcut to remove large chi2 point stats_dict['chi2_cp_sys'] = [chi2_cp_sys] stats_dict['chi2_lc_sys'] = [chi2_lc_sys] stats_dict['chi2_vis_sys'] = [chi2_vis_sys] df = pd.DataFrame.from_dict(stats_dict) df.to_csv(outstats)
[docs] def save_params(self): """ Saves a csv file with parameter set details Args: Returns: """ self.paramset['fov'] = self.fov self.paramset['zbl_tot'] = self.zbl_tot df = pd.DataFrame.from_dict([self.paramset]) outparams = os.path.join(self.outpath, '%s_params.csv' % (self.outfile)) df.to_csv(outparams)
[docs] def run(self): """ Run imaging pipeline for one parameter set. Args: Returns: """ # if a *_params.csv file exists, it means this parameter has already been run and can be skipped # useful in case survey with multiple parameter sets gets interrupted outcsv = os.path.join(self.outpath, '%s_params.csv' % (self.outfile)) if not self.overwrite: if os.path.exists(outcsv): pass else: # load in data self.load_data() # do pre-imaging calibration self.preimcal() # create initial/prior image self.init_img() # run imaging step self.make_img() # output the results self.output_results() # save params to text self.save_params() if self.save_stats: self.save_statistics()
[docs]def run_pset(pset, system_kwargs, params_fixed): """ Run imaging for one parameter set. Not to be used individually, but called in map function for run_survey Args: pset (dict): A dict containing single parameter set params_fixed (dict): A dict containing non-varying parameters Returns: """ PSet = ParameterSet(pset, params_fixed) PSet.run()
[docs]def run_survey(psets, params_fixed): """Run survey for all parameter sets using paramsurvey Args: psets (DataFrame): A pandas DataFrame containing all parameter sets params_fixed (dict): A dict containing non-varying parameters Returns: """ # run whole survey using map function paramsurvey.init(backend=params_fixed['backend'], ncores=params_fixed['nproc'], verbose=0,vstats=0) paramsurvey.map(run_pset, psets, user_kwargs=params_fixed, verbose=0)
[docs]def create_params_fixed(infile, outfile_base, outpath, ground_truth_img='None', save_imgsums=False, save_uvfits=True, save_pdf=False, save_stats=True, save_caltab=True, nproc=1, backend='multiprocessing', ttype='nfft', overwrite=False, selfcal=True, gaintol=[0.02,0.2], niter_static=3, blurfrac=1, maxit=100, stop=1e-4, fov=128, npixels=64, reverse_taper_uas=5, uv_zblcut=0.1e9, SEFD_error_budget={'AA':0.1,'AX':0.1,'GL':0.1,'LM':0.1,'MG':0.1,'MM':0.1,'PV':0.1,'SW':0.1}): """Create a dict for all non-varying survery parameters Args: infile (str): path to input uvfits observation file outfile_base (str): name of base filename for all outputs outpath (str): path to directory where all outputs should be stored ground_truth_img (str): if applicable, path to ground truth fits file save_imgsums (bool): save summary pdf for each image save_uvfits (bool): save final self-cal observation to uvfits file save_pdf (bool): save pdf of each image save_stats (bool): save csv file containing statistics for each image save_caltab (bool): save a calibration table for each image nproc (int): number of parallel processes backend (str): either 'multiprocessing' or 'ray' ttype (str): “fast” or “nfft” or “direct” overwrite (bool): if True, write over existing files with same names - else, skip parameter set selfcal (bool): perform self-calibration steps during imaging gaintol (array): tolerance for gains to be under/over unity respectively during self-cal niter_static (int): number of iterations for each imaging step blurfrac (int): factor to blur initial image between iterations maxit (int): maximum number of iterations if image does not converge stop (float): convergence criterion for imaging fov (int): image field of view in uas npixels (int): number of image pixels reverse_taper_uas (int): fwhm of gassuain in uas to reverse taper observation uv_zblcut (float): maximum uv-distance to which is considered short baseline flux SEFD_error_budget (dict): SEFD percentage error for each station Returns: (dict): dict containing all non-varying survery parameters """ # take all arguments and put them in a dict args = list(locals().keys()) params_fixed = {} for arg in args: params_fixed[arg] = locals().get(arg) return params_fixed
[docs]def create_survey_psets(zbl=[0.6], sys_noise=[0.02], avg_time=['scan'], prior_fwhm=[40], sc_phase=[0], xdw_phase=[10], sc_ap=[0], xdw_ap=[1], amp=[0.2], cphase=[1], logcamp=[1], simple=[1], l1=[1], tv=[1], tv2=[1], flux=[1], epsilon_tv=[1e-10]): """Create a dataframe given all survey parameters. Default values will create an example dataframe but these values should be adjusted for each specific observation Args: zbl (array): compact flux value (Jy) sys_noise (array): percent addition of systematic noise avg_time (array): in seconds or 'scan' for scan averaging prior_fwhm (array): fwhm of gaussian prior in uas sc_phase (array): number of iterations to perform phase-only self-cal xdw_phase (array): multiplicative factor for data weights after one round of phase-only self-cal sc_ap (array): number of iterations to perform amp+phase self-cal xdw_ap (array): multiplicative factor for data weights after one round of amp+phase self-cal amp (array): data weight to be placed on amplitudes cphase (array): data weight to be placed on closure phases logcamp (array): data weight to be placed on log closure amplitudes simple (array): regularizer weight for relative entropy, favoring similarity to prior image l1 (array): regularizer weight for l1 norm, favoring image sparsity tv (array): regularizer weight for total variation, favoring sharp edges tv2 (array): regularizer weight for total squared variation, favoring smooth edges flux (array): regularizer weight for total flux density, favoring final images with flux close to zbl epsilon_tv (array): epsilon value used in definition of total variation - rarely need to change this Returns: (DataFrame): pandas DataFrame containing all combination of parameter sets along with index values """ # take all arguments and put them in a dict args = list(locals().keys()) params = {} for arg in args: params[arg] = locals().get(arg) # pandas dataframe containing all combinations of parameters to survey psets = paramsurvey.params.product(params) # add pset index number to each row of dataframe psets['i'] = np.array(range(len(psets))) return psets