Source code for ehtim.obsdata

# obsdata.py
# an interferometric observation class
#
#    Copyright (C) 2018 Andrew Chael
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.


from __future__ import division
from __future__ import print_function

from builtins import str
from builtins import range
from builtins import object

import string
import copy
import numpy as np
import numpy.lib.recfunctions as rec
import matplotlib.pyplot as plt
import scipy.optimize as opt
import scipy.spatial as spatial
import itertools as it
import sys

try:
    import pandas as pd
except ImportError:
    print("Warning: pandas not installed!")
    print("Please install pandas to use statistics package!")


import ehtim.image
import ehtim.io.save
import ehtim.io.load
import ehtim.const_def as ehc
import ehtim.observing.obs_helpers as obsh
import ehtim.statistics.dataframes as ehdf

import warnings
warnings.filterwarnings("ignore",
                        message="Casting complex values to real discards the imaginary part")

RAPOS = 0
DECPOS = 1
RFPOS = 2
BWPOS = 3
DATPOS = 4
TARRPOS = 5

##################################################################################################
# Obsdata object
##################################################################################################


[docs]class Obsdata(object): """A polarimetric VLBI observation of visibility amplitudes and phases (in Jy). Attributes: source (str): The source name ra (float): The source Right Ascension in fractional hours dec (float): The source declination in fractional degrees mjd (int): The integer MJD of the observation tstart (float): The start time of the observation in hours tstop (float): The end time of the observation in hours rf (float): The observation frequency in Hz bw (float): The observation bandwidth in Hz timetype (str): How to interpret tstart and tstop; either 'GMST' or 'UTC' polrep (str): polarization representation, either 'stokes' or 'circ' tarr (numpy.recarray): The array of telescope data with datatype DTARR tkey (dict): A dictionary of rows in the tarr for each site name data (numpy.recarray): the basic data with datatype DTPOL_STOKES or DTPOL_CIRC scantable (numpy.recarray): The array of scan information ampcal (bool): True if amplitudes calibrated phasecal (bool): True if phases calibrated opacitycal (bool): True if time-dependent opacities correctly accounted for in sigmas frcal (bool): True if feed rotation calibrated out of visibilities dcal (bool): True if D terms calibrated out of visibilities amp (numpy.recarray): An array of (averaged) visibility amplitudes bispec (numpy.recarray): An array of (averaged) bispectra cphase (numpy.recarray): An array of (averaged) closure phases cphase_diag (numpy.recarray): An array of (averaged) diagonalized closure phases camp (numpy.recarray): An array of (averaged) closure amplitudes logcamp (numpy.recarray): An array of (averaged) log closure amplitudes logcamp_diag (numpy.recarray): An array of (averaged) diagonalized log closure amps """ def __init__(self, ra, dec, rf, bw, datatable, tarr, scantable=None, polrep='stokes', source=ehc.SOURCE_DEFAULT, mjd=ehc.MJD_DEFAULT, timetype='UTC', ampcal=True, phasecal=True, opacitycal=True, dcal=True, frcal=True, trial_speedups=False): """A polarimetric VLBI observation of visibility amplitudes and phases (in Jy). 
Args: ra (float): The source Right Ascension in fractional hours dec (float): The source declination in fractional degrees rf (float): The observation frequency in Hz bw (float): The observation bandwidth in Hz datatable (numpy.recarray): the basic data with datatype DTPOL_STOKES or DTPOL_CIRC tarr (numpy.recarray): The array of telescope data with datatype DTARR scantable (numpy.recarray): The array of scan information polrep (str): polarization representation, either 'stokes' or 'circ' source (str): The source name mjd (int): The integer MJD of the observation timetype (str): How to interpret tstart and tstop; either 'GMST' or 'UTC' ampcal (bool): True if amplitudes calibrated phasecal (bool): True if phases calibrated opacitycal (bool): True if time-dependent opacities correctly accounted for in sigmas frcal (bool): True if feed rotation calibrated out of visibilities dcal (bool): True if D terms calibrated out of visibilities Returns: obsdata (Obsdata): an Obsdata object """ if len(datatable) == 0: raise Exception("No data in input table!") if not (datatable.dtype in [ehc.DTPOL_STOKES, ehc.DTPOL_CIRC]): raise Exception("Data table dtype should be DTPOL_STOKES or DTPOL_CIRC") # Polarization Representation if polrep == 'stokes': self.polrep = 'stokes' self.poldict = ehc.POLDICT_STOKES self.poltype = ehc.DTPOL_STOKES elif polrep == 'circ': self.polrep = 'circ' self.poldict = ehc.POLDICT_CIRC self.poltype = ehc.DTPOL_CIRC else: raise Exception("only 'stokes' and 'circ' are supported polreps!") # Set the various observation parameters self.source = str(source) self.ra = float(ra) self.dec = float(dec) self.rf = float(rf) self.bw = float(bw) self.ampcal = bool(ampcal) self.phasecal = bool(phasecal) self.opacitycal = bool(opacitycal) self.dcal = bool(dcal) self.frcal = bool(frcal) if timetype not in ['GMST', 'UTC']: raise Exception("timetype must be 'GMST' or 'UTC'") self.timetype = timetype # Save the data self.data = datatable self.scans = scantable # Telescope array: default ordering is by sefd self.tarr = tarr self.tkey = {self.tarr[i]['site']: i for i in range(len(self.tarr))} if np.any(self.tarr['sefdr'] != 0) or np.any(self.tarr['sefdl'] != 0): self.reorder_tarr_sefd(reorder_baselines=False) # reorder baselines to uvfits convention self.reorder_baselines(trial_speedups=trial_speedups) # Get tstart, mjd and tstop times = self.unpack(['time'])['time'] self.tstart = times[0] self.mjd = int(mjd) self.tstop = times[-1] if self.tstop < self.tstart: self.tstop += 24.0 # Saved closure quantity arrays self.amp = None self.bispec = None self.cphase = None self.cphase_diag = None self.camp = None self.logcamp = None self.logcamp_diag = None @property def tarr(self): return self._tarr @tarr.setter def tarr(self, tarr): self._tarr = tarr self.tkey = {tarr[i]['site']: i for i in range(len(tarr))}
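    # Illustrative usage sketch (not part of the class): Obsdata objects are normally
    # built by the ehtim loader functions rather than by calling the constructor
    # directly. The file name below is a hypothetical placeholder.
    #
    #   import ehtim as eh
    #   obs = eh.obsdata.load_uvfits('my_observation.uvfits')
    #   print(obs.source, obs.mjd, obs.rf / 1e9, 'GHz')
    #   print(len(obs.data), 'visibilities from', len(obs.tarr), 'sites')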
[docs] def obsdata_args(self): """Copy arguments for making a new Obsdata into a list and dictionary """ arglist = [self.ra, self.dec, self.rf, self.bw, self.data, self.tarr] argdict = {'scantable': self.scans, 'polrep': self.polrep, 'source': self.source, 'mjd': self.mjd, 'timetype': self.timetype, 'ampcal': self.ampcal, 'phasecal': self.phasecal, 'opacitycal': self.opacitycal, 'dcal': self.dcal, 'frcal': self.frcal} return (arglist, argdict)
[docs] def copy(self): """Copy the observation object. Args: Returns: (Obsdata): a copy of the Obsdata object. """ # TODO: Do we want to copy over e.g. closure tables? newobs = copy.deepcopy(self) return newobs
[docs] def switch_timetype(self, timetype_out='UTC'): """Return a new observation with the time type switched Args: timetype_out (str): "UTC" or "GMST" Returns: (Obsdata): new Obsdata object with potentially different timetype """ if timetype_out not in ['GMST', 'UTC']: raise Exception("timetype_out must be 'GMST' or 'UTC'") out = self.copy() if timetype_out == self.timetype: return out if timetype_out == 'UTC': out.data['time'] = obsh.gmst_to_utc(out.data['time'], out.mjd) if timetype_out == 'GMST': out.data['time'] = obsh.utc_to_gmst(out.data['time'], out.mjd) out.timetype = timetype_out return out
[docs] def switch_polrep(self, polrep_out='stokes', allow_singlepol=True, singlepol_hand='R'): """Return a new observation with the polarization representation changed Args: polrep_out (str): the polrep of the output data allow_singlepol (bool): If True, treat single-polarization data as Stokes I when converting from 'circ' polrep to 'stokes' singlepol_hand (str): 'R' or 'L'; determines which parallel-hand is assumed when converting 'stokes' to 'circ' if only I is present Returns: (Obsdata): new Obsdata object with potentially different polrep """ if polrep_out not in ['stokes', 'circ']: raise Exception("polrep_out must be either 'stokes' or 'circ'") if polrep_out == self.polrep: return self.copy() elif polrep_out == 'stokes': # circ -> stokes data = np.empty(len(self.data), dtype=ehc.DTPOL_STOKES) rrmask = np.isnan(self.data['rrvis']) llmask = np.isnan(self.data['llvis']) for f in ehc.DTPOL_STOKES: f = f[0] if f in ['time', 'tint', 't1', 't2', 'tau1', 'tau2', 'u', 'v']: data[f] = self.data[f] elif f == 'vis': data[f] = 0.5 * (self.data['rrvis'] + self.data['llvis']) elif f == 'qvis': data[f] = 0.5 * (self.data['lrvis'] + self.data['rlvis']) elif f == 'uvis': data[f] = 0.5j * (self.data['lrvis'] - self.data['rlvis']) elif f == 'vvis': data[f] = 0.5 * (self.data['rrvis'] - self.data['llvis']) elif f in ['sigma', 'vsigma']: data[f] = 0.5 * np.sqrt(self.data['rrsigma']**2 + self.data['llsigma']**2) elif f in ['qsigma', 'usigma']: data[f] = 0.5 * np.sqrt(self.data['rlsigma']**2 + self.data['lrsigma']**2) if allow_singlepol: # In cases where only one polarization is present # use it as an estimator for Stokes I data['vis'][rrmask] = self.data['llvis'][rrmask] data['sigma'][rrmask] = self.data['llsigma'][rrmask] data['vis'][llmask] = self.data['rrvis'][llmask] data['sigma'][llmask] = self.data['rrsigma'][llmask] elif polrep_out == 'circ': # stokes -> circ data = np.empty(len(self.data), dtype=ehc.DTPOL_CIRC) Vmask = np.isnan(self.data['vvis']) for f in ehc.DTPOL_CIRC: f = f[0] if f in ['time', 'tint', 't1', 't2', 'tau1', 'tau2', 'u', 'v']: data[f] = self.data[f] elif f == 'rrvis': data[f] = (self.data['vis'] + self.data['vvis']) elif f == 'llvis': data[f] = (self.data['vis'] - self.data['vvis']) elif f == 'rlvis': data[f] = (self.data['qvis'] + 1j * self.data['uvis']) elif f == 'lrvis': data[f] = (self.data['qvis'] - 1j * self.data['uvis']) elif f in ['rrsigma', 'llsigma']: data[f] = np.sqrt(self.data['sigma']**2 + self.data['vsigma']**2) elif f in ['rlsigma', 'lrsigma']: data[f] = np.sqrt(self.data['qsigma']**2 + self.data['usigma']**2) if allow_singlepol: # In cases where only Stokes I is present, copy it to a specified parallel-hand prefix = singlepol_hand.lower() + singlepol_hand.lower() # rr or ll if prefix not in ['rr', 'll']: raise Exception('singlepol_hand must be R or L') data[prefix + 'vis'][Vmask] = self.data['vis'][Vmask] data[prefix + 'sigma'][Vmask] = self.data['sigma'][Vmask] arglist, argdict = self.obsdata_args() arglist[DATPOS] = data argdict['polrep'] = polrep_out newobs = Obsdata(*arglist, **argdict) return newobs
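    # Sketch of the polarization-representation conversion above, assuming 'obs' is
    # an existing Obsdata instance recorded in the circular basis:
    #
    #   obs_stokes = obs.switch_polrep(polrep_out='stokes')    # RR,LL,RL,LR -> I,Q,U,V
    #   obs_circ = obs_stokes.switch_polrep(polrep_out='circ', singlepol_hand='R')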
[docs] def reorder_baselines(self, trial_speedups=False): """Reorder baselines to match uvfits convention, based on the telescope array ordering """ if trial_speedups: self.reorder_baselines_trial_speedups() else: # original code # Time partition the datatable datatable = self.data.copy() datalist = [] for key, group in it.groupby(datatable, lambda x: x['time']): #print(key*60*60,len(list(group))) datalist.append(np.array([obs for obs in group])) # loop through all data obsdata = [] for tlist in datalist: blpairs = [] for dat in tlist: # Remove conjugate baselines if not (set((dat['t1'], dat['t2']))) in blpairs: # Reverse the baseline in the right order for uvfits: if(self.tkey[dat['t2']] < self.tkey[dat['t1']]): (dat['t1'], dat['t2']) = (dat['t2'], dat['t1']) (dat['tau1'], dat['tau2']) = (dat['tau2'], dat['tau1']) dat['u'] = -dat['u'] dat['v'] = -dat['v'] if self.polrep == 'stokes': dat['vis'] = np.conj(dat['vis']) dat['qvis'] = np.conj(dat['qvis']) dat['uvis'] = np.conj(dat['uvis']) dat['vvis'] = np.conj(dat['vvis']) elif self.polrep == 'circ': dat['rrvis'] = np.conj(dat['rrvis']) dat['llvis'] = np.conj(dat['llvis']) # must switch l & r !! rl = dat['rlvis'].copy() lr = dat['lrvis'].copy() dat['rlvis'] = np.conj(lr) dat['lrvis'] = np.conj(rl) # You also have to switch the errors for the coherency! rlerr = dat['rlsigma'].copy() lrerr = dat['lrsigma'].copy() dat["rlsigma"] = lrerr dat["lrsigma"] = rlerr else: raise Exception("polrep must be either 'stokes' or 'circ'") # Append the data point blpairs.append(set((dat['t1'], dat['t2']))) obsdata.append(dat) obsdata = np.array(obsdata, dtype=self.poltype) # Timesort data obsdata = obsdata[np.argsort(obsdata, order=['time', 't1'])] # Save the data self.data = obsdata return
[docs] def reorder_baselines_trial_speedups(self): """Reorder baselines to match uvfits convention, based on the telescope array ordering """ dat = self.data.copy() ############ Ensure correct baseline order # TODO can these be faster? t1nums = np.fromiter([self.tkey[t] for t in dat['t1']], int) t2nums = np.fromiter([self.tkey[t] for t in dat['t2']], int) # which entries are in the wrong telescope order? ordermask = t2nums < t1nums # flip the order of these entries t1 = dat['t1'].copy() t2 = dat['t2'].copy() tau1 = dat['tau1'].copy() tau2 = dat['tau2'].copy() dat['t1'][ordermask] = t2[ordermask] dat['t2'][ordermask] = t1[ordermask] dat['tau1'][ordermask] = tau2[ordermask] dat['tau2'][ordermask] = tau1[ordermask] dat['u'][ordermask] *= -1 dat['v'][ordermask] *= -1 if self.polrep == 'stokes': dat['vis'][ordermask] = np.conj(dat['vis'][ordermask]) dat['qvis'][ordermask] = np.conj(dat['qvis'][ordermask]) dat['uvis'][ordermask] = np.conj(dat['uvis'][ordermask]) dat['vvis'][ordermask] = np.conj(dat['vvis'][ordermask]) elif self.polrep == 'circ': dat['rrvis'][ordermask] = np.conj(dat['rrvis'][ordermask]) dat['llvis'][ordermask] = np.conj(dat['llvis'][ordermask]) rl = dat['rlvis'].copy() lr = dat['lrvis'].copy() dat['rlvis'][ordermask] = np.conj(lr[ordermask]) dat['lrvis'][ordermask] = np.conj(rl[ordermask]) # Also need to switch error matrix rle = dat['rlsigma'].copy() lre = dat['lrsigma'].copy() dat['rlsigma'][ordermask] = lre[ordermask] dat['lrsigma'][ordermask] = rle[ordermask] else: raise Exception("polrep must be either 'stokes' or 'circ'") # Remove duplicate or conjugate entries at any timestep # Use (min, max) station indices so that conjugates appear as duplicates timeblcombos = np.vstack((dat['time'], np.minimum(t1nums, t2nums), np.maximum(t1nums, t2nums))).T uniqdat, uniqdatinv = np.unique(timeblcombos, axis=0, return_inverse=True) if len(uniqdat) != len(dat): print("WARNING: removing duplicate/conjugate points in reorder_baselines!") deletemask = np.ones(len(dat)).astype(bool) for j in range(len(uniqdat)): idxs = np.argwhere(uniqdatinv == j)[:, 0] for idx in idxs[1:]: # delete all but the first occurrence deletemask[idx] = False # remove duplicates dat = dat[deletemask] # sort data dat = dat[np.argsort(dat, order=['time', 't1'])] # save the data self.data = dat return
[docs] def reorder_tarr_sefd(self, reorder_baselines=True): """Reorder the telescope array by SEFD, from minimum to maximum. """ sorted_list = sorted(self.tarr, key=lambda x: np.sqrt(x['sefdr']**2 + x['sefdl']**2)) self.tarr = np.array(sorted_list, dtype=ehc.DTARR) self.tkey = {self.tarr[i]['site']: i for i in range(len(self.tarr))} if reorder_baselines: self.reorder_baselines() return
[docs] def reorder_tarr_snr(self, reorder_baselines=True): """Reorder the telescope array by median SNR maximal to minimal. """ snr = self.unpack(['t1', 't2', 'snr']) snr_median = [np.median(snr[(snr['t1'] == site) + (snr['t2'] == site)]['snr']) for site in self.tarr['site']] idx = np.argsort(snr_median)[::-1] self.tarr = self.tarr[idx] self.tkey = {self.tarr[i]['site']: i for i in range(len(self.tarr))} if reorder_baselines: self.reorder_baselines() return
[docs] def reorder_tarr_random(self, reorder_baselines=True): """Randomly reorder the telescope array. """ idx = np.arange(len(self.tarr)) np.random.shuffle(idx) self.tarr = self.tarr[idx] self.tkey = {self.tarr[i]['site']: i for i in range(len(self.tarr))} if reorder_baselines: self.reorder_baselines() return
[docs] def data_conj(self): """Make a data array including all conjugate baselines. Args: Returns: (numpy.recarray): a copy of the Obsdata.data table including all conjugate baselines. """ data = np.empty(2 * len(self.data), dtype=self.poltype) # Add the conjugate baseline data for f in self.poltype: f = f[0] if f in ['t1', 't2', 'tau1', 'tau2']: if f[-1] == '1': f2 = f[:-1] + '2' else: f2 = f[:-1] + '1' data[f] = np.hstack((self.data[f], self.data[f2])) elif f in ['u', 'v']: data[f] = np.hstack((self.data[f], -self.data[f])) elif f in [self.poldict['vis1'], self.poldict['vis2'], self.poldict['vis3'], self.poldict['vis4']]: if self.polrep == 'stokes': data[f] = np.hstack((self.data[f], np.conj(self.data[f]))) elif self.polrep == 'circ': if f in ['rrvis', 'llvis']: data[f] = np.hstack((self.data[f], np.conj(self.data[f]))) elif f == 'rlvis': data[f] = np.hstack((self.data['rlvis'], np.conj(self.data['lrvis']))) elif f == 'lrvis': data[f] = np.hstack((self.data['lrvis'], np.conj(self.data['rlvis']))) # ALSO SWITCH THE ERRORS! else: raise Exception("polrep must be either 'stokes' or 'circ'") # The conjugate baselines need the transpose error terms. elif f == "rlsigma": data[f] = np.hstack((self.data["rlsigma"], self.data["lrsigma"])) elif f == "lrsigma": data[f] = np.hstack((self.data["lrsigma"], self.data["rlsigma"])) else: data[f] = np.hstack((self.data[f], self.data[f])) # Sort the data by time data = data[np.argsort(data['time'])] return data
[docs] def tlist(self, conj=False, t_gather=0., scan_gather=False): """Group the data in a list of equal time observation datatables. Args: conj (bool): True if tlist_out includes conjugate baselines. t_gather (float): Grouping timescale (in seconds). 0.0 indicates no grouping. scan_gather (bool): If true, gather data into scans Returns: (list): a list of data tables containing time-partitioned data """ if conj: data = self.data_conj() else: data = self.data # partition the data by time datalist = [] if t_gather <= 0.0 and not scan_gather: # Only group measurements at the same time for key, group in it.groupby(data, lambda x: x['time']): datalist.append(np.array([obs for obs in group])) elif t_gather > 0.0 and not scan_gather: # Group measurements in time for key, group in it.groupby(data, lambda x: int(x['time'] / (t_gather / 3600.0))): datalist.append(np.array([obs for obs in group])) else: # Group measurements by scan if ((self.scans is None) or np.any([scan is None for scan in self.scans]) or len(self.scans) == 0): print("No scan table in observation. Adding scan table before gathering...") self.add_scans() for key, group in it.groupby( data, lambda x: np.searchsorted(self.scans[:, 0], x['time'])): datalist.append(np.array([obs for obs in group])) return np.array(datalist, dtype=object)
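    # Sketch of iterating over the time-partitioned tables returned by tlist(),
    # assuming 'obs' is an existing Obsdata instance:
    #
    #   for tdata in obs.tlist(t_gather=60.):     # group points into 60-second bins
    #       print(tdata['time'][0], len(tdata), 'visibilities')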
[docs] def split_obs(self, t_gather=0., scan_gather=False): """Split a single observation into multiple observation files, one per scan. Args: t_gather (float): Grouping timescale (in seconds). 0.0 indicates no grouping. scan_gather (bool): If true, gather data into scans Returns: (list): list of single-scan Obsdata objects """ tlist = self.tlist(t_gather=t_gather, scan_gather=scan_gather) print("Splitting Observation File into " + str(len(tlist)) + " times") arglist, argdict = self.obsdata_args() # note that the tarr of the output includes all sites, # even those that don't participate in the scan splitlist = [] for tdata in tlist: arglist[DATPOS] = tdata splitlist.append(Obsdata(*arglist, **argdict)) return splitlist
[docs] def getClosestScan(self, time, splitObs=None): """Split observation by scan and grab the scan closest to a timestamp Args: time (float): Time (GMST) you want to find the scan closest to splitObs (list): a list of Obsdata objects, output from split_obs, to save time Returns: (Obsdata): Obsdata object composed of scan closest to time """ ## check if splitObs has been passed in already ## if splitObs is None: splitObs = self.split_obs() ## check for the scan with the closest start time to time arg ## ## TODO: allow user to choose start time, end time, or mid-time closest_index = 0 delta_t = 1e22 for s, s_obs in enumerate(splitObs): dt = abs(s_obs.tstart - time) if dt < delta_t: delta_t = dt closest_index = s print(f"Using scan with time {splitObs[closest_index].tstart}.") return splitObs[closest_index]
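    # Sketch of splitting an observation into scans and selecting the scan nearest a
    # given time in hours, assuming 'obs' is an existing Obsdata instance:
    #
    #   scans = obs.split_obs()                          # list of single-scan Obsdata
    #   obs_scan = obs.getClosestScan(5.5, splitObs=scans)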
[docs] def bllist(self, conj=False): """Group the data in a list of same baseline datatables. Args: conj (bool): True if tlist_out includes conjugate baselines. Returns: (list): a list of data tables containing baseline-partitioned data """ if conj: data = self.data_conj() else: data = self.data # partition the data by baseline datalist = [] idx = np.lexsort((data['t2'], data['t1'])) for key, group in it.groupby(data[idx], lambda x: set((x['t1'], x['t2']))): datalist.append(np.array([obs for obs in group])) return np.array(datalist, dtype=object)
[docs] def unpack_bl(self, site1, site2, fields, ang_unit='deg', debias=False, timetype=False): """Unpack the data over time on the selected baseline site1-site2. Args: site1 (str): First site name site2 (str): Second site name fields (list): list of unpacked quantities from available quantities in FIELDS ang_unit (str): 'deg' for degrees and 'rad' for radian phases debias (bool): True to debias visibility amplitudes timetype (str): 'GMST' or 'UTC' changes what is returned for 'time' Returns: (numpy.recarray): unpacked numpy array with data in fields requested """ if timetype is False: timetype = self.timetype # If we only specify one field if timetype not in ['GMST', 'UTC', 'utc', 'gmst']: raise Exception("timetype should be 'GMST' or 'UTC'!") allfields = ['time'] if not isinstance(fields, list): allfields.append(fields) else: for i in range(len(fields)): allfields.append(fields[i]) # Get the data from data table on the selected baseline allout = [] tlist = self.tlist(conj=True) for scan in tlist: for obs in scan: if (obs['t1'], obs['t2']) == (site1, site2): obs = np.array([obs]) out = self.unpack_dat(obs, allfields, ang_unit=ang_unit, debias=debias, timetype=timetype) allout.append(out) return np.array(allout)
[docs] def unpack(self, fields, mode='all', ang_unit='deg', debias=False, conj=False, timetype=False): """Unpack the data for the whole observation . Args: fields (list): list of unpacked quantities from availalbe quantities in FIELDS mode (str): 'all' returns all data in single table, 'time' groups output by equal time, 'bl' groups by baseline ang_unit (str): 'deg' for degrees and 'rad' for radian phases debias (bool): True to debias visibility amplitudes conj (bool): True to include conjugate baselines timetype (str): 'GMST' or 'UTC' changes what is returned for 'time' Returns: (numpy.recarray): unpacked numpy array with data in fields requested """ if mode not in ('time', 'all', 'bl'): raise Exception("possible options for mode are 'time', 'all' and 'bl'") # If we only specify one field if not isinstance(fields, list): fields = [fields] if mode == 'all': if conj: data = self.data_conj() else: data = self.data allout = self.unpack_dat(data, fields, ang_unit=ang_unit, debias=debias, timetype=timetype) elif mode == 'time': allout = [] tlist = self.tlist(conj=True) for scan in tlist: out = self.unpack_dat(scan, fields, ang_unit=ang_unit, debias=debias, timetype=timetype) allout.append(out) elif mode == 'bl': allout = [] bllist = self.bllist() for bl in bllist: out = self.unpack_dat(bl, fields, ang_unit=ang_unit, debias=debias, timetype=timetype) allout.append(out) return allout
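    # Sketch of unpacking derived quantities, assuming 'obs' is an existing Obsdata
    # instance; the station names 'ALMA' and 'APEX' are hypothetical examples:
    #
    #   dat = obs.unpack(['uvdist', 'amp', 'phase'], debias=True)
    #   bldat = obs.unpack_bl('ALMA', 'APEX', ['amp', 'snr'])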
[docs] def unpack_dat(self, data, fields, conj=False, ang_unit='deg', debias=False, timetype=False): """Unpack the data in a passed data recarray. Args: data (numpy.recarray): data recarray of format DTPOL_STOKES or DTPOL_CIRC fields (list): list of unpacked quantities from availalbe quantities in FIELDS conj (bool): True to include conjugate baselines ang_unit (str): 'deg' for degrees and 'rad' for radian phases debias (bool): True to debias visibility amplitudes timetype (str): 'GMST' or 'UTC' changes what is returned for 'time' Returns: (numpy.recarray): unpacked numpy array with data in fields requested """ if ang_unit == 'deg': angle = ehc.DEGREE else: angle = 1.0 # If we only specify one field if isinstance(fields, str): fields = [fields] if not timetype: timetype = self.timetype if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") # Get field data allout = [] for field in fields: if field in ["time", "time_utc", "time_gmst"]: out = data['time'] ty = 'f8' elif field in ["u", "v", "tint", "tau1", "tau2"]: out = data[field] ty = 'f8' elif field in ["uvdist"]: out = np.abs(data['u'] + 1j * data['v']) ty = 'f8' elif field in ["t1", "el1", "par_ang1", "hr_ang1"]: sites = data["t1"] keys = [self.tkey[site] for site in sites] tdata = self.tarr[keys] out = sites ty = 'U32' elif field in ["t2", "el2", "par_ang2", "hr_ang2"]: sites = data["t2"] keys = [self.tkey[site] for site in sites] tdata = self.tarr[keys] out = sites ty = 'U32' elif field in ['vis', 'amp', 'phase', 'snr', 'sigma', 'sigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['vis'] sig = data['sigma'] elif self.polrep == 'circ': out = 0.5 * (data['rrvis'] + data['llvis']) sig = 0.5 * np.sqrt(data['rrsigma']**2 + data['llsigma']**2) elif field in ['qvis', 'qamp', 'qphase', 'qsnr', 'qsigma', 'qsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['qvis'] sig = data['qsigma'] elif self.polrep == 'circ': out = 0.5 * (data['lrvis'] + data['rlvis']) sig = 0.5 * np.sqrt(data['lrsigma']**2 + data['rlsigma']**2) elif field in ['uvis', 'uamp', 'uphase', 'usnr', 'usigma', 'usigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['uvis'] sig = data['usigma'] elif self.polrep == 'circ': out = 0.5j * (data['lrvis'] - data['rlvis']) sig = 0.5 * np.sqrt(data['lrsigma']**2 + data['rlsigma']**2) elif field in ['vvis', 'vamp', 'vphase', 'vsnr', 'vsigma', 'vsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['vvis'] sig = data['vsigma'] elif self.polrep == 'circ': out = 0.5 * (data['rrvis'] - data['llvis']) sig = 0.5 * np.sqrt(data['rrsigma']**2 + data['llsigma']**2) elif field in ['pvis', 'pamp', 'pphase', 'psnr', 'psigma', 'psigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['qvis'] + 1j * data['uvis'] sig = np.sqrt(data['qsigma']**2 + data['usigma']**2) elif self.polrep == 'circ': out = data['rlvis'] sig = data['rlsigma'] elif field in ['m', 'mamp', 'mphase', 'msnr', 'msigma', 'msigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = (data['qvis'] + 1j * data['uvis']) / data['vis'] sig = obsh.merr(data['sigma'], data['qsigma'], data['usigma'], data['vis'], out) elif self.polrep == 'circ': out = 2 * data['rlvis'] / (data['rrvis'] + data['llvis']) sig = obsh.merr2(data['rlsigma'], data['rrsigma'], data['llsigma'], 0.5 * (data['rrvis'] + data['llvis']), out) elif field in ['evis', 'eamp', 'ephase', 'esnr', 'esigma', 'esigma_phase']: ty = 'c16' ang = np.arctan2(data['u'], data['v']) # TODO: correct convention EofN? 
if self.polrep == 'stokes': q = data['qvis'] u = data['uvis'] qsig = data['qsigma'] usig = data['usigma'] elif self.polrep == 'circ': q = 0.5 * (data['lrvis'] + data['rlvis']) u = 0.5j * (data['lrvis'] - data['rlvis']) qsig = 0.5 * np.sqrt(data['lrsigma']**2 + data['rlsigma']**2) usig = qsig out = (np.cos(2 * ang) * q + np.sin(2 * ang) * u) sig = np.sqrt(0.5 * ((np.cos(2 * ang) * qsig)**2 + (np.sin(2 * ang) * usig)**2)) elif field in ['bvis', 'bamp', 'bphase', 'bsnr', 'bsigma', 'bsigma_phase']: ty = 'c16' ang = np.arctan2(data['u'], data['v']) # TODO: correct convention EofN? if self.polrep == 'stokes': q = data['qvis'] u = data['uvis'] qsig = data['qsigma'] usig = data['usigma'] elif self.polrep == 'circ': q = 0.5 * (data['lrvis'] + data['rlvis']) u = 0.5j * (data['lrvis'] - data['rlvis']) qsig = 0.5 * np.sqrt(data['lrsigma']**2 + data['rlsigma']**2) usig = qsig out = (-np.sin(2 * ang) * q + np.cos(2 * ang) * u) sig = np.sqrt(0.5 * ((np.sin(2 * ang) * qsig)**2 + (np.cos(2 * ang) * usig)**2)) elif field in ['rrvis', 'rramp', 'rrphase', 'rrsnr', 'rrsigma', 'rrsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['vis'] + data['vvis'] sig = np.sqrt(data['sigma']**2 + data['vsigma']**2) elif self.polrep == 'circ': out = data['rrvis'] sig = data['rrsigma'] elif field in ['llvis', 'llamp', 'llphase', 'llsnr', 'llsigma', 'llsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['vis'] - data['vvis'] sig = np.sqrt(data['sigma']**2 + data['vsigma']**2) elif self.polrep == 'circ': out = data['llvis'] sig = data['llsigma'] elif field in ['rlvis', 'rlamp', 'rlphase', 'rlsnr', 'rlsigma', 'rlsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['qvis'] + 1j * data['uvis'] sig = np.sqrt(data['qsigma']**2 + data['usigma']**2) elif self.polrep == 'circ': out = data['rlvis'] sig = data['rlsigma'] elif field in ['lrvis', 'lramp', 'lrphase', 'lrsnr', 'lrsigma', 'lrsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = data['qvis'] - 1j * data['uvis'] sig = np.sqrt(data['qsigma']**2 + data['usigma']**2) elif self.polrep == 'circ': out = data['lrvis'] sig = data['lrsigma'] elif field in ['rrllvis', 'rrllamp', 'rrllphase', 'rrllsnr', 'rrllsigma', 'rrllsigma_phase']: ty = 'c16' if self.polrep == 'stokes': out = (data['vis'] + data['vvis']) / (data['vis'] - data['vvis']) sig = (2.0**0.5 * (np.abs(data['vis'])**2 + np.abs(data['vvis'])**2)**0.5 / np.abs(data['vis'] - data['vvis'])**2 * (data['sigma']**2 + data['vsigma']**2)**0.5) elif self.polrep == 'circ': out = data['rrvis'] / data['llvis'] sig = np.sqrt(np.abs(data['rrsigma'] / data['llvis'])**2 + np.abs(data['llsigma'] * data['rrvis'] / data['llvis'])**2) else: raise Exception("%s is not a valid field \n" % field + "valid field values are: " + ' '.join(ehc.FIELDS)) if field in ["time_utc"] and self.timetype == 'GMST': out = obsh.gmst_to_utc(out, self.mjd) if field in ["time_gmst"] and self.timetype == 'UTC': out = obsh.utc_to_gmst(out, self.mjd) if field in ["time"] and self.timetype == 'GMST' and timetype == 'UTC': out = obsh.gmst_to_utc(out, self.mjd) if field in ["time"] and self.timetype == 'UTC' and timetype == 'GMST': out = obsh.utc_to_gmst(out, self.mjd) # Compute elevation and parallactic angles if field in ["el1", "el2", "hr_ang1", "hr_ang2", "par_ang1", "par_ang2"]: if self.timetype == 'GMST': times_sid = data['time'] else: times_sid = obsh.utc_to_gmst(data['time'], self.mjd) thetas = np.mod((times_sid - self.ra) * ehc.HOUR, 2 * np.pi) coords = obsh.recarr_to_ndarr(tdata[['x', 'y', 'z']], 'f8') el_angle = 
obsh.elev(obsh.earthrot(coords, thetas), self.sourcevec()) latlon = obsh.xyz_2_latlong(coords) hr_angles = obsh.hr_angle(times_sid * ehc.HOUR, latlon[:, 1], self.ra * ehc.HOUR) if field in ["el1", "el2"]: out = el_angle / angle ty = 'f8' if field in ["hr_ang1", "hr_ang2"]: out = hr_angles / angle ty = 'f8' if field in ["par_ang1", "par_ang2"]: par_ang = obsh.par_angle(hr_angles, latlon[:, 0], self.dec * ehc.DEGREE) out = par_ang / angle ty = 'f8' # Get arg/amps/snr if field in ["amp", "qamp", "uamp", "vamp", "pamp", "mamp", "bamp", "eamp", "rramp", "llamp", "rlamp", "lramp", "rrllamp"]: out = np.abs(out) if debias: out = obsh.amp_debias(out, sig) ty = 'f8' elif field in ["sigma", "qsigma", "usigma", "vsigma", "psigma", "msigma", "bsigma", "esigma", "rrsigma", "llsigma", "rlsigma", "lrsigma", "rrllsigma"]: out = np.abs(sig) ty = 'f8' elif field in ["phase", "qphase", "uphase", "vphase", "pphase", "bphase", "ephase", "mphase", "rrphase", "llphase", "lrphase", "rlphase", "rrllphase"]: out = np.angle(out) / angle ty = 'f8' elif field in ["sigma_phase", "qsigma_phase", "usigma_phase", "vsigma_phase", "psigma_phase", "msigma_phase", "bsigma_phase", "esigma_phase", "rrsigma_phase", "llsigma_phase", "rlsigma_phase", "lrsigma_phase", "rrllsigma_phase"]: out = np.abs(sig) / np.abs(out) / angle ty = 'f8' elif field in ["snr", "qsnr", "usnr", "vsnr", "psnr", "bsnr", "esnr", "msnr", "rrsnr", "llsnr", "rlsnr", "lrsnr", "rrllsnr"]: out = np.abs(out) / np.abs(sig) ty = 'f8' # Reshape and stack with other fields out = np.array(out, dtype=[(field, ty)]) if len(allout) > 0: allout = rec.merge_arrays((allout, out), asrecarray=True, flatten=True) else: allout = out return allout
[docs] def sourcevec(self): """Return the source position vector in geocentric coordinates at 0h GMST. Args: Returns: (numpy.array): normal vector pointing to source in geocentric coordinates (m) """ sourcevec = np.array([np.cos(self.dec * ehc.DEGREE), 0, np.sin(self.dec * ehc.DEGREE)]) return sourcevec
[docs] def res(self): """Return the nominal resolution (1/longest baseline) of the observation in radians. Args: Returns: (float): nominal array resolution in radians """ res = 1.0 / np.max(self.unpack('uvdist')['uvdist']) return res
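    # Sketch: the nominal resolution converted to micro-arcseconds, assuming 'obs' is
    # an existing Obsdata instance (RADPERUAS is defined in ehtim.const_def):
    #
    #   beam_rad = obs.res()
    #   print('nominal resolution: %.1f uas' % (beam_rad / ehc.RADPERUAS))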
[docs] def chisq(self, im_or_mov, dtype='vis', pol='I', ttype='nfft', mask=[], **kwargs): """Give the reduced chi^2 of the observation for the specified image and datatype. Args: im_or_mov (Image or Movie): image or movie object on which to test chi^2 dtype (str): data type of chi^2 (e.g., 'vis', 'amp', 'bs', 'cphase') pol (str): polarization type ('I', 'Q', 'U', 'V', 'LL', 'RR', 'LR', or 'RL' mask (arr): mask of same dimension as im.imvec ttype (str): "fast" or "nfft" or "direct" fft_pad_factor (float): zero pad the image to (fft_pad_factor * image size) in FFT conv_func ('str'): The convolving function for gridding; 'gaussian', 'pill','cubic' p_rad (int): The pixel radius for the convolving function order ('str'): Interpolation order for sampling the FFT systematic_noise (float): adds a fractional systematic noise tolerance to sigmas snrcut (float): a snr cutoff for including data in the chi^2 sum debias (bool): if True then apply debiasing to amplitudes/closure amplitudes weighting (str): 'natural' or 'uniform' systematic_cphase_noise (float): a value in degrees to add to closure phase sigmas cp_uv_min (float): flag short baselines before forming closure quantities maxset (bool): if True, use maximal set instead of minimal for closure quantities Returns: (float): image chi^2 """ if dtype not in ['vis', 'bs', 'amp', 'cphase', 'cphase_diag', 'camp', 'logcamp', 'logcamp_diag', 'm']: raise Exception("%s is not a supported dterms!" % dtype) # TODO -- should import this at top, but the circular dependencies create a mess... import ehtim.imaging.imager_utils as iu import ehtim.modeling.modeling_utils as mu # Movie -- weighted sum of all frame chi^2 values if hasattr(im_or_mov, 'get_image'): mov = im_or_mov obs_list = self.split_obs() chisq_list = [] num_list = [] for ii, obs in enumerate(obs_list): im = mov.get_image(obs.data[0]['time']) # Get image at the observation time if pol not in im._imdict.keys(): raise Exception(pol + ' is not in the current image.' + ' Consider changing the polarization basis of the image.') try: (data, sigma, A) = iu.chisqdata(obs, im, mask, dtype, pol=pol, ttype=ttype, **kwargs) except IndexError: # Not enough data to form closures! continue imvec = im._imdict[pol] if len(mask) > 0 and np.any(np.invert(mask)): imvec = imvec[mask] chisq_list.append(iu.chisq(imvec, A, data, sigma, dtype, ttype=ttype, mask=mask)) num_list.append(len(data)) chisq = np.sum(np.array(num_list) * np.array(chisq_list)) / np.sum(num_list) # Model -- single chi^2 elif hasattr(im_or_mov,'N_models'): (data, sigma, uv, jonesdict) = mu.chisqdata(self, dtype, pol, **kwargs) chisq = mu.chisq(im_or_mov, uv, data, sigma, dtype, jonesdict) # Image -- single chi^2 else: im = im_or_mov if pol not in im._imdict.keys(): raise Exception(pol + ' is not in the current image.' + ' Consider changing the polarization basis of the image.') (data, sigma, A) = iu.chisqdata(self, im, mask, dtype, pol=pol, ttype=ttype, **kwargs) imvec = im._imdict[pol] if len(mask) > 0 and np.any(np.invert(mask)): imvec = imvec[mask] chisq = iu.chisq(imvec, A, data, sigma, dtype, ttype=ttype, mask=mask) return chisq
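    # Sketch of a chi^2 comparison against a trial image, assuming 'obs' is an
    # existing Obsdata instance; make_square and add_gauss are used in the same way as
    # in cleanbeam() below:
    #
    #   im = ehtim.image.make_square(obs, 64, 200 * ehc.RADPERUAS)
    #   im = im.add_gauss(1.0, (40 * ehc.RADPERUAS, 40 * ehc.RADPERUAS, 0))
    #   print(obs.chisq(im, dtype='vis'), obs.chisq(im, dtype='cphase'))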
[docs] def polchisq(self, im, dtype='pvis', ttype='nfft', pol_trans=True, mask=[], **kwargs): """Give the reduced chi^2 for the specified image and polarimetric datatype. Args: im (Image): image to test polarimetric chi^2 dtype (str): data type of polarimetric chi^2 ('pvis','m','pbs') pol (str): polarization type ('I', 'Q', 'U', 'V', 'LL', 'RR', 'LR', or 'RL' mask (arr): mask of same dimension as im.imvec ttype (str): if "fast" or "nfft" or "direct" pol_trans (bool): True for I,m,chi, False for IQU fft_pad_factor (float): zero pad the image to (fft_pad_factor * image size) in FFT conv_func ('str'): The convolving function for gridding; 'gaussian', 'pill','cubic' p_rad (int): The pixel radius for the convolving function order ('str'): Interpolation order for sampling the FFT systematic_noise (float): adds a fractional systematic noise tolerance to sigmas snrcut (float): a snr cutoff for including data in the chi^2 sum debias (bool): if True then apply debiasing to amplitudes/closure amplitudes weighting (str): 'natural' or 'uniform' systematic_cphase_noise (float): value in degrees to add to closure phase sigmas cp_uv_min (float): flag short baselines before forming closure quantities maxset (bool): if True, use maximal set instead of minimal for closure quantities Returns: (float): image chi^2 """ if dtype not in ['pvis', 'm', 'pbs','vvis']: raise Exception("Only supported polarimetric dterms are 'pvis','m, 'pbs','vvis'!") # TODO -- should import this at top, but the circular dependencies create a mess... import ehtim.imaging.pol_imager_utils as piu # Unpack the necessary polarimetric data (data, sigma, A) = piu.polchisqdata(self, im, mask, dtype, ttype=ttype, **kwargs) # Pack the comparison image in the proper format imstokes = im.switch_polrep(polrep_out='stokes', pol_prim_out='I') if pol_trans: ivec = imstokes.imvec mvec = (np.abs(imstokes.qvec + 1j * imstokes.uvec) / ivec) chivec = np.angle(imstokes.qvec + 1j * imstokes.uvec) / 2 vvec = imstokes.vvec/ivec if len(mask) > 0 and np.any(np.invert(mask)): ivec = ivec[mask] mvec = mvec[mask] chivec = chivec[mask] vvec = vvec[mask] imtuple = np.array((ivec, mvec, chivec,vvec)) else: ivec = imstokes.imvec qvec = imstokes.qvec uvec = imstokes.uvec vvec = imstokes.vvec if len(mask) > 0 and np.any(np.invert(mask)): ivec = ivec[mask] qvec = qvec[mask] uvec = uvec[mask] vvec = vvec[mask] imtuple = np.array((ivec, qvec, uvec,vvec)) # Calculate the chi^2 chisq = piu.polchisq(imtuple, A, data, sigma, dtype, ttype=ttype, mask=mask, pol_trans=pol_trans) return chisq
[docs] def recompute_uv(self): """Recompute u,v points using observation times and metadata Args: Returns: (Obsdata): New Obsdata object containing the same data with recomputed u,v points """ times = self.data['time'] site1 = self.data['t1'] site2 = self.data['t2'] arr = ehtim.array.Array(self.tarr) print("Recomputing U,V Points using MJD %d \n RA %e \n DEC %e \n RF %e GHz" % (self.mjd, self.ra, self.dec, self.rf / 1.e9)) (timesout, uout, vout) = obsh.compute_uv_coordinates(arr, site1, site2, times, self.mjd, self.ra, self.dec, self.rf, timetype=self.timetype, elevmin=0, elevmax=90, no_elevcut_space=False) if len(timesout) != len(times): raise Exception( "len(timesout) != len(times) in recompute_uv: check elevation limits!!") datatable = self.data.copy() datatable['u'] = uout datatable['v'] = vout arglist, argdict = self.obsdata_args() arglist[DATPOS] = np.array(datatable) out = Obsdata(*arglist, **argdict) return out
[docs] def avg_coherent(self, inttime, scan_avg=False, moving=False): """Coherently average data along u,v tracks in chunks of length inttime (sec) Args: inttime (float): coherent integration time in seconds scan_avg (bool): if True, average over scans in self.scans instead of inttime moving (bool): if True, average with a moving (boxcar) window of width inttime seconds Returns: (Obsdata): Obsdata object containing averaged data """ if (scan_avg) and (getattr(self.scans, "shape", None) is None or len(self.scans) == 0): print('No scan data, ignoring scan_avg!') scan_avg = False if inttime <= 0.0 and scan_avg is False: print('No averaging done!') return self.copy() if moving: vis_avg = ehdf.coh_moving_avg_vis(self, dt=inttime, return_type='rec') else: vis_avg = ehdf.coh_avg_vis(self, dt=inttime, return_type='rec', err_type='predicted', scan_avg=scan_avg) arglist, argdict = self.obsdata_args() arglist[DATPOS] = vis_avg out = Obsdata(*arglist, **argdict) return out
[docs] def avg_incoherent(self, inttime, scan_avg=False, debias=True, err_type='predicted'): """Incoherently average data along u,v tracks in chunks of length inttime (sec) Args: inttime (float): incoherent integration time in seconds scan_avg (bool): if True, average over scans in self.scans instead of inttime debias (bool): if True, debias the averaged amplitudes err_type (str): 'predicted' or 'measured' Returns: (Obsdata): Obsdata object containing averaged data """ print('Incoherently averaging data, putting phases to zero!') amp_rec = ehdf.incoh_avg_vis(self, dt=inttime, debias=debias, scan_avg=scan_avg, return_type='rec', rec_type='vis', err_type=err_type) arglist, argdict = self.obsdata_args() arglist[DATPOS] = amp_rec out = Obsdata(*arglist, **argdict) return out
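    # Sketch of the two averaging modes above, assuming 'obs' is an existing Obsdata
    # instance:
    #
    #   obs_coh = obs.avg_coherent(60.)                    # 60 s coherent average
    #   obs_inc = obs.avg_incoherent(600., debias=True)    # 10 min incoherent average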
[docs] def add_amp(self, avg_time=0, scan_avg=False, debias=True, err_type='predicted', return_type='rec', round_s=0.1, snrcut=0.): """Adds attribute self.amp: an amplitude table with incoherently averaged amplitudes Args: avg_time (float): incoherent integration time in seconds scan_avg (bool): if True, average over scans in self.scans instead of avg_time debias (bool): if True then apply debiasing err_type (str): 'predicted' or 'measured' return_type: data frame ('df') or recarray ('rec') round_s (float): accuracy of datetime object in seconds snrcut (float): flag amplitudes with snr lower than this """ # Get the spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) * 3600. else: tint0 = 0.0 if avg_time <= tint0: adf = ehdf.make_amp(self, debias=debias, round_s=round_s) if return_type == 'rec': adf = ehdf.df_to_rec(adf, 'amp') print("Updated self.amp: no averaging") else: adf = ehdf.incoh_avg_vis(self, dt=avg_time, debias=debias, scan_avg=scan_avg, return_type=return_type, rec_type='amp', err_type=err_type) # snr cut adf = adf[adf['amp'] / adf['sigma'] > snrcut] self.amp = adf print("Updated self.amp: avg_time %f s\n" % avg_time) return
[docs] def add_bispec(self, avg_time=0, return_type='rec', count='max', snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1, uv_min=False): """Adds attribute self.bispec: bispectra table with bispectra averaged for dt Args: avg_time (float): bispectrum averaging timescale return_type: data frame ('df') or recarray ('rec') count (str): If 'min', return minimal set of bispectra, if 'max' return all bispectra up to reordering err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag bispectra with snr lower than this """ # Get spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) * 3600. else: tint0 = 0 if avg_time > tint0: cdf = ehdf.make_bsp_df(self, mode='all', round_s=round_s, count=count, snrcut=0., uv_min=uv_min) cdf = ehdf.average_bispectra(cdf, avg_time, return_type=return_type, num_samples=num_samples, snrcut=snrcut) else: cdf = ehdf.make_bsp_df(self, mode='all', round_s=round_s, count=count, snrcut=snrcut, uv_min=uv_min) print("Updated self.bispec: no averaging") if return_type == 'rec': cdf = ehdf.df_to_rec(cdf, 'bispec') self.bispec = cdf print("Updated self.bispec: avg_time %f s\n" % avg_time) return
[docs] def add_cphase(self, avg_time=0, return_type='rec', count='max', snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1, uv_min=False): """Adds attribute self.cphase: cphase table averaged for dt Args: avg_time (float): closure phase averaging timescale return_type: data frame ('df') or recarray ('rec') count (str): If 'min', return minimal set of phases, if 'max' return all closure phases up to reordering err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag closure phases with snr lower than this """ # Get spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) * 3600. else: tint0 = 0 if avg_time > tint0: cdf = ehdf.make_cphase_df(self, mode='all', round_s=round_s, count=count, snrcut=0., uv_min=uv_min) cdf = ehdf.average_cphases(cdf, avg_time, return_type=return_type, err_type=err_type, num_samples=num_samples, snrcut=snrcut) else: cdf = ehdf.make_cphase_df(self, mode='all', round_s=round_s, count=count, snrcut=snrcut, uv_min=uv_min) if return_type == 'rec': cdf = ehdf.df_to_rec(cdf, 'cphase') print("Updated self.cphase: no averaging") self.cphase = cdf print("updated self.cphase: avg_time %f s\n" % avg_time) return
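    # Sketch of attaching averaged bispectrum and closure-phase tables, assuming
    # 'obs' is an existing Obsdata instance:
    #
    #   obs.add_bispec(avg_time=300.)
    #   obs.add_cphase(avg_time=300., count='min', snrcut=3.)
    #   print(obs.cphase['cphase'][:5])        # closure phases (degrees) after the cut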
[docs] def add_cphase_diag(self, avg_time=0, return_type='rec', vtype='vis', count='min', snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1, uv_min=False): """Adds attribute self.cphase_diag: cphase_diag table averaged for dt Args: avg_time (float): closure phase averaging timescale return_type: data frame ('df') or recarray ('rec') vtype (str): Visibility type (e.g., 'vis', 'llvis', 'rrvis', etc.) count (str): If 'min', return minimal set of phases, If 'max' return all closure phases up to reordering err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag closure phases with snr lower than this """ # Get spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) tint0 *= 3600 else: tint0 = 0 # Dom TODO: implement averaging during diagonal closure phase creation if avg_time > tint0: print("Averaging while creating diagonal closure phases is not yet implemented!") print("Proceeding for now without averaging.") cdf = ehdf.make_cphase_diag_df(self, vtype=vtype, round_s=round_s, count=count, snrcut=snrcut, uv_min=uv_min) else: cdf = ehdf.make_cphase_diag_df(self, vtype=vtype, round_s=round_s, count=count, snrcut=snrcut, uv_min=uv_min) if return_type == 'rec': cdf = ehdf.df_to_rec(cdf, 'cphase_diag') print("Updated self.cphase_diag: no averaging") self.cphase_diag = cdf print("updated self.cphase_diag: avg_time %f s\n" % avg_time) return
[docs] def add_camp(self, avg_time=0, return_type='rec', ctype='camp', count='max', debias=True, snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1): """Adds attribute self.camp or self.logcamp: closure amplitudes table Args: avg_time (float): closure amplitude averaging timescale return_type: data frame ('df') or recarray ('rec') ctype (str): The closure amplitude type ('camp' or 'logcamp') debias (bool): If True, debias the closure amplitude count (str): If 'min', return minimal set of amplitudes, if 'max' return all closure amplitudes up to inverses err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag closure amplitudes with snr lower than this """ # Get spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) tint0 *= 3600 else: tint0 = 0 if avg_time > tint0: foo = self.avg_incoherent(avg_time, debias=debias, err_type=err_type) else: foo = self cdf = ehdf.make_camp_df(foo, ctype=ctype, debias=False, count=count, round_s=round_s, snrcut=snrcut) if ctype == 'logcamp': print("updated self.lcamp: no averaging") elif ctype == 'camp': print("updated self.camp: no averaging") if return_type == 'rec': cdf = ehdf.df_to_rec(cdf, 'camp') if ctype == 'logcamp': self.logcamp = cdf print("updated self.logcamp: avg_time %f s\n" % avg_time) elif ctype == 'camp': self.camp = cdf print("updated self.camp: avg_time %f s\n" % avg_time) return
[docs] def add_logcamp(self, avg_time=0, return_type='rec', ctype='camp', count='max', debias=True, snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1): """Adds attribute self.logcamp: closure amplitudes table Args: avg_time (float): closure amplitude averaging timescale return_type: data frame ('df') or recarray ('rec') ctype (str): The closure amplitude type ('camp' or 'logcamp') debias (bool): If True, debias the closure amplitude count (str): If 'min', return minimal set of amplitudes, if 'max' return all closure amplitudes up to inverses err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag closure amplitudes with snr lower than this """ self.add_camp(return_type=return_type, ctype='logcamp', count=count, debias=debias, snrcut=snrcut, avg_time=avg_time, err_type=err_type, num_samples=num_samples, round_s=round_s) return
[docs] def add_logcamp_diag(self, avg_time=0, return_type='rec', count='min', snrcut=0., debias=True, err_type='predicted', num_samples=1000, round_s=0.1): """Adds attribute self.logcamp_diag: logcamp_diag table averaged for dt Args: avg_time (float): diagonal log closure amplitude averaging timescale return_type: data frame ('df') or recarray ('rec') debias (bool): If True, debias the diagonal log closure amplitude count (str): If 'min', return minimal set of amplitudes, If 'max' return all diagonal log closure amplitudes up to inverses err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag diagonal log closure amplitudes with snr lower than this """ # Get spacing between datapoints in seconds if len(set([x[0] for x in list(self.unpack('time'))])) > 1: tint0 = np.min(np.diff(np.asarray(sorted(list(set( [x[0] for x in list(self.unpack('time'))])))))) tint0 *= 3600 else: tint0 = 0 if avg_time > tint0: foo = self.avg_incoherent(avg_time, debias=debias, err_type=err_type) cdf = ehdf.make_logcamp_diag_df(foo, debias=False, count=count, round_s=round_s, snrcut=snrcut) else: foo = self cdf = ehdf.make_logcamp_diag_df(foo, debias=debias, count=count, round_s=round_s, snrcut=snrcut) if return_type == 'rec': cdf = ehdf.df_to_rec(cdf, 'logcamp_diag') self.logcamp_diag = cdf print("updated self.logcamp_diag: avg_time %f s\n" % avg_time) return
[docs] def add_all(self, avg_time=0, return_type='rec', count='max', debias=True, snrcut=0., err_type='predicted', num_samples=1000, round_s=0.1): """Adds tables of all all averaged derived quantities self.amp,self.bispec,self.cphase,self.camp,self.logcamp Args: avg_time (float): closure amplitude averaging timescale return_type: data frame ('df') or recarray ('rec') debias (bool): If True, debias the closure amplitude count (str): If 'min', return minimal set of closure quantities, if 'max' return all closure quantities err_type (str): 'predicted' or 'measured' num_samples: number of bootstrap (re)samples if measuring error round_s (float): accuracy of datetime object in seconds snrcut (float): flag closure amplitudes with snr lower than this """ self.add_amp(return_type=return_type, avg_time=avg_time, debias=debias, err_type=err_type) self.add_bispec(return_type=return_type, count=count, avg_time=avg_time, snrcut=snrcut, err_type=err_type, num_samples=num_samples, round_s=round_s) self.add_cphase(return_type=return_type, count=count, avg_time=avg_time, snrcut=snrcut, err_type=err_type, num_samples=num_samples, round_s=round_s) self.add_cphase_diag(return_type=return_type, count='min', avg_time=avg_time, snrcut=snrcut, err_type=err_type, num_samples=num_samples, round_s=round_s) self.add_camp(return_type=return_type, ctype='camp', count=count, debias=debias, snrcut=snrcut, avg_time=avg_time, err_type=err_type, num_samples=num_samples, round_s=round_s) self.add_camp(return_type=return_type, ctype='logcamp', count=count, debias=debias, snrcut=snrcut, avg_time=avg_time, err_type=err_type, num_samples=num_samples, round_s=round_s) self.add_logcamp_diag(return_type=return_type, count='min', debias=debias, avg_time=avg_time, snrcut=snrcut, err_type=err_type, num_samples=num_samples, round_s=round_s) return
[docs] def add_scans(self, info='self', filepath='', dt=0.0165, margin=0.0001,split_subarray=False): """Compute scans and add self.scans to Obsdata object. Args: info (str): 'self' to infer from data, 'txt' for text file, 'vex' for vex schedule file filepath (str): path to txt/vex file with scans info dt (float): minimal time interval between scans in hours margin (float): padding scans by that time margin in hours """ # infer scans directly from data if info == 'self': times_uni = np.asarray(sorted(list(set(self.data['time'])))) scans = np.zeros_like(times_uni) scan_id = 0 for cou in range(len(times_uni) - 1): scans[cou] = scan_id if (times_uni[cou + 1] - times_uni[cou] > dt): scan_id += 1 scans[-1] = scan_id scanlist = np.asarray([np.asarray([ np.min(times_uni[scans == cou]) - margin, np.max(times_uni[scans == cou]) + margin]) for cou in range(int(scans[-1]) + 1)]) # read in scans from a text file elif info == 'txt': scanlist = np.loadtxt(filepath) # read in scans from a vex file elif info == 'vex': vex0 = ehtim.vex.Vex(filepath) t_min = [vex0.sched[x]['start_hr'] for x in range(len(vex0.sched))] duration = [] for x in range(len(vex0.sched)): duration_foo = max([vex0.sched[x]['scan'][y]['scan_sec'] for y in range(len(vex0.sched[x]['scan']))]) duration.append(duration_foo) t_max = [tmin + dur / 3600. for (tmin, dur) in zip(t_min, duration)] scanlist = np.array([[tmin, tmax] for (tmin, tmax) in zip(t_min, t_max)]) else: print("Parameter 'info' can only assume values 'self', 'txt' or 'vex'! ") scanlist = None #Split scan in 2 wherever the array changed if split_subarray: timelist=self.tlist() prev_array=[] for cou in range(len(timelist)): current_array=np.unique([timelist[cou]['t1'],timelist[cou]['t2']]) if set(current_array)==set(prev_array): continue split_time=timelist[cou]['time'][0] prev_array=current_array if split_time-margin in scanlist[:,0]: continue index=np.where(split_time>=scanlist.T[0])[0][-1] scanlist=np.insert(scanlist,index,scanlist[index],axis=0) scanlist[index,1]=split_time-margin scanlist[index+1,0]=split_time+margin self.scans = scanlist return
[docs] def cleanbeam(self, npix, fov, pulse=ehc.PULSE_DEFAULT): """Make an image of the observation clean beam. Args: npix (int): The pixel size of the square output image. fov (float): The field of view of the square output image in radians. pulse (function): The function convolved with the pixel values for continuous image. Returns: (Image): an Image object with the clean beam. """ im = ehtim.image.make_square(self, npix, fov, pulse=pulse) beamparams = self.fit_beam() im = im.add_gauss(1.0, beamparams) return im
[docs] def fit_beam(self, weighting='uniform', units='rad'): """Fit a Gaussian to the dirty beam and return the parameters (fwhm_maj, fwhm_min, theta). Args: weighting (str): 'uniform' or 'natural'. units (string): 'rad' returns values in radians, 'natural' returns FWHMs in uas and theta in degrees Returns: (tuple): a tuple (fwhm_maj, fwhm_min, theta) of the dirty beam parameters in radians. """ # Define the fit function that compares the quadratic expansion of the dirty image # with the quadratic expansion of an elliptical gaussian def fit_chisq(beamparams, db_coeff): (fwhm_maj2, fwhm_min2, theta) = beamparams a = 4 * np.log(2) * (np.cos(theta)**2 / fwhm_min2 + np.sin(theta)**2 / fwhm_maj2) b = 4 * np.log(2) * (np.cos(theta)**2 / fwhm_maj2 + np.sin(theta)**2 / fwhm_min2) c = 8 * np.log(2) * np.cos(theta) * np.sin(theta) * (1.0 / fwhm_maj2 - 1.0 / fwhm_min2) gauss_coeff = np.array((a, b, c)) chisq = np.sum((np.array(db_coeff) - gauss_coeff)**2) return chisq # These are the coefficients (a,b,c) of a quadratic expansion of the dirty beam # For a point (x,y) in the image plane, the dirty beam expansion is 1-ax^2-by^2-cxy u = self.unpack('u')['u'] v = self.unpack('v')['v'] sigma = self.unpack('sigma')['sigma'] weights = np.ones(u.shape) if weighting == 'natural': weights = 1. / sigma**2 abc = np.array([np.sum(weights * u**2), np.sum(weights * v**2), 2 * np.sum(weights * u * v)]) abc *= (2. * np.pi**2 / np.sum(weights)) abc *= 1e-20 # Decrease size of coefficients # Fit the beam guess = [(50)**2, (50)**2, 0.0] params = opt.minimize(fit_chisq, guess, args=(abc,), method='Powell') # Return parameters, adjusting fwhm_maj and fwhm_min if necessary if params.x[0] > params.x[1]: fwhm_maj = 1e-10 * np.sqrt(params.x[0]) fwhm_min = 1e-10 * np.sqrt(params.x[1]) theta = np.mod(params.x[2], np.pi) else: fwhm_maj = 1e-10 * np.sqrt(params.x[1]) fwhm_min = 1e-10 * np.sqrt(params.x[0]) theta = np.mod(params.x[2] + np.pi / 2.0, np.pi) gparams = np.array((fwhm_maj, fwhm_min, theta)) if units == 'natural': gparams[0] /= ehc.RADPERUAS gparams[1] /= ehc.RADPERUAS gparams[2] *= 180. / np.pi return gparams
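    # Sketch of fitting and printing the clean-beam parameters, assuming 'obs' is an
    # existing Obsdata instance:
    #
    #   fwhm_maj, fwhm_min, theta = obs.fit_beam(weighting='natural', units='natural')
    #   print('beam: %.1f x %.1f uas at %.1f deg' % (fwhm_maj, fwhm_min, theta))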
[docs] def dirtybeam(self, npix, fov, pulse=ehc.PULSE_DEFAULT, weighting='uniform'): """Make an image of the observation dirty beam. Args: npix (int): The pixel size of the square output image. fov (float): The field of view of the square output image in radians. pulse (function): The function convolved with the pixel values for continuous image. weighting (str): 'uniform' or 'natural' Returns: (Image): an Image object with the dirty beam. """ pdim = fov / npix sigma = self.unpack('sigma')['sigma'] u = self.unpack('u')['u'] v = self.unpack('v')['v'] if weighting == 'natural': weights = 1. / sigma**2 else: weights = np.ones(u.shape) xlist = np.arange(0, -npix, -1) * pdim + (pdim * npix) / 2.0 - pdim / 2.0 # TODO -- use NFFT # TODO -- different beam weightings im = np.array([[np.mean(weights * np.cos(-2 * np.pi * (i * u + j * v))) for i in xlist] for j in xlist]) im = im[0:npix, 0:npix] im = im / np.sum(im) # Normalize to a total beam power of 1 src = self.source + "_DB" outim = ehtim.image.Image(im, pdim, self.ra, self.dec, rf=self.rf, source=src, mjd=self.mjd, pulse=pulse) return outim
[docs] def dirtyimage(self, npix, fov, pulse=ehc.PULSE_DEFAULT, weighting='uniform'): """Make the observation dirty image (direct Fourier transform). Args: npix (int): The pixel size of the square output image. fov (float): The field of view of the square output image in radians. pulse (function): The function convolved with the pixel values for continuous image. weighting (str): 'uniform' or 'natural' Returns: (Image): an Image object with dirty image. """ pdim = fov / npix u = self.unpack('u')['u'] v = self.unpack('v')['v'] sigma = self.unpack('sigma')['sigma'] xlist = np.arange(0, -npix, -1) * pdim + (pdim * npix) / 2.0 - pdim / 2.0 if weighting == 'natural': weights = 1. / sigma**2 else: weights = np.ones(u.shape) dim = np.array([[np.mean(weights * np.cos(-2 * np.pi * (i * u + j * v))) for i in xlist] for j in xlist]) normfac = 1. / np.sum(dim) for label in ['vis1', 'vis2', 'vis3', 'vis4']: visname = self.poldict[label] vis = self.unpack(visname)[visname] # TODO -- use NFFT # TODO -- different beam weightings im = np.array([[np.mean(weights * (np.real(vis) * np.cos(-2 * np.pi * (i * u + j * v)) - np.imag(vis) * np.sin(-2 * np.pi * (i * u + j * v)))) for i in xlist] for j in xlist]) # Final normalization im = im * normfac im = im[0:npix, 0:npix] if label == 'vis1': out = ehtim.image.Image(im, pdim, self.ra, self.dec, polrep=self.polrep, rf=self.rf, source=self.source, mjd=self.mjd, pulse=pulse) else: pol = {ehc.vis_poldict[key]: key for key in ehc.vis_poldict.keys()}[visname] out.add_pol_image(im, pol) return out
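A sketch of forming the direct-Fourier-transform products, with obs and eh as in the earlier sketches; the image parameters are illustrative, and Image.display() is the standard ehtim display call.

fov = 200 * eh.RADPERUAS

dbeam = obs.dirtybeam(128, fov, weighting='natural')   # point-source response
dim = obs.dirtyimage(128, fov, weighting='natural')    # DFT image of the data
dim.display()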
[docs]    def rescale_zbl(self, totflux, uv_max, debias=True):
        """Rescale the short baselines to a new level of total flux.

           Args:
               totflux (float): new total flux to rescale to
               uv_max (float): maximum baseline length to rescale
               debias (bool): debias amplitudes before computing the original total flux
                              from the short baselines

           Returns:
               (Obsdata): an Obsdata object with the short baselines rescaled.
        """

        # estimate the original total flux from the short baselines
        obs_zerobl = self.flag_uvdist(uv_max=uv_max)
        obs_zerobl.add_amp(debias=debias)

        orig_totflux = np.sum(obs_zerobl.amp['amp'] * (1 / obs_zerobl.amp['sigma']**2))
        orig_totflux /= np.sum(1 / obs_zerobl.amp['sigma']**2)

        print('Rescaling zero baseline by ' + str(orig_totflux - totflux) + ' Jy' +
              ' to ' + str(totflux) + ' Jy')

        # Rescale short baselines to excise contributions from extended flux
        # Note: this does not do the proper thing for fractional polarization
        obs = self.copy()
        for j in range(len(obs.data)):
            if (obs.data['u'][j]**2 + obs.data['v'][j]**2)**0.5 < uv_max:
                obs.data['vis'][j] *= totflux / orig_totflux
                obs.data['qvis'][j] *= totflux / orig_totflux
                obs.data['uvis'][j] *= totflux / orig_totflux
                obs.data['vvis'][j] *= totflux / orig_totflux
                obs.data['sigma'][j] *= totflux / orig_totflux
                obs.data['qsigma'][j] *= totflux / orig_totflux
                obs.data['usigma'][j] *= totflux / orig_totflux
                obs.data['vsigma'][j] *= totflux / orig_totflux

        return obs
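For example, rescaling intra-site (short) baselines to a chosen compact flux looks like the following sketch; the flux and uv cutoff are illustrative numbers, not recommendations, and obs is an Obsdata instance as above.

# rescale baselines shorter than 0.1 Glambda so their mean amplitude is 0.6 Jy
obs_resc = obs.rescale_zbl(totflux=0.6, uv_max=0.1e9)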
[docs] def add_leakage_noise(self, Dterm_amp=0.1, min_noise=0.01, debias=False): """Add estimated systematic noise from leakage at quadrature to thermal noise. Requires cross-hand visibilities. !! this operation is not currently tracked and should be applied with extreme caution!! Args: Dterm_amp (float): Estimated magnitude of leakage terms min_noise (float): Minimum fractional systematic noise to add debias (bool): Debias amplitudes before computing fractional noise Returns: (Obsdata): An Obsdata object with the inflated noise values. """ # Extract visibility amplitudes # Switch to Stokes for graceful handling of circular basis products missing RR or LL amp = self.switch_polrep('stokes').unpack('amp', debias=debias)['amp'] rlamp = np.nan_to_num(self.switch_polrep('circ').unpack('rlamp', debias=debias)['rlamp']) lramp = np.nan_to_num(self.switch_polrep('circ').unpack('lramp', debias=debias)['lramp']) frac_noise = (Dterm_amp * rlamp / amp)**2 + (Dterm_amp * lramp / amp)**2 frac_noise = frac_noise * (frac_noise > min_noise) + min_noise * (frac_noise < min_noise) out = self.copy() for sigma in ['sigma1', 'sigma2', 'sigma3', 'sigma4']: try: field = self.poldict[sigma] out.data[field] = (self.data[field]**2 + np.abs(frac_noise * amp)**2)**0.5 except KeyError: continue return out
[docs] def add_fractional_noise(self, frac_noise, debias=False): """Add a constant fraction of amplitude at quadrature to thermal noise. Effectively imposes a maximal signal-to-noise ratio. !! this operation is not currently tracked and should be applied with extreme caution!! Args: frac_noise (float): The fraction of noise to add. debias (bool): Whether or not to add frac_noise of debiased amplitudes. Returns: (Obsdata): An Obsdata object with the inflated noise values. """ # Extract visibility amplitudes # Switch to Stokes for graceful handling of circular basis products missing RR or LL amp = self.switch_polrep('stokes').unpack('amp', debias=debias)['amp'] out = self.copy() for sigma in ['sigma1', 'sigma2', 'sigma3', 'sigma4']: try: field = self.poldict[sigma] out.data[field] = (self.data[field]**2 + np.abs(frac_noise * amp)**2)**0.5 except KeyError: continue return out
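A sketch of building a simple systematic error budget with these two methods, given an Obsdata instance obs as in the earlier sketches; the 10% leakage amplitude and 1% floor are illustrative, and add_leakage_noise() requires cross-hand visibilities in the data.

# add estimated leakage noise, then a flat 1% fractional floor, both in quadrature
obs_sys = obs.add_leakage_noise(Dterm_amp=0.1, min_noise=0.01)
obs_sys = obs_sys.add_fractional_noise(0.01)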
[docs]    def find_amt_fractional_noise(self, im, dtype='vis', target=1.0, debias=False,
                                  maxiter=200, ftol=1e-20, gtol=1e-20):
        """Return the amount of fractional systematic error necessary to make the
           observation have a chi-squared against the image im close to the target value.

           Args:
               im (Image): the image against which the chi-squared is computed
               dtype (str): data type used for the chi-squared (e.g. 'vis')
               target (float): the chi-squared value to aim for
               debias (bool): whether to debias amplitudes when adding fractional noise
               maxiter (int): maximum number of minimizer iterations
               ftol (float): minimizer function tolerance
               gtol (float): minimizer gradient tolerance

           Returns:
               (numpy.ndarray): the best-fit fractional systematic noise level
        """

        obs = self.copy()

        def objfunc(frac_noise):
            obs_tmp = obs.add_fractional_noise(frac_noise, debias=debias)
            chisq = obs_tmp.chisq(im, dtype=dtype)
            return np.abs(target - chisq)

        optdict = {'maxiter': maxiter, 'ftol': ftol, 'gtol': gtol}
        res = opt.minimize(objfunc, 0.0, method='L-BFGS-B', options=optdict)

        return res.x
[docs]    def rescale_noise(self, noise_rescale_factor=1.0):
        """Rescale the thermal noise on all Stokes parameters by a constant factor.

           This is useful for AIPS data, which has a missing factor relating
           'weights' to noise.

           Args:
               noise_rescale_factor (float): the number to multiply the existing sigmas by

           Returns:
               (Obsdata): an Obsdata object with the rescaled noise values.
        """

        datatable = self.data.copy()

        # the last four fields of each data record are the four sigmas
        for d in datatable:
            d[-4] = d[-4] * noise_rescale_factor
            d[-3] = d[-3] * noise_rescale_factor
            d[-2] = d[-2] * noise_rescale_factor
            d[-1] = d[-1] * noise_rescale_factor

        arglist, argdict = self.obsdata_args()
        arglist[DATPOS] = np.array(datatable)
        out = Obsdata(*arglist, **argdict)

        return out
[docs] def estimate_noise_rescale_factor(self, max_diff_sec=0.0, min_num=10, median_snr_cut=0, count='max', vtype='vis', print_std=False): """Estimate a constant noise rescaling factor on all baselines, times, and polarizations. Uses pairwise differences of closure phases relative to the expected scatter. This is useful for AIPS data, which has a missing factor relating 'weights' to noise. Args: max_diff_sec (float): The maximum difference of adjacent closure phases (in seconds) If 0, auto-estimates to twice the median scan length. min_num (int): The minimum number of closure phase differences for a triangle to be included in the set of estimators. median_snr_cut (float): Do not include a triangle if its median SNR is below this count (str): If 'min', use minimal set of phases, if 'max' use all closure phases up to reordering vtype (str): Visibility type (e.g., 'vis', 'llvis', 'rrvis', etc.) print_std (bool): Whether or not to print the std dev. for each closure triangle. Returns: (float): The rescaling factor. """ if max_diff_sec == 0.0: max_diff_sec = 5 * np.median(self.unpack('tint')['tint']) print("estimated max_diff_sec: ", max_diff_sec) # Now check the noise statistics on all closure phase triangles c_phases = self.c_phases(vtype=vtype, mode='time', count=count, ang_unit='') # First, just determine the set of closure phase triangles all_triangles = [] for scan in c_phases: for cphase in scan: all_triangles.append((cphase[1], cphase[2], cphase[3])) std_list = [] print("Estimating noise rescaling factor from %d triangles...\n" % len(set(all_triangles))) # Now determine the differences of adjacent samples on each triangle, # relative to the expected thermal noise i_count = 0 for tri in set(all_triangles): i_count = i_count + 1 if print_std: sys.stdout.write('\rGetting noise for triangles %i/%i ' % (i_count, len(set(all_triangles)))) sys.stdout.flush() all_tri = np.array([[]]) for scan in c_phases: for cphase in scan: if (cphase[1] == tri[0] and cphase[2] == tri[1] and cphase[3] == tri[2] and not np.isnan(cphase[-2]) and not np.isnan(cphase[-2])): all_tri = np.append(all_tri, ((cphase[0], cphase[-2], cphase[-1]))) all_tri = all_tri.reshape(int(len(all_tri) / 3), 3) # See whether the triangle has sufficient SNR if np.median(np.abs(all_tri[:, 1] / all_tri[:, 2])) < median_snr_cut: if print_std: print(tri, 'median snr too low (%6.4f)' % np.median(np.abs(all_tri[:, 1] / all_tri[:, 2]))) continue # Now go through and find studentized differences of adjacent points s_list = np.array([]) for j in range(len(all_tri) - 1): if (all_tri[j + 1, 0] - all_tri[j, 0]) * 3600.0 < max_diff_sec: diff = (all_tri[j + 1, 1] - all_tri[j, 1]) % (2.0 * np.pi) if diff > np.pi: diff -= 2.0 * np.pi s_list = np.append( s_list, diff / (all_tri[j, 2]**2 + all_tri[j + 1, 2]**2)**0.5) if len(s_list) > min_num: std_list.append(np.std(s_list)) if print_std: print(tri, '%6.4f [%d differences]' % (np.std(s_list), len(s_list))) else: if print_std and len(all_tri) > 0: print(tri, '%d cphases found [%d differences < min_num = %d]' % (len(all_tri), len(s_list), min_num)) if len(std_list) == 0: print("No suitable closure phase differences! Try using a larger max_diff_sec.") median = 1.0 else: median = np.median(std_list) return median
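These two methods are typically chained: estimate a global factor from the closure-phase scatter, then apply it to the sigmas. A sketch, with obs an Obsdata instance as above.

factor = obs.estimate_noise_rescale_factor(min_num=10, count='max')
obs_fixed = obs.rescale_noise(noise_rescale_factor=factor)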
[docs] def flag_elev(self, elev_min=0.0, elev_max=90, output='kept'): """Flag visibilities for which either station is outside a stated elevation range Args: elev_min (float): Minimum elevation (deg) elev_max (float): Maximum elevation (deg) output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ el_pairs = self.unpack(['el1', 'el2']) mask = (np.min((el_pairs['el1'], el_pairs['el2']), axis=0) > elev_min) mask *= (np.max((el_pairs['el1'], el_pairs['el2']), axis=0) < elev_max) datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('Flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_large_fractional_pol(self, max_fractional_pol=1.0, output='kept'): """Flag visibilities for which the fractional polarization is above a specified threshold Args: max_fractional_pol (float): Maximum fractional polarization output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ m = np.nan_to_num(self.unpack(['mamp'])['mamp']) mask = m < max_fractional_pol datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('Flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_uvdist(self, uv_min=0.0, uv_max=1e12, output='kept'): """Flag data points outside a given uv range Args: uv_min (float): remove points with uvdist less than this uv_max (float): remove points with uvdist greater than this output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ uvdist_list = self.unpack('uvdist')['uvdist'] mask = np.array([uv_min <= uvdist_list[j] <= uv_max for j in range(len(uvdist_list))]) datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('U-V flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_sites(self, sites, output='kept'): """Flag data points that include the specified sites Args: sites (list): list of sites to remove from the data output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ # This will remove all visibilities that include any of the specified sites t1_list = self.unpack('t1')['t1'] t2_list = self.unpack('t2')['t2'] mask = np.array([t1_list[j] not in sites and t2_list[j] not in sites for j in range(len(t1_list))]) datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('Flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_bl(self, sites, output='kept'): """Flag data points that include the specified baseline Args: sites (list): baseline to remove from the data output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ # This will remove all visibilities that include any of the specified baseline obs_out = self.copy() t1_list = obs_out.unpack('t1')['t1'] t2_list = obs_out.unpack('t2')['t2'] mask = np.array([not(t1_list[j] in sites and t2_list[j] in sites) for j in range(len(t1_list))]) datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('Flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_low_snr(self, snr_cut=3, output='kept'): """Flag low snr data points Args: snr_cut (float): remove points with snr lower than this output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ mask = self.unpack('snr')['snr'] > snr_cut datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('snr flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
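The flagging methods all return new Obsdata objects, so they chain naturally. A sketch of a typical editing sequence; the station code and cut values are hypothetical, and obs is an Obsdata instance as above.

obs_ed = obs.flag_sites(['XX'])             # drop one station entirely
obs_ed = obs_ed.flag_uvdist(uv_min=0.1e9)   # drop very short baselines
obs_ed = obs_ed.flag_low_snr(snr_cut=3)     # drop low-snr points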
[docs] def flag_high_sigma(self, sigma_cut=.005, sigma_type='sigma', output='kept'): """Flag high sigma (thermal noise on Stoke I) data points Args: sigma_cut (float): remove points with sigma higher than this sigma_type (str): sigma type (sigma, rrsigma, llsigma, etc.) output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ mask = self.unpack(sigma_type)[sigma_type] < sigma_cut datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('sigma flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flag_UT_range(self, UT_start_hour=0., UT_stop_hour=0., flag_type='all', flag_what='', output='kept'): """Flag data points within a certain UT range Args: UT_start_hour (float): start of time window UT_stop_hour (float): end of time window flag_type (str): 'all', 'baseline', or 'station' flag_what (str): baseline or station to flag output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ # This drops (or only keeps) points within a specified UT range UT_mask = self.unpack('time')['time'] <= UT_start_hour UT_mask = UT_mask + (self.unpack('time')['time'] >= UT_stop_hour) if flag_type != 'all': t1_list = self.unpack('t1')['t1'] t2_list = self.unpack('t2')['t2'] if flag_type == 'station': station = flag_what what_mask = np.array([not (t1_list[j] == station or t2_list[j] == station) for j in range(len(t1_list))]) elif flag_type == 'baseline': station1 = flag_what.split('-')[0] station2 = flag_what.split('-')[1] stations = [station1, station2] what_mask = np.array([not ((t1_list[j] in stations) and (t2_list[j] in stations)) for j in range(len(t1_list))]) else: what_mask = np.array([False for j in range(len(UT_mask))]) mask = UT_mask | what_mask datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('time flagged %d/%d visibilities' % (len(datatable_flagged), len(self.data))) obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def flags_from_file(self, flagfile, flag_type='station'): """Flagging data based on csv file Args: flagfile (str): path to csv file with mjds of flagging start / stop time, and optionally baseline / stations flag_type (str): 'all', 'baseline', or 'station' Returns: (Obsdata): a observation object with flagged data points removed """ df = pd.read_csv(flagfile) mjd_start = list(df['mjd_start']) mjd_stop = list(df['mjd_stop']) if flag_type == 'station': whatL = list(df['station']) elif flag_type == 'baseline': whatL = list(df['baseline']) elif flag_type == 'all': whatL = ['' for cou in range(len(mjd_start))] obs = self.copy() for cou in range(len(mjd_start)): what = whatL[cou] starth = (mjd_start[cou] % 1) * 24. stoph = (mjd_stop[cou] % 1) * 24. obs = obs.flag_UT_range(UT_start_hour=starth, UT_stop_hour=stoph, flag_type=flag_type, flag_what=what, output='kept') return obs
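The flag file is a plain csv whose columns match the fields read above: fractional-MJD start and stop times, plus a 'station' (or 'baseline') column depending on flag_type. A sketch with hypothetical contents and file name, obs as above.

# flags.csv (hypothetical):
#   mjd_start,mjd_stop,station
#   57850.25,57850.27,XX
#   57850.40,57850.45,YY
obs_flagged = obs.flags_from_file('flags.csv', flag_type='station')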
[docs] def flag_anomalous(self, field='snr', max_diff_seconds=100, robust_nsigma_cut=5, output='kept'): """Flag anomalous data points Args: field (str): The quantity to test for max_diff_seconds (float): The moving window size for testing outliers robust_nsigma_cut (float): Outliers further than this from the mean are removed output (str): returns 'kept', 'flagged', or 'both' (a dictionary) Returns: (Obsdata): a observation object with flagged data points removed """ stats = dict() for t1 in set(self.data['t1']): for t2 in set(self.data['t2']): vals = self.unpack_bl(t1, t2, field) if len(vals) > 0: # nans will all be dropped, which can be problematic for polarimetric values vals[field] = np.nan_to_num(vals[field]) for j in range(len(vals)): near_vals_mask = np.abs(vals['time'] - vals['time'] [j]) < max_diff_seconds / 3600.0 fields = vals[field][near_vals_mask] # Here, we use median absolute deviation as a robust proxy for standard # deviation dfields = np.median(np.abs(fields - np.median(fields))) # Avoid problems when the MAD is zero (e.g., a single sample) if dfields == 0.0: dfields = 1.0 stat = np.abs(vals[field][j] - np.median(fields)) / dfields stats[(vals['time'][j][0], tuple(sorted((t1, t2))))] = stat mask = np.array([stats[(rec[0], tuple(sorted((rec[2], rec[3]))))][0] < robust_nsigma_cut for rec in self.data]) datatable_kept = self.data.copy() datatable_flagged = self.data.copy() datatable_kept = datatable_kept[mask] datatable_flagged = datatable_flagged[np.invert(mask)] print('anomalous %s flagged %d/%d visibilities' % (field, len(datatable_flagged), len(self.data))) # Make new observations with all data first to avoid problems with empty arrays obs_kept = self.copy() obs_flagged = self.copy() obs_kept.data = datatable_kept obs_flagged.data = datatable_flagged if output == 'flagged': # return only the points flagged as anomalous return obs_flagged elif output == 'both': return {'kept': obs_kept, 'flagged': obs_flagged} else: return obs_kept
[docs] def filter_subscan_dropouts(self, perc=0, return_type='rec'): """Filtration to drop data and ensure that we only average parts with same timestamp. Potentially this could reduce risk of non-closing errors. Args: perc (float): drop baseline from scan if it has less than this fraction of median baseline observation time during the scan return_type (str): data frame ('df') or recarray ('rec') Returns: (Obsdata): a observation object with flagged data points removed """ if not isinstance(self.scans, np.ndarray): print('List of scans in ndarray format required! Add it with add_scans') else: # make df and add scan_id to data df = ehdf.make_df(self) tot_points = np.shape(df)[0] bins, labs = ehdf.get_bins_labels(self.scans) df['scan_id'] = list(pd.cut(df.time, bins, labels=labs)) # first flag baselines that are working for short part of scan df['count_samples'] = 1 hm1 = df.groupby(['scan_id', 'baseline', 'polarization']) hm1 = hm1.agg({'count_samples': np.sum}).reset_index() hm1['count_baselines_before'] = 1 hm2 = hm1.groupby(['scan_id', 'polarization']) hm2 = hm2.agg({'count_samples': lambda x: perc * np.median(x), 'count_baselines_before': np.sum}).reset_index() # dictionary with minimum acceptable number of samples per scan dict_elem_in_scan = dict(zip(hm2.scan_id, hm2.count_samples)) # list of acceptable scans and baselines hm1 = hm1[list(map(lambda x: x[1] >= dict_elem_in_scan[x[0]], list(zip(hm1.scan_id, hm1.count_samples))))] list_good_scans_baselines = list(zip(hm1.scan_id, hm1.baseline)) # filter out data df_filtered = df[list(map(lambda x: x in list_good_scans_baselines, list(zip(df.scan_id, df.baseline))))] # how many baselines present during scan? df_filtered['count_samples'] = 1 hm3 = df_filtered.groupby(['scan_id', 'baseline', 'polarization']) hm3 = hm3.agg({'count_samples': np.sum}).reset_index() hm3['count_baselines_after'] = 1 hm4 = hm3.groupby(['scan_id', 'polarization']) hm4 = hm4.agg({'count_baselines_after': np.sum}).reset_index() dict_how_many_baselines = dict(zip(hm4.scan_id, hm4.count_baselines_after)) # how many baselines present during each time? df_filtered['count_baselines_per_time'] = 1 hm5 = df_filtered.groupby(['datetime', 'scan_id', 'polarization']) hm5 = hm5.agg({'count_baselines_per_time': np.sum}).reset_index() dict_datetime_num_baselines = dict(zip(hm5.datetime, hm5.count_baselines_per_time)) # only keep times when all baselines available df_filtered2 = df_filtered[list(map(lambda x: dict_datetime_num_baselines[x[1]] == dict_how_many_baselines[x[0]], list( zip(df_filtered.scan_id, df_filtered.datetime))))] remaining_points = np.shape(df_filtered2)[0] print('Flagged out {} of {} datapoints'.format( tot_points - remaining_points, tot_points)) if return_type == 'rec': out_vis = ehdf.df_to_rec(df_filtered2, 'vis') arglist, argdict = self.obsdata_args() arglist[DATPOS] = out_vis out = Obsdata(*arglist, **argdict) return out
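Because this filter needs scan boundaries, it is normally preceded by add_scans(); the 0.3 threshold below is illustrative, and obs is an Obsdata instance as above.

obs.add_scans()                                    # infer scans from the data
obs_filt = obs.filter_subscan_dropouts(perc=0.3)   # drop baselines with < 30% of the
                                                   # median samples in a scan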
[docs] def reverse_taper(self, fwhm): """Reverse taper the observation with a circular Gaussian kernel Args: fwhm (float): real space fwhm size of convolution kernel in radian Returns: (Obsdata): a new reverse-tapered observation object """ datatable = self.data.copy() vis1 = datatable[self.poldict['vis1']] vis2 = datatable[self.poldict['vis2']] vis3 = datatable[self.poldict['vis3']] vis4 = datatable[self.poldict['vis4']] sigma1 = datatable[self.poldict['sigma1']] sigma2 = datatable[self.poldict['sigma2']] sigma3 = datatable[self.poldict['sigma3']] sigma4 = datatable[self.poldict['sigma4']] u = datatable['u'] v = datatable['v'] fwhm_sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) ker = np.exp(-2 * np.pi**2 * fwhm_sigma**2 * (u**2 + v**2)) datatable[self.poldict['vis1']] = vis1 / ker datatable[self.poldict['vis2']] = vis2 / ker datatable[self.poldict['vis3']] = vis3 / ker datatable[self.poldict['vis4']] = vis4 / ker datatable[self.poldict['sigma1']] = sigma1 / ker datatable[self.poldict['sigma2']] = sigma2 / ker datatable[self.poldict['sigma3']] = sigma3 / ker datatable[self.poldict['sigma4']] = sigma4 / ker arglist, argdict = self.obsdata_args() arglist[DATPOS] = datatable obstaper = Obsdata(*arglist, **argdict) return obstaper
[docs] def taper(self, fwhm): """Taper the observation with a circular Gaussian kernel Args: fwhm (float): real space fwhm size of convolution kernel in radian Returns: (Obsdata): a new tapered observation object """ datatable = self.data.copy() vis1 = datatable[self.poldict['vis1']] vis2 = datatable[self.poldict['vis2']] vis3 = datatable[self.poldict['vis3']] vis4 = datatable[self.poldict['vis4']] sigma1 = datatable[self.poldict['sigma1']] sigma2 = datatable[self.poldict['sigma2']] sigma3 = datatable[self.poldict['sigma3']] sigma4 = datatable[self.poldict['sigma4']] u = datatable['u'] v = datatable['v'] fwhm_sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) ker = np.exp(-2 * np.pi**2 * fwhm_sigma**2 * (u**2 + v**2)) datatable[self.poldict['vis1']] = vis1 * ker datatable[self.poldict['vis2']] = vis2 * ker datatable[self.poldict['vis3']] = vis3 * ker datatable[self.poldict['vis4']] = vis4 * ker datatable[self.poldict['sigma1']] = sigma1 * ker datatable[self.poldict['sigma2']] = sigma2 * ker datatable[self.poldict['sigma3']] = sigma3 * ker datatable[self.poldict['sigma4']] = sigma4 * ker arglist, argdict = self.obsdata_args() arglist[DATPOS] = datatable obstaper = Obsdata(*arglist, **argdict) return obstaper
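taper() multiplies the visibilities and sigmas by a Gaussian kernel and reverse_taper() divides by the same kernel, so the two are inverses for a given fwhm. A sketch with an illustrative 20 microarcsecond kernel, obs and eh as above.

fwhm = 20 * eh.RADPERUAS
obs_tap = obs.taper(fwhm)               # down-weight long baselines
obs_back = obs_tap.reverse_taper(fwhm)  # recover the original visibilities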
[docs]    def deblur(self):
        """Deblur the observation obs by dividing by the Sgr A* scattering kernel.

           Args:

           Returns:
               (Obsdata): a new deblurred observation object.
        """

        # make a copy of observation data
        datatable = self.data.copy()

        vis1 = datatable[self.poldict['vis1']]
        vis2 = datatable[self.poldict['vis2']]
        vis3 = datatable[self.poldict['vis3']]
        vis4 = datatable[self.poldict['vis4']]
        sigma1 = datatable[self.poldict['sigma1']]
        sigma2 = datatable[self.poldict['sigma2']]
        sigma3 = datatable[self.poldict['sigma3']]
        sigma4 = datatable[self.poldict['sigma4']]
        u = datatable['u']
        v = datatable['v']

        # divide visibilities and sigmas by the scattering kernel
        for i in range(len(vis1)):
            ker = obsh.sgra_kernel_uv(self.rf, u[i], v[i])
            vis1[i] = vis1[i] / ker
            vis2[i] = vis2[i] / ker
            vis3[i] = vis3[i] / ker
            vis4[i] = vis4[i] / ker
            sigma1[i] = sigma1[i] / ker
            sigma2[i] = sigma2[i] / ker
            sigma3[i] = sigma3[i] / ker
            sigma4[i] = sigma4[i] / ker

        datatable[self.poldict['vis1']] = vis1
        datatable[self.poldict['vis2']] = vis2
        datatable[self.poldict['vis3']] = vis3
        datatable[self.poldict['vis4']] = vis4
        datatable[self.poldict['sigma1']] = sigma1
        datatable[self.poldict['sigma2']] = sigma2
        datatable[self.poldict['sigma3']] = sigma3
        datatable[self.poldict['sigma4']] = sigma4

        arglist, argdict = self.obsdata_args()
        arglist[DATPOS] = datatable
        obsdeblur = Obsdata(*arglist, **argdict)

        return obsdeblur
[docs]    def reweight(self, uv_radius, weightdist=1.0):
        """Reweight the sigmas based on the local density of uv points.

           Args:
               uv_radius (float): radius in the uv-plane to look for nearby points
               weightdist (float): factor between 0 and 1 interpolating between the
                                   original sigmas (0.0) and the fully density-reweighted
                                   sigmas (1.0)

           Returns:
               (Obsdata): a new reweighted observation object.
        """

        obs_new = self.copy()
        npts = len(obs_new.data)

        uvpoints = np.vstack((obs_new.data['u'], obs_new.data['v'])).transpose()
        uvpoints_tree1 = spatial.cKDTree(uvpoints)
        uvpoints_tree2 = spatial.cKDTree(-uvpoints)

        # set each sigma proportional to the square root of the number of
        # uv points (including conjugates) within uv_radius
        for i in range(npts):
            matches1 = uvpoints_tree1.query_ball_point(uvpoints[i, :], uv_radius)
            matches2 = uvpoints_tree2.query_ball_point(uvpoints[i, :], uv_radius)
            nmatches = len(matches1) + len(matches2)
            for sigma in ['sigma', 'qsigma', 'usigma', 'vsigma']:
                obs_new.data[sigma][i] = np.sqrt(nmatches)

        # rescale to preserve the mean sigma, then mix with the original sigmas
        scale = np.mean(self.data['sigma']) / np.mean(obs_new.data['sigma'])
        for sigma in ['sigma', 'qsigma', 'usigma', 'vsigma']:
            obs_new.data[sigma] *= scale * weightdist

        if weightdist < 1.0:
            for i in range(npts):
                for sigma in ['sigma', 'qsigma', 'usigma', 'vsigma']:
                    obs_new.data[sigma][i] += (1 - weightdist) * self.data[sigma][i]

        return obs_new
[docs]    def fit_gauss(self, flux=1.0, fittype='amp',
                  paramguess=(100 * ehc.RADPERUAS, 100 * ehc.RADPERUAS, 0.)):
        """Fit a Gaussian to either Stokes I complex visibilities
           or Stokes I visibility amplitudes.

           Args:
               flux (float): total flux in the fitted Gaussian
               fittype (str): 'amp' to fit to visibility amplitudes
               paramguess (tuple): initial guess of fit Gaussian (fwhm_maj, fwhm_min, theta)

           Returns:
               (tuple): (fwhm_maj, fwhm_min, theta) of the fit Gaussian parameters in radians.
        """
        # TODO this fit doesn't work very well!!
        vis = self.data['vis']
        u = self.data['u']
        v = self.data['v']
        sig = self.data['sigma']

        # error function
        if fittype == 'amp':
            def errfunc(p):
                vismodel = obsh.gauss_uv(u, v, flux, p, x=0., y=0.)
                err = np.sum((np.abs(vis) - np.abs(vismodel))**2 / sig**2)
                return err
        else:
            def errfunc(p):
                vismodel = obsh.gauss_uv(u, v, flux, p, x=0., y=0.)
                err = np.sum(np.abs(vis - vismodel)**2 / sig**2)
                return err

        optdict = {'maxiter': 5000}  # minimizer params
        res = opt.minimize(errfunc, paramguess, method='Powell', options=optdict)
        gparams = res.x

        return gparams
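A sketch of a single-Gaussian fit to the Stokes I amplitudes, starting from a 100 microarcsecond circular guess; the values are illustrative, and obs and eh are as in the earlier sketches.

guess = (100 * eh.RADPERUAS, 100 * eh.RADPERUAS, 0.)
fwhm_maj, fwhm_min, theta = obs.fit_gauss(flux=1.0, fittype='amp', paramguess=guess)
print(fwhm_maj / eh.RADPERUAS, fwhm_min / eh.RADPERUAS, theta)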
[docs] def bispectra(self, vtype='vis', mode='all', count='min', timetype=False, uv_min=False, snrcut=0.): """Return a recarray of the equal time bispectra. Args: vtype (str): The visibilty type from which to assemble bispectra ('vis', 'qvis', 'uvis','vvis','rrvis','lrvis','rlvis','llvis') mode (str): If 'time', return phases in a list of equal time arrays, if 'all', return all phases in a single array count (str): If 'min', return minimal set of bispectra, if 'max' return all bispectra up to reordering timetype (str): 'GMST' or 'UTC' uv_min (float): flag baselines shorter than this before forming closure quantities snrcut (float): flag bispectra with snr lower than this Returns: (numpy.recarry): A recarray of the bispectra values with datatype DTBIS """ if timetype is False: timetype = self.timetype if mode not in ('time', 'all'): raise Exception("possible options for mode are 'time' and 'all'") if count not in ('max', 'min', 'min-cut0bl'): raise Exception("possible options for count are 'max', 'min', or 'min-cut0bl'") if vtype not in ('vis', 'qvis', 'uvis', 'vvis', 'rrvis', 'lrvis', 'rlvis', 'llvis'): raise Exception("possible options for vtype are" + " 'vis', 'qvis', 'uvis','vvis','rrvis','lrvis','rlvis','llvis'") if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") # Flag zero baselines obsdata = self.copy() if uv_min: obsdata = obsdata.flag_uvdist(uv_min=uv_min) # get which sites were flagged obsdata_flagged = self.copy() obsdata_flagged = obsdata_flagged.flag_uvdist(uv_max=uv_min) # Generate the time-sorted data with conjugate baselines tlist = obsdata.tlist(conj=True) out = [] bis = [] tt = 1 for tdata in tlist: # sys.stdout.write('\rGetting bispectra:: type %s, count %s, scan %i/%i ' % # (vtype, count, tt, len(tlist))) # sys.stdout.flush() tt += 1 time = tdata[0]['time'] if timetype in ['GMST', 'gmst'] and self.timetype == 'UTC': time = obsh.utc_to_gmst(time, self.mjd) if timetype in ['UTC', 'utc'] and self.timetype == 'GMST': time = obsh.gmst_to_utc(time, self.mjd) sites = list(set(np.hstack((tdata['t1'], tdata['t2'])))) # Create a dictionary of baselines at the current time incl. 
conjugates; l_dict = {} for dat in tdata: l_dict[(dat['t1'], dat['t2'])] = dat # Determine the triangles in the time step # Minimal Set if count == 'min': tris = obsh.tri_minimal_set(sites, self.tarr, self.tkey) # Maximal Set elif count == 'max': tris = np.sort(list(it.combinations(sites, 3))) elif count == 'min-cut0bl': tris = obsh.tri_minimal_set(sites, self.tarr, self.tkey) # if you cut the 0 baselines, add in triangles that now are not in the minimal set if uv_min: # get the reference site sites_ordered = [x for x in self.tarr['site'] if x in sites] ref = sites_ordered[0] # check if the reference site was in a zero baseline zerobls = np.vstack([obsdata_flagged.data['t1'], obsdata_flagged.data['t2']]) if np.sum(zerobls == ref): # determine which sites were cut out of the minimal set cutsites = np.unique(np.hstack([zerobls[1][zerobls[0] == ref], zerobls[0][zerobls[1] == ref]])) # we can only handle if there was 1 connecting site that was cut if len(cutsites) > 1: raise Exception("Cannot have the root node be in a clique" + "with more than 2 sites sharing 0 baselines'") # get the remaining sites cutsite = cutsites[0] sites_remaining = np.array(sites_ordered)[np.array(sites_ordered) != ref] sites_remaining = sites_remaining[np.array(sites_remaining) != cutsite] # get the next site in the list, ideally sorted by snr second_ref = sites_remaining[0] # add in additional triangles for s2 in range(1, len(sites_remaining)): tris.append((cutsite, second_ref, sites_remaining[s2])) # Generate bispectra for each triangle for tri in tris: # Select triangle entries in the data dictionary try: l1 = l_dict[(tri[0], tri[1])] l2 = l_dict[(tri[1], tri[2])] l3 = l_dict[(tri[2], tri[0])] except KeyError: continue (bi, bisig) = obsh.make_bispectrum(l1, l2, l3, vtype, polrep=self.polrep) # Cut out low snr points if np.abs(bi) / bisig < snrcut: continue # Append to the equal-time list bis.append(np.array((time, tri[0], tri[1], tri[2], l1['u'], l1['v'], l2['u'], l2['v'], l3['u'], l3['v'], bi, bisig), dtype=ehc.DTBIS)) # Append to outlist if mode == 'time' and len(bis) > 0: out.append(np.array(bis)) bis = [] if mode == 'all': out = np.array(bis) return out
[docs] def c_phases(self, vtype='vis', mode='all', count='min', ang_unit='deg', timetype=False, uv_min=False, snrcut=0.): """Return a recarray of the equal time closure phases. Args: vtype (str): The visibilty type from which to assemble closure phases ('vis','qvis','uvis','vvis','pvis') mode (str): If 'time', return phases in a list of equal time arrays, if 'all', return all phases in a single array count (str): If 'min', return minimal set of phases, if 'max' return all closure phases up to reordering ang_unit (str): If 'deg', return closure phases in degrees, else return in radians timetype (str): 'UTC' or 'GMST' uv_min (float): flag baselines shorter than this before forming closure quantities snrcut (float): flag bispectra with snr lower than this Returns: (numpy.recarry): A recarray of the closure phases with datatype DTCPHASE """ if timetype is False: timetype = self.timetype if mode not in ('time', 'all'): raise Exception("possible options for mode are 'time' and 'all'") if count not in ('max', 'min', 'min-cut0bl'): raise Exception("possible options for count are 'max', 'min', or 'min-cut0bl'") if vtype not in ('vis', 'qvis', 'uvis', 'vvis', 'rrvis', 'lrvis', 'rlvis', 'llvis'): raise Exception("possible options for vtype are" + " 'vis', 'qvis', 'uvis','vvis','rrvis','lrvis','rlvis','llvis'") if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") if ang_unit == 'deg': angle = ehc.DEGREE else: angle = 1.0 # Get the bispectra data bispecs = self.bispectra(vtype=vtype, mode='time', count=count, timetype=timetype, uv_min=uv_min, snrcut=snrcut) # Reformat into a closure phase list/array out = [] cps = [] cpnames = ('time', 't1', 't2', 't3', 'u1', 'v1', 'u2', 'v2', 'u3', 'v3', 'cphase', 'sigmacp') for bis in bispecs: for bi in bis: if len(bi) == 0: continue bi.dtype.names = cpnames bi['sigmacp'] = np.real(bi['sigmacp'] / np.abs(bi['cphase']) / angle) bi['cphase'] = np.real((np.angle(bi['cphase']) / angle)) cps.append(bi.astype(np.dtype(ehc.DTCPHASE))) if mode == 'time' and len(cps) > 0: out.append(np.array(cps)) cps = [] if mode == 'all': out = np.array(cps) return out
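A sketch of extracting the minimal set of closure phases scan by scan, with a modest snr cut applied when the underlying bispectra are formed; obs is an Obsdata instance as above.

cp = obs.c_phases(vtype='vis', mode='time', count='min', ang_unit='deg', snrcut=3.)
nscans = len(cp)
ncp = sum(len(scan) for scan in cp)
print('%d closure phases over %d scans' % (ncp, nscans))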
[docs] def c_phases_diag(self, vtype='vis', count='min', ang_unit='deg', timetype=False, uv_min=False, snrcut=0.): """Return a recarray of the equal time diagonalized closure phases. Args: vtype (str): The visibilty type ('vis','qvis','uvis','vvis','pvis') from which to assemble closure phases count (str): If 'min', return minimal set of phases, If 'min-cut0bl' return minimal set after flagging zero-baselines ang_unit (str): If 'deg', return closure phases in degrees, else return in radians timetype (str): 'UTC' or 'GMST' uv_min (float): flag baselines shorter than this before forming closure quantities snrcut (float): flag bispectra with snr lower than this Returns: (numpy.recarry): A recarray of diagonalized closure phases (datatype DTCPHASEDIAG), along with associated triangles and transformation matrices """ if timetype is False: timetype = self.timetype if count not in ('min', 'min-cut0bl'): raise Exception( "possible options for count are 'min' or 'min-cut0bl' for diagonal closure phases") if vtype not in ('vis', 'qvis', 'uvis', 'vvis', 'rrvis', 'lrvis', 'rlvis', 'llvis'): raise Exception( "possible options for vtype are 'vis', 'qvis', " + "'uvis','vvis','rrvis','lrvis','rlvis','llvis'") if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") if ang_unit == 'deg': angle = ehc.DEGREE else: angle = 1.0 # determine the appropriate sigmatype if vtype in ["vis", "qvis", "uvis", "vvis"]: if vtype == 'vis': sigmatype = 'sigma' if vtype == 'qvis': sigmatype = 'qsigma' if vtype == 'uvis': sigmatype = 'usigma' if vtype == 'vvis': sigmatype = 'vsigma' if vtype in ["rrvis", "llvis", "rlvis", "lrvis"]: if vtype == 'rrvis': sigmatype = 'rrsigma' if vtype == 'llvis': sigmatype = 'llsigma' if vtype == 'rlvis': sigmatype = 'rlsigma' if vtype == 'lrvis': sigmatype = 'lrsigma' # get the time-sorted visibility data including conjugate baselines viss = np.concatenate(self.tlist(conj=True)) # get the closure phase data cps = self.c_phases(vtype=vtype, mode='all', count=count, ang_unit=ang_unit, timetype=timetype, uv_min=uv_min, snrcut=snrcut) # get the unique timestamps for the closure phases T_cps = np.unique(cps['time']) # list of diagonalized closure phases and corresponding transformation matrices dcps = [] dcp_errs = [] tfmats = [] tris = [] us = [] vs = [] # loop over the timestamps for kk, t in enumerate(T_cps): sys.stdout.write('\rDiagonalizing closure phases:: type %s, count %s, scan %i/%i ' % (vtype, count, kk + 1, len(T_cps))) sys.stdout.flush() # index masks for this timestamp mask_cp = (cps['time'] == t) mask_vis = (viss['time'] == t) # closure phases for this timestamp cps_here = cps[mask_cp] # visibilities for this timestamp viss_here = viss[mask_vis] # initialize the design matrix design_mat = np.zeros((mask_cp.sum(), mask_vis.sum())) # loop over the closure phases within this timestamp trilist = [] ulist = [] vlist = [] for ic, cp in enumerate(cps_here): trilist.append((cp['t1'], cp['t2'], cp['t3'])) ulist.append((cp['u1'], cp['u2'], cp['u3'])) vlist.append((cp['v1'], cp['v2'], cp['v3'])) # matrix entry for first leg of triangle ind1 = ((viss_here['t1'] == cp['t1']) & (viss_here['t2'] == cp['t2'])) design_mat[ic, ind1] = 1.0 # matrix entry for second leg of triangle ind2 = ((viss_here['t1'] == cp['t2']) & (viss_here['t2'] == cp['t3'])) design_mat[ic, ind2] = 1.0 # matrix entry for third leg of triangle ind3 = ((viss_here['t1'] == cp['t3']) & (viss_here['t2'] == cp['t1'])) design_mat[ic, ind3] = 1.0 # construct the covariance matrix 
visphase_err = viss_here[sigmatype] / np.abs(viss_here[vtype]) sigma_mat = np.diag(visphase_err**2.0) covar_mat = np.matmul(np.matmul(design_mat, sigma_mat), np.transpose(design_mat)) # diagonalize via eigendecomposition eigeninfo = np.linalg.eigh(covar_mat) S_matrix = np.copy(eigeninfo[1]).transpose() dcphase = np.matmul(S_matrix, cps_here['cphase']) if ang_unit != 'deg': dcphase *= angle dcphase_err = np.sqrt(np.copy(eigeninfo[0])) / angle dcps.append(dcphase) dcp_errs.append(dcphase_err) tfmats.append(S_matrix) tris.append(trilist) us.append(ulist) vs.append(vlist) # Reformat into a list out = [] for kk, t in enumerate(T_cps): dcparr = [] for idcp, dcp in enumerate(dcps[kk]): dcparr.append((t, dcp, dcp_errs[kk][idcp])) dcparr = np.array(dcparr, dtype=[('time', 'f8'), ('cphase', 'f8'), ('sigmacp', 'f8')]) out.append((dcparr, np.array(tris[kk]).astype(np.dtype([('trianges', 'U2')])), np.array(us[kk]).astype(np.dtype([('u', 'f8')])), np.array(vs[kk]).astype(np.dtype([('v', 'f8')])), tfmats[kk].astype(np.dtype([('tform_matrix', 'f8')])))) print("\n") return out
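The diagonalized products come back as one tuple per timestamp, pairing the transformed closure phases with the triangles, uv coordinates, and transformation matrix used to build them. A short unpacking sketch, with obs as above.

dcp = obs.c_phases_diag(vtype='vis', count='min')
dcparr, triangles, us, vs, tform = dcp[0]      # first timestamp
print(dcparr['cphase'], dcparr['sigmacp'])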
[docs] def bispectra_tri(self, site1, site2, site3, vtype='vis', timetype=False, snrcut=0., method='from_maxset', bs=[], force_recompute=False): """Return complex bispectrum over time on a triangle (1-2-3). Args: site1 (str): station 1 name site2 (str): station 2 name site3 (str): station 3 name vtype (str): The visibilty type from which to assemble bispectra ('vis','qvis','uvis','vvis','pvis') timetype (str): 'UTC' or 'GMST' snrcut (float): flag bispectra with snr lower than this method (str): 'from_maxset' (old, default), 'from_vis' (new, more robust) bs (list): optionally pass in the precomputed, time-sorted bispectra force_recompute (bool): if True, recompute bispectra instead of using saved data Returns: (numpy.recarry): A recarray of the bispectra on this triangle with datatype DTBIS """ if timetype is False: timetype = self.timetype if method=='from_maxset' and (vtype in ['lrvis','pvis','rlvis']): print ("Warning! method='from_maxset' default in bispectra_tri() inconsistent with vtype=%s" % vtype) print ("Switching to method='from_vis'") method = 'from_vis' tri = (site1, site2, site3) outdata = [] # get selected bispectra from the maximal set # TODO: verify consistency/performance of from_vis, and delete this method if method=='from_maxset': if ((len(bs) == 0) and not (self.bispec is None) and not (len(self.bispec) == 0) and not force_recompute): bs = self.bispec elif (len(bs) == 0) or force_recompute: bs = self.bispectra(mode='all', count='max', vtype=vtype, timetype=timetype, snrcut=snrcut) # Get requested bispectra over time for obs in bs: obstri = (obs['t1'], obs['t2'], obs['t3']) if set(obstri) == set(tri): t1 = copy.deepcopy(obs['t1']) t2 = copy.deepcopy(obs['t2']) t3 = copy.deepcopy(obs['t3']) u1 = copy.deepcopy(obs['u1']) u2 = copy.deepcopy(obs['u2']) u3 = copy.deepcopy(obs['u3']) v1 = copy.deepcopy(obs['v1']) v2 = copy.deepcopy(obs['v2']) v3 = copy.deepcopy(obs['v3']) # Reorder baselines and flip the sign of the closure phase if necessary if t1 == site1: if t2 == site2: pass else: obs['t2'] = t3 obs['t3'] = t2 obs['u1'] = -u3 obs['v1'] = -v3 obs['u2'] = -u2 obs['v2'] = -v2 obs['u3'] = -u1 obs['v3'] = -v1 obs['bispec'] = np.conjugate(obs['bispec']) elif t1 == site2: if t2 == site3: obs['t1'] = t3 obs['t2'] = t1 obs['t3'] = t2 obs['u1'] = u3 obs['v1'] = v3 obs['u2'] = u1 obs['v2'] = v1 obs['u3'] = u2 obs['v3'] = v2 else: obs['t1'] = t2 obs['t2'] = t1 obs['u1'] = -u1 obs['v1'] = -v1 obs['u2'] = -u3 obs['v2'] = -v3 obs['u3'] = -u2 obs['v3'] = -v2 obs['bispec'] = np.conjugate(obs['bispec']) elif t1 == site3: if t2 == site1: obs['t1'] = t2 obs['t2'] = t3 obs['t3'] = t1 obs['u1'] = u2 obs['v1'] = v2 obs['u2'] = u3 obs['v2'] = v3 obs['u3'] = u1 obs['v3'] = v1 else: obs['t1'] = t3 obs['t3'] = t1 obs['u1'] = -u2 obs['v1'] = -v2 obs['u2'] = -u1 obs['v2'] = -v1 obs['u3'] = -u3 obs['v3'] = -v3 obs['bispec'] = np.conjugate(obs['bispec']) outdata.append(np.array(obs, dtype=ehc.DTBIS)) continue # get selected bispectra from the visibilities directly # taken from bispectra() method elif method=='from_vis': # get all equal-time data, and loop over to construct bispectra tlist = self.tlist(conj=True) for tdata in tlist: time = tdata[0]['time'] if timetype in ['GMST', 'gmst'] and self.timetype == 'UTC': time = obsh.utc_to_gmst(time, self.mjd) if timetype in ['UTC', 'utc'] and self.timetype == 'GMST': time = obsh.gmst_to_utc(time, self.mjd) # Create a dictionary of baselines at the current time incl. 
conjugates; l_dict = {} for dat in tdata: l_dict[(dat['t1'], dat['t2'])] = dat # Select triangle entries in the data dictionary try: l1 = l_dict[(tri[0], tri[1])] l2 = l_dict[(tri[1], tri[2])] l3 = l_dict[(tri[2], tri[0])] except KeyError: continue (bi, bisig) = obsh.make_bispectrum(l1, l2, l3, vtype, polrep=self.polrep) # Cut out low snr points if np.abs(bi) / bisig < snrcut: continue # Append to the equal-time list outdata.append(np.array((time, tri[0], tri[1], tri[2], l1['u'], l1['v'], l2['u'], l2['v'], l3['u'], l3['v'], bi, bisig), dtype=ehc.DTBIS)) else: raise Exception("keyword 'method' in bispectra_tri() must be either 'from_cphase' or 'from_vis'") outdata = np.array(outdata) return outdata
[docs] def cphase_tri(self, site1, site2, site3, vtype='vis', ang_unit='deg', timetype=False, snrcut=0., method='from_maxset', cphases=[], force_recompute=False): """Return closure phase over time on a triangle (1-2-3). Args: site1 (str): station 1 name site2 (str): station 2 name site3 (str): station 3 name vtype (str): The visibilty type from which to assemble closure phases (e.g., 'vis','qvis','uvis','vvis','pvis') ang_unit (str): If 'deg', return closure phases in degrees, else return in radians timetype (str): 'GMST' or 'UTC' snrcut (float): flag bispectra with snr lower than this method (str): 'from_maxset' (old, default), 'from_vis' (new, more robust) cphases (list): optionally pass in the precomputed time-sorted cphases force_recompute (bool): if True, do not use save closure phase tables Returns: (numpy.recarry): A recarray of the closure phases with datatype DTCPHASE """ if timetype is False: timetype = self.timetype if method=='from_maxset' and (vtype in ['lrvis','pvis','rlvis']): print ("Warning! method='from_maxset' default in cphase_tri() is inconsistent with vtype=%s" % vtype) print ("Switching to method='from_vis'") method = 'from_vis' tri = (site1, site2, site3) outdata = [] # get selected closure phases from the maximal set # TODO: verify consistency/performance of from_vis, and delete this method if method=='from_maxset': # Get closure phases (maximal set) if ((len(cphases) == 0) and not (self.cphase is None) and not (len(self.cphase) == 0) and not force_recompute): cphases = self.cphase elif (len(cphases) == 0) or force_recompute: cphases = self.c_phases(mode='all', count='max', vtype=vtype, ang_unit=ang_unit, timetype=timetype, snrcut=snrcut) # Get requested closure phases over time for obs in cphases: obstri = (obs['t1'], obs['t2'], obs['t3']) if set(obstri) == set(tri): t1 = copy.deepcopy(obs['t1']) t2 = copy.deepcopy(obs['t2']) t3 = copy.deepcopy(obs['t3']) u1 = copy.deepcopy(obs['u1']) u2 = copy.deepcopy(obs['u2']) u3 = copy.deepcopy(obs['u3']) v1 = copy.deepcopy(obs['v1']) v2 = copy.deepcopy(obs['v2']) v3 = copy.deepcopy(obs['v3']) # Reorder baselines and flip the sign of the closure phase if necessary if t1 == site1: if t2 == site2: pass else: obs['t2'] = t3 obs['t3'] = t2 obs['u1'] = -u3 obs['v1'] = -v3 obs['u2'] = -u2 obs['v2'] = -v2 obs['u3'] = -u1 obs['v3'] = -v1 obs['cphase'] *= -1 elif t1 == site2: if t2 == site3: obs['t1'] = t3 obs['t2'] = t1 obs['t3'] = t2 obs['u1'] = u3 obs['v1'] = v3 obs['u2'] = u1 obs['v2'] = v1 obs['u3'] = u2 obs['v3'] = v2 else: obs['t1'] = t2 obs['t2'] = t1 obs['u1'] = -u1 obs['v1'] = -v1 obs['u2'] = -u3 obs['v2'] = -v3 obs['u3'] = -u2 obs['v3'] = -v2 obs['cphase'] *= -1 elif t1 == site3: if t2 == site1: obs['t1'] = t2 obs['t2'] = t3 obs['t3'] = t1 obs['u1'] = u2 obs['v1'] = v2 obs['u2'] = u3 obs['v2'] = v3 obs['u3'] = u1 obs['v3'] = v1 else: obs['t1'] = t3 obs['t3'] = t1 obs['u1'] = -u2 obs['v1'] = -v2 obs['u2'] = -u1 obs['v2'] = -v1 obs['u3'] = -u3 obs['v3'] = -v3 obs['cphase'] *= -1 outdata.append(np.array(obs, dtype=ehc.DTCPHASE)) continue # get selected closure phases from the visibilities directly # taken from bispectra() method elif method=='from_vis': if ang_unit == 'deg': angle = ehc.DEGREE else: angle = 1.0 # get all equal-time data, and loop over to construct closure phase tlist = self.tlist(conj=True) for tdata in tlist: time = tdata[0]['time'] if timetype in ['GMST', 'gmst'] and self.timetype == 'UTC': time = obsh.utc_to_gmst(time, self.mjd) if timetype in ['UTC', 'utc'] and self.timetype == 'GMST': time = 
obsh.gmst_to_utc(time, self.mjd) # Create a dictionary of baselines at the current time incl. conjugates; l_dict = {} for dat in tdata: l_dict[(dat['t1'], dat['t2'])] = dat # Select triangle entries in the data dictionary try: l1 = l_dict[(tri[0], tri[1])] l2 = l_dict[(tri[1], tri[2])] l3 = l_dict[(tri[2], tri[0])] except KeyError: continue (bi, bisig) = obsh.make_bispectrum(l1, l2, l3, vtype, polrep=self.polrep) # Cut out low snr points if np.abs(bi) / bisig < snrcut: continue # Append to the equal-time list outdata.append(np.array((time, tri[0], tri[1], tri[2], l1['u'], l1['v'], l2['u'], l2['v'], l3['u'], l3['v'], np.real(np.angle(bi) / angle), np.real(bisig / np.abs(bi) / angle)), dtype=ehc.DTCPHASE)) else: raise Exception("keyword 'method' in cphase_tri() must be either 'from_cphase' or 'from_vis'") outdata = np.array(outdata) return outdata
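A sketch of pulling the closure phase time series on a single triangle and plotting it; the station codes are hypothetical, obs is an Obsdata instance as above, and plt is the matplotlib.pyplot import at the top of this module.

cp_tri = obs.cphase_tri('XX', 'YY', 'ZZ', vtype='vis', ang_unit='deg')
plt.errorbar(cp_tri['time'], cp_tri['cphase'], yerr=cp_tri['sigmacp'], fmt='.')
plt.xlabel('time (hr)')
plt.ylabel('closure phase (deg)')
plt.show()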
[docs] def c_amplitudes(self, vtype='vis', mode='all', count='min', ctype='camp', debias=True, timetype=False, snrcut=0.): """Return a recarray of the equal time closure amplitudes. Args: vtype (str): The visibilty type from which to assemble closure amplitudes ('vis','qvis','uvis','vvis','pvis') ctype (str): The closure amplitude type ('camp' or 'logcamp') mode (str): If 'time', return amplitudes in a list of equal time arrays, if 'all', return all amplitudes in a single array count (str): If 'min', return minimal set of amplitudes, if 'max' return all closure amplitudes up to inverses debias (bool): If True, debias the closure amplitude timetype (str): 'GMST' or 'UTC' snrcut (float): flag closure amplitudes with snr lower than this Returns: (numpy.recarry): A recarray of the closure amplitudes with datatype DTCAMP """ if timetype is False: timetype = self.timetype if mode not in ('time', 'all'): raise Exception("possible options for mode are 'time' and 'all'") if count not in ('max', 'min'): raise Exception("possible options for count are 'max' and 'min'") if vtype not in ('vis', 'qvis', 'uvis', 'vvis', 'rrvis', 'lrvis', 'rlvis', 'llvis'): raise Exception("possible options for vtype are " + "'vis', 'qvis', 'uvis','vvis','rrvis','lrvis','rlvis','llvis'") if not (ctype in ['camp', 'logcamp']): raise Exception("closure amplitude type must be 'camp' or 'logcamp'!") if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") # Get data sorted by time tlist = self.tlist(conj=True) out = [] cas = [] tt = 1 for tdata in tlist: # sys.stdout.write('\rGetting closure amps:: type %s %s , count %s, scan %i/%i' % # (vtype, ctype, count, tt, len(tlist))) # sys.stdout.flush() tt += 1 time = tdata[0]['time'] if timetype in ['GMST', 'gmst'] and self.timetype == 'UTC': time = obsh.utc_to_gmst(time, self.mjd) if timetype in ['UTC', 'utc'] and self.timetype == 'GMST': time = obsh.gmst_to_utc(time, self.mjd) sites = np.array(list(set(np.hstack((tdata['t1'], tdata['t2']))))) if len(sites) < 4: continue # Create a dictionary of baseline data at the current time including conjugates; l_dict = {} for dat in tdata: l_dict[(dat['t1'], dat['t2'])] = dat # Minimal set if count == 'min': quadsets = obsh.quad_minimal_set(sites, self.tarr, self.tkey) # Maximal Set elif count == 'max': # Find all quadrangles quadsets = np.sort(list(it.combinations(sites, 4))) # Include 3 closure amplitudes on each quadrangle quadsets = np.array([(q, [q[0], q[2], q[1], q[3]], [q[0], q[1], q[3], q[2]]) for q in quadsets]).reshape((-1, 4)) # Loop over all closure amplitudes for quad in quadsets: # Blue is numerator, red is denominator if (quad[0], quad[1]) not in l_dict.keys(): continue if (quad[2], quad[3]) not in l_dict.keys(): continue if (quad[1], quad[2]) not in l_dict.keys(): continue if (quad[0], quad[3]) not in l_dict.keys(): continue try: blue1 = l_dict[quad[0], quad[1]] blue2 = l_dict[quad[2], quad[3]] red1 = l_dict[quad[0], quad[3]] red2 = l_dict[quad[1], quad[2]] except KeyError: continue # Compute the closure amplitude and the error (camp, camperr) = obsh.make_closure_amplitude(blue1, blue2, red1, red2, vtype, polrep=self.polrep, ctype=ctype, debias=debias) if ctype == 'camp' and camp / camperr < snrcut: continue elif ctype == 'logcamp' and 1. 
/ camperr < snrcut: continue # Add the closure amplitudes to the equal-time list # Our site convention is (12)(34)/(14)(23) cas.append(np.array((time, quad[0], quad[1], quad[2], quad[3], blue1['u'], blue1['v'], blue2['u'], blue2['v'], red1['u'], red1['v'], red2['u'], red2['v'], camp, camperr), dtype=ehc.DTCAMP)) # Append all equal time closure amps to outlist if mode == 'time': out.append(np.array(cas)) cas = [] if mode == 'all': out = np.array(cas) return out
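A sketch of assembling the maximal set of debiased log closure amplitudes in a single array, with obs as above.

lca = obs.c_amplitudes(vtype='vis', mode='all', count='max', ctype='logcamp', debias=True)
print(len(lca), 'log closure amplitudes')
print(lca['camp'][:5], lca['sigmaca'][:5])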
[docs] def c_log_amplitudes_diag(self, vtype='vis', mode='all', count='min', debias=True, timetype=False, snrcut=0.): """Return a recarray of the equal time diagonalized log closure amplitudes. Args: vtype (str): The visibilty type ('vis','qvis','uvis','vvis','pvis') From which to assemble closure amplitudes ctype (str): The closure amplitude type ('camp' or 'logcamp') mode (str): If 'time', return amplitudes in a list of equal time arrays, If 'all', return all amplitudes in a single array count (str): If 'min', return minimal set of amplitudes, If 'max' return all closure amplitudes up to inverses debias (bool): If True, debias the closure amplitude - The individual visibility amplitudes are always debiased. timetype (str): 'GMST' or 'UTC' snrcut (float): flag closure amplitudes with snr lower than this Returns: (numpy.recarry): A recarray of diagonalized closure amps with datatype DTLOGCAMPDIAG """ if timetype is False: timetype = self.timetype if mode not in ('time', 'all'): raise Exception("possible options for mode are 'time' and 'all'") if count not in ('min'): raise Exception("count can only be 'min' for diagonal log closure amplitudes") if vtype not in ('vis', 'qvis', 'uvis', 'vvis', 'rrvis', 'lrvis', 'rlvis', 'llvis'): raise Exception( "possible options for vtype are 'vis', 'qvis', 'uvis', " + "'vvis','rrvis','lrvis','rlvis','llvis'") if timetype not in ['GMST', 'UTC', 'gmst', 'utc']: raise Exception("timetype should be 'GMST' or 'UTC'!") # determine the appropriate sigmatype if vtype in ["vis", "qvis", "uvis", "vvis"]: if vtype == 'vis': sigmatype = 'sigma' if vtype == 'qvis': sigmatype = 'qsigma' if vtype == 'uvis': sigmatype = 'usigma' if vtype == 'vvis': sigmatype = 'vsigma' if vtype in ["rrvis", "llvis", "rlvis", "lrvis"]: if vtype == 'rrvis': sigmatype = 'rrsigma' if vtype == 'llvis': sigmatype = 'llsigma' if vtype == 'rlvis': sigmatype = 'rlsigma' if vtype == 'lrvis': sigmatype = 'lrsigma' # get the time-sorted visibility data including conjugate baselines viss = np.concatenate(self.tlist(conj=True)) # get the log closure amplitude data lcas = self.c_amplitudes(vtype=vtype, mode=mode, count=count, ctype='logcamp', debias=debias, timetype=timetype, snrcut=snrcut) # get the unique timestamps for the log closure amplitudes T_lcas = np.unique(lcas['time']) # list of diagonalized log closure camplitudes and corresponding transformation matrices dlcas = [] dlca_errs = [] tfmats = [] quads = [] us = [] vs = [] # loop over the timestamps for kk, t in enumerate(T_lcas): printstr = ('\rDiagonalizing log closure amplitudes:: type %s, count %s, scan %i/%i ' % (vtype, count, kk + 1, len(T_lcas))) sys.stdout.write(printstr) sys.stdout.flush() # index masks for this timestamp mask_lca = (lcas['time'] == t) mask_vis = (viss['time'] == t) # log closure amplitudes for this timestamp lcas_here = lcas[mask_lca] # visibilities for this timestamp viss_here = viss[mask_vis] # initialize the design matrix design_mat = np.zeros((mask_lca.sum(), mask_vis.sum())) # loop over the log closure amplitudes within this timestamp quadlist = [] ulist = [] vlist = [] for il, lca in enumerate(lcas_here): quadlist.append((lca['t1'], lca['t2'], lca['t3'], lca['t4'])) ulist.append((lca['u1'], lca['u2'], lca['u3'], lca['u4'])) vlist.append((lca['v1'], lca['v2'], lca['v3'], lca['v4'])) # matrix entry for first leg of quadrangle ind1 = ((viss_here['t1'] == lca['t1']) & (viss_here['t2'] == lca['t2'])) design_mat[il, ind1] = 1.0 # matrix entry for second leg of quadrangle ind2 = ((viss_here['t1'] == lca['t3']) & 
(viss_here['t2'] == lca['t4'])) design_mat[il, ind2] = 1.0 # matrix entry for third leg of quadrangle ind3 = ((viss_here['t1'] == lca['t1']) & (viss_here['t2'] == lca['t4'])) design_mat[il, ind3] = -1.0 # matrix entry for fourth leg of quadrangle ind4 = ((viss_here['t1'] == lca['t2']) & (viss_here['t2'] == lca['t3'])) design_mat[il, ind4] = -1.0 # construct the covariance matrix logvisamp_err = viss_here[sigmatype] / np.abs(viss_here[vtype]) sigma_mat = np.diag(logvisamp_err**2.0) covar_mat = np.matmul(np.matmul(design_mat, sigma_mat), np.transpose(design_mat)) # diagonalize via eigendecomposition eigeninfo = np.linalg.eigh(covar_mat) T_matrix = np.copy(eigeninfo[1]).transpose() dlogcamp = np.matmul(T_matrix, lcas_here['camp']) dlogcamp_err = np.sqrt(np.copy(eigeninfo[0])) dlcas.append(dlogcamp) dlca_errs.append(dlogcamp_err) tfmats.append(T_matrix) quads.append(quadlist) us.append(ulist) vs.append(vlist) # Reformat into a list out = [] for kk, t in enumerate(T_lcas): dlcaarr = [] for idlca, dlca in enumerate(dlcas[kk]): dlcaarr.append((t, dlca, dlca_errs[kk][idlca])) dlcaarr = np.array(dlcaarr, dtype=[('time', 'f8'), ('camp', 'f8'), ('sigmaca', 'f8')]) out.append((dlcaarr, np.array(quads[kk]).astype(np.dtype([('quadrangles', 'U2')])), np.array(us[kk]).astype(np.dtype([('u', 'f8')])), np.array(vs[kk]).astype(np.dtype([('v', 'f8')])), tfmats[kk].astype(np.dtype([('tform_matrix', 'f8')])))) print("\n") return out
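As a sketch of how the diagonalized products come back (one tuple per timestamp, holding the recarray of diagonal log closure amplitudes plus the quadrangle, u, v and transformation-matrix arrays), again assuming an existing Obsdata instance obs:

# illustrative only
dlca_list = obs.c_log_amplitudes_diag(vtype='vis', count='min', debias=True)
dlca_rec, quads, us, vs, tform = dlca_list[0]      # first timestamp
print(dlca_rec['time'][0], dlca_rec['camp'], dlca_rec['sigmaca'])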
[docs] def camp_quad(self, site1, site2, site3, site4, vtype='vis', ctype='camp', debias=True, timetype=False, snrcut=0., method='from_maxset', camps=[], force_recompute=False): """Return closure phase over time on a quadrange (1-2)(3-4)/(1-4)(2-3). Args: site1 (str): station 1 name site2 (str): station 2 name site3 (str): station 3 name site4 (str): station 4 name vtype (str): The visibilty type from which to assemble closure amplitudes ('vis','qvis','uvis','vvis','pvis') ctype (str): The closure amplitude type ('camp' or 'logcamp') debias (bool): If True, debias the closure amplitude timetype (str): 'UTC' or 'GMST' snrcut (float): flag closure amplitudes with snr lower than this method (str): 'from_maxset' (old, default), 'from_vis' (new, more robust) camps (list): optionally pass in the time-sorted, precomputed camps force_recompute (bool): if True, do not use save closure amplitude data Returns: (numpy.recarry): A recarray of the closure amplitudes with datatype DTCAMP """ if timetype is False: timetype = self.timetype if method=='from_maxset' and (vtype in ['lrvis','pvis','rlvis']): print ("Warning! method='from_maxset' default in camp_quad() is inconsistent with vtype=%s" % vtype) print ("Switching to method='from_vis'") method = 'from_vis' quad = (site1, site2, site3, site4) outdata = [] # get selected closure amplitudes from the maximal set # TODO: verify consistency/performance of from_vis, and delete this method if method=='from_maxset': if (((ctype == 'camp') and (len(camps) == 0)) and not (self.camp is None) and not (len(self.camp) == 0) and not force_recompute): camps = self.camp elif (((ctype == 'logcamp') and (len(camps) == 0)) and not (self.logcamp is None) and not (len(self.logcamp) == 0) and not force_recompute): camps = self.logcamp elif (len(camps) == 0) or force_recompute: camps = self.c_amplitudes(mode='all', count='max', vtype=vtype, ctype=ctype, debias=debias, timetype=timetype, snrcut=snrcut) # blue bls in numerator, red in denominator b1 = set((site1, site2)) b2 = set((site3, site4)) r1 = set((site1, site4)) r2 = set((site2, site3)) for obs in camps: # camps does not contain inverses! num = [set((obs['t1'], obs['t2'])), set((obs['t3'], obs['t4']))] denom = [set((obs['t1'], obs['t4'])), set((obs['t2'], obs['t3']))] obsquad = (obs['t1'], obs['t2'], obs['t3'], obs['t4']) if set(quad) == set(obsquad): # is this either the closure amplitude or inverse? rightup = (b1 in num) and (b2 in num) and (r1 in denom) and (r2 in denom) wrongup = (b1 in denom) and (b2 in denom) and (r1 in num) and (r2 in num) if not (rightup or wrongup): continue # flip the inverse closure amplitudes if wrongup: t1old = copy.deepcopy(obs['t1']) u1old = copy.deepcopy(obs['u1']) v1old = copy.deepcopy(obs['v1']) t2old = copy.deepcopy(obs['t2']) u2old = copy.deepcopy(obs['u2']) v2old = copy.deepcopy(obs['v2']) t3old = copy.deepcopy(obs['t3']) u3old = copy.deepcopy(obs['u3']) v3old = copy.deepcopy(obs['v3']) t4old = copy.deepcopy(obs['t4']) u4old = copy.deepcopy(obs['u4']) v4old = copy.deepcopy(obs['v4']) campold = copy.deepcopy(obs['camp']) csigmaold = copy.deepcopy(obs['sigmaca']) obs['t1'] = t1old obs['t2'] = t4old obs['t3'] = t3old obs['t4'] = t2old obs['u1'] = u3old obs['v1'] = v3old obs['u2'] = -u4old obs['v2'] = -v4old obs['u3'] = u1old obs['v3'] = v1old obs['u4'] = -u2old obs['v4'] = -v2old if ctype == 'logcamp': obs['camp'] = -campold obs['sigmaca'] = csigmaold else: obs['camp'] = 1. 
/ campold obs['sigmaca'] = csigmaold / (campold**2) t1old = copy.deepcopy(obs['t1']) u1old = copy.deepcopy(obs['u1']) v1old = copy.deepcopy(obs['v1']) t2old = copy.deepcopy(obs['t2']) u2old = copy.deepcopy(obs['u2']) v2old = copy.deepcopy(obs['v2']) t3old = copy.deepcopy(obs['t3']) u3old = copy.deepcopy(obs['u3']) v3old = copy.deepcopy(obs['v3']) t4old = copy.deepcopy(obs['t4']) u4old = copy.deepcopy(obs['u4']) v4old = copy.deepcopy(obs['v4']) # this is all same closure amplitude, but the ordering of labels is different # return the label ordering that the user requested! if (obs['t2'], obs['t1'], obs['t4'], obs['t3']) == quad: obs['t1'] = t2old obs['t2'] = t1old obs['t3'] = t4old obs['t4'] = t3old obs['u1'] = -u1old obs['v1'] = -v1old obs['u2'] = -u2old obs['v2'] = -v2old obs['u3'] = u4old obs['v3'] = v4old obs['u4'] = u3old obs['v4'] = v3old elif (obs['t3'], obs['t4'], obs['t1'], obs['t2']) == quad: obs['t1'] = t3old obs['t2'] = t4old obs['t3'] = t1old obs['t4'] = t2old obs['u1'] = u2old obs['v1'] = v2old obs['u2'] = u1old obs['v2'] = v1old obs['u3'] = -u4old obs['v3'] = -v4old obs['u4'] = -u3old obs['v4'] = -v3old elif (obs['t4'], obs['t3'], obs['t2'], obs['t1']) == quad: obs['t1'] = t4old obs['t2'] = t3old obs['t3'] = t2old obs['t4'] = t1old obs['u1'] = -u2old obs['v1'] = -v2old obs['u2'] = -u1old obs['v2'] = -v1old obs['u3'] = -u3old obs['v3'] = -v3old obs['u4'] = -u4old obs['v4'] = -v4old # append to output array outdata.append(np.array(obs, dtype=ehc.DTCAMP)) # get selected bispectra from the visibilities directly # taken from c_ampitudes() method elif method=='from_vis': # get all equal-time data, and loop over to construct closure amplitudes tlist = self.tlist(conj=True) for tdata in tlist: time = tdata[0]['time'] if timetype in ['GMST', 'gmst'] and self.timetype == 'UTC': time = obsh.utc_to_gmst(time, self.mjd) if timetype in ['UTC', 'utc'] and self.timetype == 'GMST': time = obsh.gmst_to_utc(time, self.mjd) sites = np.array(list(set(np.hstack((tdata['t1'], tdata['t2']))))) if len(sites) < 4: continue # Create a dictionary of baselines at the current time incl. conjugates; l_dict = {} for dat in tdata: l_dict[(dat['t1'], dat['t2'])] = dat # Select quadrangle entries in the data dictionary # Blue is numerator, red is denominator if (quad[0], quad[1]) not in l_dict.keys(): continue if (quad[2], quad[3]) not in l_dict.keys(): continue if (quad[1], quad[2]) not in l_dict.keys(): continue if (quad[0], quad[3]) not in l_dict.keys(): continue try: blue1 = l_dict[quad[0], quad[1]] blue2 = l_dict[quad[2], quad[3]] red1 = l_dict[quad[0], quad[3]] red2 = l_dict[quad[1], quad[2]] except KeyError: continue # Compute the closure amplitude and the error (camp, camperr) = obsh.make_closure_amplitude(blue1, blue2, red1, red2, vtype, polrep=self.polrep, ctype=ctype, debias=debias) if ctype == 'camp' and camp / camperr < snrcut: continue elif ctype == 'logcamp' and 1. / camperr < snrcut: continue # Add the closure amplitudes to the equal-time list # Our site convention is (12)(34)/(14)(23) outdata.append(np.array((time, quad[0], quad[1], quad[2], quad[3], blue1['u'], blue1['v'], blue2['u'], blue2['v'], red1['u'], red1['v'], red2['u'], red2['v'], camp, camperr), dtype=ehc.DTCAMP)) else: raise Exception("keyword 'method' in camp_quad() must be either 'from_cphase' or 'from_vis'") outdata = np.array(outdata) return outdata
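For a single quadrangle, a hedged example; the station codes are placeholders for sites present in obs.tarr, and 'from_vis' is the more robust path noted in the docstring.

# illustrative only
quaddat = obs.camp_quad('ALMA', 'APEX', 'SMT', 'LMT', ctype='logcamp',
                        debias=True, snrcut=3.0, method='from_vis')
times, lcamps, lcamp_errs = quaddat['time'], quaddat['camp'], quaddat['sigmaca']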
[docs] def plotall(self, field1, field2, conj=False, debias=False, tag_bl=False, ang_unit='deg', timetype=False, axis=False, rangex=False, rangey=False, xscale='linear', yscale='linear', color=ehc.SCOLORS[0], marker='o', markersize=ehc.MARKERSIZE, label=None, snrcut=0., grid=True, ebar=True, axislabels=True, legend=False, show=True, export_pdf=""): """Plot two fields against each other. Args: field1 (str): x-axis field (from FIELDS) field2 (str): y-axis field (from FIELDS) conj (bool): Plot conjuage baseline data points if True debias (bool): If True, debias amplitudes. tag_bl (bool): if True, label each baseline ang_unit (str): phase unit 'deg' or 'rad' timetype (str): 'GMST' or 'UTC' axis (matplotlib.axes.Axes): add plot to this axis xscale (str): 'linear' or 'log' y-axis scale yscale (str): 'linear' or 'log' y-axis scale rangex (list): [xmin, xmax] x-axis limits rangey (list): [ymin, ymax] y-axis limits color (str): color for scatterplot points marker (str): matplotlib plot marker markersize (int): size of plot markers label (str): plot legend label snrcut (float): flag closure amplitudes with snr lower than this grid (bool): Plot gridlines if True ebar (bool): Plot error bars if True axislabels (bool): Show axis labels if True legend (bool): Show legend if True show (bool): Display the plot if true export_pdf (str): path to pdf file to save figure Returns: (matplotlib.axes.Axes): Axes object with data plot """ if timetype is False: timetype = self.timetype # Determine if fields are valid field1 = field1.lower() field2 = field2.lower() if (field1 not in ehc.FIELDS) and (field2 not in ehc.FIELDS): raise Exception("valid fields are " + ' '.join(ehc.FIELDS)) if 'amp' in [field1, field2] and not (self.amp is None): print("Warning: plotall is not using amplitudes in Obsdata.amp array!") # Label individual baselines # ANDREW TODO this is way too slow, make it faster?? if tag_bl: clist = ehc.SCOLORS # make a color coding dictionary cdict = {} ii = 0 baselines = np.sort(list(it.combinations(self.tarr['site'], 2))) for baseline in baselines: cdict[(baseline[0], baseline[1])] = clist[ii % len(clist)] cdict[(baseline[1], baseline[0])] = clist[ii % len(clist)] ii += 1 # get unique baselines -- TODO easier way? separate function? 
alldata = [] allsigx = [] allsigy = [] bllist = [] colors = [] bldata = self.bllist(conj=conj) for bl in bldata: t1 = bl['t1'][0] t2 = bl['t2'][0] bllist.append((t1, t2)) colors.append(cdict[(t1, t2)]) # Unpack data dat = self.unpack_dat(bl, [field1, field2], ang_unit=ang_unit, debias=debias, timetype=timetype) alldata.append(dat) # X error bars if obsh.sigtype(field1): allsigx.append(self.unpack_dat(bl, [obsh.sigtype(field1)], ang_unit=ang_unit)[obsh.sigtype(field1)]) else: allsigx.append(None) # Y error bars if obsh.sigtype(field2): allsigy.append(self.unpack_dat(bl, [obsh.sigtype(field2)], ang_unit=ang_unit)[obsh.sigtype(field2)]) else: allsigy.append(None) # Don't Label individual baselines else: bllist = [['All', 'All']] colors = [color] # unpack data alldata = [self.unpack([field1, field2], conj=conj, ang_unit=ang_unit, debias=debias, timetype=timetype)] # X error bars if obsh.sigtype(field1): allsigx = self.unpack(obsh.sigtype(field2), conj=conj, ang_unit=ang_unit) allsigx = [allsigx[obsh.sigtype(field1)]] else: allsigx = [None] # Y error bars if obsh.sigtype(field2): allsigy = self.unpack(obsh.sigtype(field2), conj=conj, ang_unit=ang_unit) allsigy = [allsigy[obsh.sigtype(field2)]] else: allsigy = [None] # make plot(s) if axis: x = axis else: fig = plt.figure() x = fig.add_subplot(1, 1, 1) xmins = [] xmaxes = [] ymins = [] ymaxes = [] for i in range(len(alldata)): data = alldata[i] sigy = allsigy[i] sigx = allsigx[i] color = colors[i] bl = bllist[i] # Flag out nans (to avoid problems determining plotting limits) mask = ~(np.isnan(data[field1]) + np.isnan(data[field2])) # Flag out due to snrcut if snrcut > 0.: sigs = [sigx, sigy] for jj, field in enumerate([field1, field2]): if field in ehc.FIELDS_AMPS: fmask = data[field] / sigs[jj] > snrcut elif field in ehc.FIELDS_PHASE: fmask = sigs[jj] < (180. 
/ np.pi / snrcut) elif field in ehc.FIELDS_SNRS: fmask = data[field] > snrcut else: fmask = np.ones(mask.shape).astype(bool) mask *= fmask data = data[mask] if sigy is not None: sigy = sigy[mask] if sigx is not None: sigx = sigx[mask] if len(data) == 0: continue xmins.append(np.min(data[field1])) xmaxes.append(np.max(data[field1])) ymins.append(np.min(data[field2])) ymaxes.append(np.max(data[field2])) # Plot the data tolerance = len(data[field2]) if label is None: labelstr = "%s-%s" % ((str(bl[0]), str(bl[1]))) else: labelstr = str(label) if ebar and (np.any(sigy) or np.any(sigx)): x.errorbar(data[field1], data[field2], xerr=sigx, yerr=sigy, label=labelstr, fmt=marker, markersize=markersize, color=color, picker=tolerance) else: x.plot(data[field1], data[field2], marker, markersize=markersize, color=color, label=labelstr, picker=tolerance) # axis scales x.set_xscale(xscale) x.set_yscale(yscale) # Data ranges if not rangex: rangex = [np.min(xmins) - 0.2 * np.abs(np.min(xmins)), np.max(xmaxes) + 0.2 * np.abs(np.max(xmaxes))] if np.any(np.isnan(np.array(rangex))): print("Warning: NaN in data x range: specifying rangex to default") rangex = [-100, 100] if not rangey: rangey = [np.min(ymins) - 0.2 * np.abs(np.min(ymins)), np.max(ymaxes) + 0.2 * np.abs(np.max(ymaxes))] if np.any(np.isnan(np.array(rangey))): print("Warning: NaN in data y range: specifying rangey to default") rangey = [-100, 100] x.set_xlim(rangex) x.set_ylim(rangey) # label and save if axislabels: try: x.set_xlabel(ehc.FIELD_LABELS[field1]) x.set_ylabel(ehc.FIELD_LABELS[field2]) except KeyError: x.set_xlabel(field1.capitalize()) x.set_ylabel(field2.capitalize()) if legend and tag_bl: plt.legend(ncol=2) elif legend: plt.legend() if grid: x.grid() if export_pdf != "" and not axis: fig.savefig(export_pdf, bbox_inches='tight') if export_pdf != "" and axis: fig = plt.gcf() fig.savefig(export_pdf, bbox_inches='tight') if show: #plt.show(block=False) ehc.show_noblock() return x
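A typical call, assuming 'uvdist' and 'amp' are among the valid FIELDS entries and radplot.pdf is just a placeholder output path:

# illustrative only: debiased amplitude vs. baseline length, color-coded by baseline
ax = obs.plotall('uvdist', 'amp', debias=True, tag_bl=True, yscale='log',
                 export_pdf='radplot.pdf', show=False)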
[docs] def plot_bl(self, site1, site2, field, debias=False, ang_unit='deg', timetype=False, axis=False, rangex=False, rangey=False, snrcut=0., color=ehc.SCOLORS[0], marker='o', markersize=ehc.MARKERSIZE, label=None, grid=True, ebar=True, axislabels=True, legend=False, show=True, export_pdf=""): """Plot a field over time on a baseline site1-site2. Args: site1 (str): station 1 name site2 (str): station 2 name field (str): y-axis field (from FIELDS) debias (bool): If True, debias amplitudes. ang_unit (str): phase unit 'deg' or 'rad' timetype (str): 'GMST' or 'UTC' axis (matplotlib.axes.Axes): add plot to this axis rangex (list): [xmin, xmax] x-axis limits rangey (list): [ymin, ymax] y-axis limits color (str): color for scatterplot points marker (str): matplotlib plot marker markersize (int): size of plot markers label (str): plot legend label grid (bool): Plot gridlines if True ebar (bool): Plot error bars if True axislabels (bool): Show axis labels if True legend (bool): Show legend if True show (bool): Display the plot if true export_pdf (str): path to pdf file to save figure Returns: (matplotlib.axes.Axes): Axes object with data plot """ if timetype is False: timetype = self.timetype field = field.lower() if field == 'amp' and not (self.amp is None): print("Warning: plot_bl is not using amplitudes in Obsdata.amp array!") if label is None: label = str(self.source) else: label = str(label) # Determine if fields are valid if field not in ehc.FIELDS: raise Exception("valid fields are " + string.join(ehc.FIELDS)) plotdata = self.unpack_bl(site1, site2, field, ang_unit=ang_unit, debias=debias, timetype=timetype) sigmatype = obsh.sigtype(field) if obsh.sigtype(field): errdata = self.unpack_bl(site1, site2, obsh.sigtype(field), ang_unit=ang_unit, debias=debias) else: errdata = None # Flag out nans (to avoid problems determining plotting limits) mask = ~np.isnan(plotdata[field][:, 0]) # Flag out due to snrcut if snrcut > 0.: if field in ehc.FIELDS_AMPS: fmask = plotdata[field] / errdata[sigmatype] > snrcut elif field in ehc.FIELDS_PHASE: fmask = errdata[sigmatype] < (180. 
/ np.pi / snrcut) elif field in ehc.FIELDS_SNRS: fmask = plotdata[field] > snrcut else: fmask = np.ones(mask.shape).astype(bool) fmask = fmask[:, 0] mask *= fmask plotdata = plotdata[mask] if errdata is not None: errdata = errdata[mask] if not rangex: rangex = [self.tstart, self.tstop] if np.any(np.isnan(np.array(rangex))): print("Warning: NaN in data x range: specifying rangex to default") rangex = [0, 24] if not rangey: rangey = [np.min(plotdata[field]) - 0.2 * np.abs(np.min(plotdata[field])), np.max(plotdata[field]) + 0.2 * np.abs(np.max(plotdata[field]))] if np.any(np.isnan(np.array(rangey))): print("Warning: NaN in data y range: specifying rangex to default") rangey = [-100, 100] # Plot the data if axis: x = axis else: fig = plt.figure() x = fig.add_subplot(1, 1, 1) if ebar and obsh.sigtype(field) is not False: x.errorbar(plotdata['time'][:, 0], plotdata[field][:, 0], yerr=errdata[obsh.sigtype(field)][:, 0], fmt=marker, markersize=markersize, color=color, linestyle='none', label=label) else: x.plot(plotdata['time'][:, 0], plotdata[field][:, 0], marker, markersize=markersize, color=color, label=label, linestyle='none') x.set_xlim(rangex) x.set_ylim(rangey) if axislabels: x.set_xlabel(timetype + ' (hr)') try: x.set_ylabel(ehc.FIELD_LABELS[field]) except KeyError: x.set_ylabel(field.capitalize()) x.set_title('%s - %s' % (site1, site2)) if grid: x.grid() if legend: plt.legend() if export_pdf != "" and not axis: fig.savefig(export_pdf, bbox_inches='tight') if export_pdf != "" and axis: fig = plt.gcf() fig.savefig(export_pdf, bbox_inches='tight') if show: #plt.show(block=False) ehc.show_noblock() return x
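For a single baseline, with placeholder station names:

# illustrative only: visibility amplitude vs. time on one baseline
ax = obs.plot_bl('ALMA', 'LMT', 'amp', debias=True, timetype='UTC', show=False)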
[docs] def plot_cphase(self, site1, site2, site3, vtype='vis', cphases=[], force_recompute=False, ang_unit='deg', timetype=False, snrcut=0., axis=False, rangex=False, rangey=False, color=ehc.SCOLORS[0], marker='o', markersize=ehc.MARKERSIZE, label=None, grid=True, ebar=True, axislabels=True, legend=False, show=True, export_pdf=""): """Plot a closure phase over time on a triangle site1-site2-site3. Args: site1 (str): station 1 name site2 (str): station 2 name site3 (str): station 3 name vtype (str): The visibilty type from which to assemble closure phases ('vis','qvis','uvis','vvis','pvis') cphases (list): optionally pass in the prcomputed, time-sorted closure phases force_recompute (bool): if True, do not use stored closure phase able snrcut (float): flag closure amplitudes with snr lower than this ang_unit (str): phase unit 'deg' or 'rad' timetype (str): 'GMST' or 'UTC' axis (matplotlib.axes.Axes): add plot to this axis rangex (list): [xmin, xmax] x-axis limits rangey (list): [ymin, ymax] y-axis limits color (str): color for scatterplot points marker (str): matplotlib plot marker markersize (int): size of plot markers label (str): plot legend label grid (bool): Plot gridlines if True ebar (bool): Plot error bars if True axislabels (bool): Show axis labels if True legend (bool): Show legend if True show (bool): Display the plot if True export_pdf (str): path to pdf file to save figure Returns: (matplotlib.axes.Axes): Axes object with data plot """ if timetype is False: timetype = self.timetype if ang_unit == 'deg': angle = 1.0 else: angle = ehc.DEGREE if label is None: label = str(self.source) else: label = str(label) # Get closure phases (maximal set) if (len(cphases) == 0) and (self.cphase is not None) and not force_recompute: cphases = self.cphase cpdata = self.cphase_tri(site1, site2, site3, vtype=vtype, timetype=timetype, cphases=cphases, force_recompute=force_recompute, snrcut=snrcut) plotdata = np.array([[obs['time'], obs['cphase'] * angle, obs['sigmacp']] for obs in cpdata]) nan_mask = np.isnan(plotdata[:, 1]) plotdata = plotdata[~nan_mask] if len(plotdata) == 0: print("%s %s %s : No closure phases on this triangle!" % (site1, site2, site3)) return # Plot the data if axis: x = axis else: fig = plt.figure() x = fig.add_subplot(1, 1, 1) # Data ranges if not rangex: rangex = [self.tstart, self.tstop] if np.any(np.isnan(np.array(rangex))): print("Warning: NaN in data x range: specifying rangex to default") rangex = [0, 24] if not rangey: if ang_unit == 'deg': rangey = [-190, 190] else: rangey = [-1.1 * np.pi, 1.1 * np.pi] x.set_xlim(rangex) x.set_ylim(rangey) if ebar and np.any(plotdata[:, 2]): x.errorbar(plotdata[:, 0], plotdata[:, 1], yerr=plotdata[:, 2], fmt=marker, markersize=markersize, color=color, linestyle='none', label=label) else: x.plot(plotdata[:, 0], plotdata[:, 1], marker, markersize=markersize, color=color, linestyle='none', label=label) if axislabels: x.set_xlabel(self.timetype + ' (h)') if ang_unit == 'deg': x.set_ylabel(r'Closure Phase $(^\circ)$') else: x.set_ylabel(r'Closure Phase (radian)') x.set_title('%s - %s - %s' % (site1, site2, site3)) if grid: x.grid() if legend: plt.legend() if export_pdf != "" and not axis: fig.savefig(export_pdf, bbox_inches='tight') if export_pdf != "" and axis: fig = plt.gcf() fig.savefig(export_pdf, bbox_inches='tight') if show: #plt.show(block=False) ehc.show_noblock() return x
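A short sketch with placeholder station names; the stored maximal set of closure phases is reused automatically unless force_recompute=True.

# illustrative only
ax = obs.plot_cphase('ALMA', 'SMT', 'LMT', vtype='vis', ang_unit='deg',
                     snrcut=3.0, show=False)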
[docs] def plot_camp(self, site1, site2, site3, site4, vtype='vis', ctype='camp', camps=[], force_recompute=False, debias=False, timetype=False, snrcut=0., axis=False, rangex=False, rangey=False, color=ehc.SCOLORS[0], marker='o', markersize=ehc.MARKERSIZE, label=None, grid=True, ebar=True, axislabels=True, legend=False, show=True, export_pdf=""): """Plot closure amplitude over time on a quadrangle (1-2)(3-4)/(1-4)(2-3). Args: site1 (str): station 1 name site2 (str): station 2 name site3 (str): station 3 name site4 (str): station 4 name vtype (str): The visibilty type from which to assemble closure amplitudes ('vis','qvis','uvis','vvis','pvis') ctype (str): The closure amplitude type ('camp' or 'logcamp') camps (list): optionally pass in camps so they don't have to be recomputed force_recompute (bool): if True, recompute camps instead of using stored data snrcut (float): flag closure amplitudes with snr lower than this debias (bool): If True, debias the closure amplitude timetype (str): 'GMST' or 'UTC' axis (matplotlib.axes.Axes): amake_cdd plot to this axis rangex (list): [xmin, xmax] x-axis limits rangey (list): [ymin, ymax] y-axis limits color (str): color for scatterplot points marker (str): matplotlib plot marker markersize (int): size of plot markers label (str): plot legend label grid (bool): Plot gridlines if True ebar (bool): Plot error bars if True axislabels (bool): Show axis labels if True legend (bool): Show legend if True show (bool): Display the plot if True export_pdf (str): path to pdf file to save figure Returns: (matplotlib.axes.Axes): Axes object with data plot """ if timetype is False: timetype = self.timetype if label is None: label = str(self.source) else: label = str(label) # Get closure amplitudes (maximal set) if ((ctype == 'camp') and (len(camps) == 0) and (self.camp is not None) and not (len(self.camp) == 0) and not force_recompute): camps = self.camp elif ((ctype == 'logcamp') and (len(camps) == 0) and (self.logcamp is not None) and not (len(self.logcamp) == 0) and not force_recompute): camps = self.logcamp # Get closure amplitudes (maximal set) cpdata = self.camp_quad(site1, site2, site3, site4, vtype=vtype, ctype=ctype, snrcut=snrcut, debias=debias, timetype=timetype, camps=camps, force_recompute=force_recompute) if len(cpdata) == 0: print('No closure amplitudes on this triangle!') return plotdata = np.array([[obs['time'], obs['camp'], obs['sigmaca']] for obs in cpdata]) plotdata = np.array(plotdata) nan_mask = np.isnan(plotdata[:, 1]) plotdata = plotdata[~nan_mask] if len(plotdata) == 0: print("No closure amplitudes on this quadrangle!") return # Data ranges if not rangex: rangex = [self.tstart, self.tstop] if np.any(np.isnan(np.array(rangex))): print("Warning: NaN in data x range: specifying rangex to default") rangex = [0, 24] if not rangey: rangey = [np.min(plotdata[:, 1]) - 0.2 * np.abs(np.min(plotdata[:, 1])), np.max(plotdata[:, 1]) + 0.2 * np.abs(np.max(plotdata[:, 1]))] if np.any(np.isnan(np.array(rangey))): print("Warning: NaN in data y range: specifying rangey to default") if ctype == 'camp': rangey = [0, 100] if ctype == 'logcamp': rangey = [-10, 10] # Plot the data if axis: x = axis else: fig = plt.figure() x = fig.add_subplot(1, 1, 1) if ebar and np.any(plotdata[:, 2]): x.errorbar(plotdata[:, 0], plotdata[:, 1], yerr=plotdata[:, 2], fmt=marker, markersize=markersize, color=color, linestyle='none', label=label) else: x.plot(plotdata[:, 0], plotdata[:, 1], marker, markersize=markersize, color=color, linestyle='none', label=label) x.set_xlim(rangex) 
x.set_ylim(rangey) if axislabels: x.set_xlabel(self.timetype + ' (h)') if ctype == 'camp': x.set_ylabel('Closure Amplitude') elif ctype == 'logcamp': x.set_ylabel('Log Closure Amplitude') x.set_title('(%s - %s)(%s - %s)/(%s - %s)(%s - %s)' % (site1, site2, site3, site4, site1, site4, site2, site3)) if grid: x.grid() if legend: plt.legend() if export_pdf != "" and not axis: fig.savefig(export_pdf, bbox_inches='tight') if export_pdf != "" and axis: fig = plt.gcf() fig.savefig(export_pdf, bbox_inches='tight') if show: #plt.show(block=False) ehc.show_noblock() return else: return x
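And for a closure-amplitude quadrangle, again with placeholder sites, saving straight to a hypothetical pdf:

# illustrative only
ax = obs.plot_camp('ALMA', 'APEX', 'SMT', 'LMT', ctype='logcamp', debias=True,
                   export_pdf='logcamp_quad.pdf', show=False)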
[docs] def save_txt(self, fname): """Save visibility data to a text file. Args: fname (str): path to output text file """ ehtim.io.save.save_obs_txt(self, fname) return
[docs] def save_uvfits(self, fname, force_singlepol=False, polrep_out='circ'): """Save visibility data to uvfits file. Args: fname (str): path to output uvfits file. force_singlepol (str): if 'R' or 'L', will interpret stokes I field as 'RR' or 'LL' polrep_out (str): 'circ' or 'stokes': how data should be stored in the uvfits file """ if (force_singlepol is not False) and (self.polrep != 'stokes'): raise Exception( "force_singlepol is incompatible with polrep!='stokes'") output = ehtim.io.save.save_obs_uvfits(self, fname, force_singlepol=force_singlepol, polrep_out=polrep_out) return
[docs] def make_hdulist(self, force_singlepol=False, polrep_out='circ'): """Returns an hdulist in the same format as in a saved .uvfits file. Args: force_singlepol (str): if 'R' or 'L', will interpret stokes I field as 'RR' or 'LL' polrep_out (str): 'circ' or 'stokes': how data should be stored in the uvfits file Returns: hdulist (astropy.io.fits.HDUList) """ if (force_singlepol is not False) and (self.polrep != 'stokes'): raise Exception( "force_singlepol is incompatible with polrep!='stokes'") hdulist = ehtim.io.save.save_obs_uvfits(self, None, force_singlepol=force_singlepol, polrep_out=polrep_out) return hdulist
[docs] def save_oifits(self, fname, flux=1.0): """ Save visibility data to oifits. Polarization data is NOT saved. Args: fname (str): path to output text file flux (float): normalization total flux """ if self.polrep != 'stokes': raise Exception("save_oifits not yet implemented for polreps other than 'stokes'") # Antenna diameters are currently incorrect # the exact times are also not correct in the datetime object ehtim.io.save.save_obs_oifits(self, fname, flux=flux) return
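Taken together, the export methods can be chained off the same object; the file names below are placeholders, and save_oifits assumes the observation is in the 'stokes' polrep.

# illustrative only
obs.save_txt('obs_src.txt')
obs.save_uvfits('obs_src.uvfits', polrep_out='circ')
hdul = obs.make_hdulist()                       # astropy HDUList, same format as the saved uvfits
hdul.writeto('obs_src_copy.uvfits', overwrite=True)
obs.save_oifits('obs_src.oifits', flux=1.0)     # requires polrep == 'stokes'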
################################################################################################## # Observation creation functions ##################################################################################################
[docs]def merge_obs(obs_List, force_merge=False): """Merge a list of observations into a single observation file. Args: obs_List (list): list of split observation Obsdata objects. force_merge (bool): forces the observations to merge even if parameters are different Returns: mergeobs (Obsdata): merged Obsdata object containing all scans in input list """ if (len(set([obs.polrep for obs in obs_List])) > 1): raise Exception("All observations must have the same polarization representation !") return if np.any([obs.timetype == 'GMST' for obs in obs_List]): raise Exception("merge_obs only works for observations with obs.timetype='UTC'!") return if not force_merge: if (len(set([obs.ra for obs in obs_List])) > 1 or len(set([obs.dec for obs in obs_List])) > 1 or len(set([obs.rf for obs in obs_List])) > 1 or len(set([obs.bw for obs in obs_List])) > 1 or len(set([obs.source for obs in obs_List])) > 1): # or len(set([np.floor(obs.mjd) for obs in obs_List])) > 1): raise Exception("All observations must have the same parameters!") return # the reference observation is the one with the minimum mjd obs_idx = np.argmin([obs.mjd for obs in obs_List]) obs_ref = obs_List[obs_idx] # re-reference times to new mjd # must be in UTC! mjd_ref = obs_ref.mjd for obs in obs_List: mjd_offset = obs.mjd - mjd_ref obs.data['time'] += mjd_offset * 24 if not(obs.scans is None or len(obs.scans)==0): obs.scans += mjd_offset * 24 # merge the data data_merge = np.hstack([obs.data for obs in obs_List]) # merge the scan list scan_merge = None for obs in obs_List: if (obs.scans is None or len(obs.scans)==0): continue if (scan_merge is None or len(scan_merge)==0): scan_merge = [obs.scans] else: scan_merge.append(obs.scans) if not (scan_merge is None or len(scan_merge) == 0): scan_merge = np.vstack(scan_merge) _idxsort = np.argsort(scan_merge[:, 0]) scan_merge = scan_merge[_idxsort] # merge the list of telescopes tarr_merge = np.unique(np.concatenate([obs.tarr for obs in obs_List])) arglist, argdict = obs_ref.obsdata_args() arglist[DATPOS] = data_merge arglist[TARRPOS] = tarr_merge argdict['scantable'] = scan_merge mergeobs = Obsdata(*arglist, **argdict) return mergeobs
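A sketch of merging two single-epoch observations; the file names are placeholders, and both observations must use UTC times and share source, frequency, and bandwidth unless force_merge=True.

# illustrative only
import ehtim as eh
obs1 = eh.obsdata.load_uvfits('night1_lo.uvfits')
obs2 = eh.obsdata.load_uvfits('night2_lo.uvfits')
obs_all = eh.obsdata.merge_obs([obs1, obs2])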
[docs]def load_txt(fname, polrep='stokes'): """Read an observation from a text file. Args: fname (str): path to input text file polrep (str): load data as either 'stokes' or 'circ' Returns: obs (Obsdata): Obsdata object loaded from file """ return ehtim.io.load.load_obs_txt(fname, polrep=polrep)
[docs]def load_uvfits(fname, flipbl=False, remove_nan=False, force_singlepol=None, channel=all, IF=all, polrep='stokes', allow_singlepol=True, ignore_pzero_date=True, trial_speedups=False): """Load observation data from a uvfits file. Args: fname (str or HDUList): path to the input uvfits file, or an astropy HDUList object flipbl (bool): flip baseline phases if True remove_nan (bool): if True, remove entries with nan data from missing polarizations polrep (str): load data as either 'stokes' or 'circ' force_singlepol (str): 'R' or 'L' to load only 1 polarization channel (list): list of channels to average in the import. channel=all averages all IF (list): list of IFs to average in the import. IF=all averages all IFs ignore_pzero_date (bool): if True, ignore the offset parameters in the DATE field TODO: what is the correct behavior per AIPS memo 117? Returns: obs (Obsdata): Obsdata object loaded from file """ return ehtim.io.load.load_obs_uvfits(fname, flipbl=flipbl, force_singlepol=force_singlepol, channel=channel, IF=IF, polrep=polrep, remove_nan=remove_nan, allow_singlepol=allow_singlepol, ignore_pzero_date=ignore_pzero_date, trial_speedups=trial_speedups)
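For example, with a placeholder path:

# illustrative only
obs = load_uvfits('er6_src_lo.uvfits', polrep='circ', remove_nan=True)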
[docs]def load_oifits(fname, flux=1.0): """Load data from an oifits file. Does NOT currently support polarization. Args: fname (str): path to input text file flux (float): normalization total flux Returns: obs (Obsdata): Obsdata object loaded from file """ return ehtim.io.load.load_obs_oifits(fname, flux=flux)
[docs]def load_maps(arrfile, obsspec, ifile, qfile=0, ufile=0, vfile=0, src=ehc.SOURCE_DEFAULT, mjd=ehc.MJD_DEFAULT, ampcal=False, phasecal=False): """Read an observation from a maps text file and return an Obsdata object. Args: arrfile (str): path to input array file obsspec (str): path to input obs spec file ifile (str): path to input Stokes I data file qfile (str): path to input Stokes Q data file ufile (str): path to input Stokes U data file vfile (str): path to input Stokes V data file src (str): source name mjd (int): integer observation MJD ampcal (bool): True if amplitude calibrated phasecal (bool): True if phase calibrated Returns: obs (Obsdata): Obsdata object loaded from file """ return ehtim.io.load.load_obs_maps(arrfile, obsspec, ifile, qfile=qfile, ufile=ufile, vfile=vfile, src=src, mjd=mjd, ampcal=ampcal, phasecal=phasecal)
[docs]def load_obs( fname, polrep='stokes', flipbl=False, remove_nan=False, force_singlepol=None, channel=all, IF=all, allow_singlepol=True, flux=1.0, obsspec=None, ifile=None, qfile=None, ufile=None, vfile=None, src=ehc.SOURCE_DEFAULT, mjd=ehc.MJD_DEFAULT, ampcal=False, phasecal=False ): """Smart obs read-in: detects the file type from the extension and loads appropriately. Args: fname (str): path to the input data file polrep (str): load data as either 'stokes' or 'circ' flipbl (bool): flip baseline phases if True remove_nan (bool): True to remove nans from missing polarizations force_singlepol (str): 'R' or 'L' to load only 1 polarization channel (list): list of channels to average in the import. channel=all averages all IF (list): list of IFs to average in the import. IF=all averages all IFs flux (float): normalization total flux obsspec (str): path to input obs spec file ifile (str): path to input Stokes I data file qfile (str): path to input Stokes Q data file ufile (str): path to input Stokes U data file vfile (str): path to input Stokes V data file src (str): source name mjd (int): integer observation MJD ampcal (bool): True if amplitude calibrated phasecal (bool): True if phase calibrated Returns: obs (Obsdata): Obsdata object loaded from file """ # grab the file extension fname_extension = fname.split('.')[-1] print(f"Extension is {fname_extension}.") # dispatch on the extension if fname_extension.lower() == 'uvfits': return load_uvfits(fname, flipbl=flipbl, remove_nan=remove_nan, force_singlepol=force_singlepol, channel=channel, IF=IF, polrep=polrep, allow_singlepol=allow_singlepol) elif fname_extension.lower() in ['txt', 'text']: return load_txt(fname, polrep=polrep) elif fname_extension.lower() == 'oifits': return load_oifits(fname, flux=flux) else: if obsspec is not None and ifile is None: print("You have provided a value for <obsspec> but no value for <ifile>") return elif obsspec is None and ifile is not None: print("You have provided a value for <ifile> but no value for <obsspec>") return elif obsspec is not None and ifile is not None: return load_maps(fname, obsspec, ifile, qfile=qfile, ufile=ufile, vfile=vfile, src=src, mjd=mjd, ampcal=ampcal, phasecal=phasecal) else: print("Unrecognized file extension '%s' and no <obsspec>/<ifile> provided!" % fname_extension) return
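Finally, a sketch of the dispatch behavior with placeholder file names; the maps branch is only reached for extensions the other loaders do not claim, and there fname is interpreted as the array file.

# illustrative only
obs_uv  = load_obs('src_lo.uvfits', polrep='stokes')                  # -> load_uvfits
obs_txt = load_obs('src_lo.text')                                     # -> load_txt
obs_map = load_obs('array.maps', obsspec='obs_spec.txt', ifile='stokes_i.dat',
                   src='SRC', mjd=57850)                              # -> load_maps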