Source code for ehtim.movie

# movie.py
# a interferometric movie class
#
#    Copyright (C) 2018 Andrew Chael
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
from __future__ import print_function

from builtins import str
from builtins import range
from builtins import object

import string
import numpy as np
import scipy.interpolate
import scipy.ndimage.filters as filt

import ehtim.image
import ehtim.obsdata
import ehtim.observing.obs_simulate as simobs
import ehtim.io.save
import ehtim.io.load
import ehtim.const_def as ehc
import ehtim.observing.obs_helpers as obsh

INTERPOLATION_KINDS = ['linear', 'nearest', 'zero', 'slinear',
                       'quadratic', 'cubic', 'previous', 'next']

###################################################################################################
# Movie object
###################################################################################################


[docs]class Movie(object): """A polarimetric movie (in units of Jy/pixel). Attributes: pulse (function): The function convolved with the pixel values for continuous image psize (float): The pixel dimension in radians xdim (int): The number of pixels along the x dimension ydim (int): The number of pixels along the y dimension mjd (int): The integer MJD of the image source (str): The astrophysical source name ra (float): The source Right Ascension in fractional hours dec (float): The source declination in fractional degrees rf (float): The image frequency in Hz polrep (str): polarization representation, either 'stokes' or 'circ' pol_prim (str): The default image: I,Q,U or V for Stokes, or RR,LL,LR,RL for Circular interp (str): Interpolation method, for scipy.interpolate.interp1d 'kind' keyword (e.g. 'linear', 'nearest', 'quadratic', 'cubic', 'previous', 'next'...) bounds_error (bool): if False, return nearest frame when outside [start_hr, stop_hr] times (list): The list of frame time stamps in hours _movdict (dict): The dictionary with the lists of frames """ def __init__(self, frames, times, psize, ra, dec, rf=ehc.RF_DEFAULT, polrep='stokes', pol_prim=None, pulse=ehc.PULSE_DEFAULT, source=ehc.SOURCE_DEFAULT, mjd=ehc.MJD_DEFAULT, bounds_error=ehc.BOUNDS_ERROR, interp=ehc.INTERP_DEFAULT): """A polarimetric image (in units of Jy/pixel). Args: frames (list): The list of 2D frames; each is a Jy/pixel array times (list): The list of frame time stamps in hours psize (float): The pixel dimension in radians ra (float): The source Right Ascension in fractional hours dec (float): The source declination in fractional degrees rf (float): The image frequency in Hz polrep (str): polarization representation, either 'stokes' or 'circ' pol_prim (str): The default image: I,Q,U or V for Stokes, RR,LL,LR,RL for Circular pulse (function): The function convolved with the pixel values for continuous image. source (str): The source name mjd (int): The integer MJD of the image interp (str): Interpolation method, for scipy.interpolate.interp1d 'kind' keyword (e.g. 'linear', 'nearest', 'quadratic', 'cubic', 'previous', 'next'...) bounds_error (bool): if False, return nearest frame when outside [start_hr, stop_hr] Returns: (Image): the Image object """ if len(frames[0].shape) != 2: raise Exception("frames must each be a 2D numpy array") if len(frames) != len(times): raise Exception("len(frames) != len(times) !") if not (interp in INTERPOLATION_KINDS): raise Exception( "'interp' must be a valid argument for scipy.interpolate.interp1d: " + string.join(INTERPOLATION_KINDS)) self.times = times start_hr = np.min(self.times) self.mjd = int(mjd) if start_hr > 24: self.mjd += int((start_hr - start_hr % 24)/24) self.start_hr = float(start_hr % 24) else: self.start_hr = start_hr self.stop_hr = np.max(self.times) self.duration = self.stop_hr - self.start_hr # frame shape parameters self.nframes = len(frames) self.polrep = polrep self.pulse = pulse self.psize = float(psize) self.xdim = frames[0].shape[1] self.ydim = frames[0].shape[0] # the list of frames frames = np.array([image.flatten() for image in frames]) self.interp = interp self.bounds_error = bounds_error fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=interp, bounds_error=bounds_error, fill_value=fill_value) if polrep == 'stokes': if pol_prim is None: pol_prim = 'I' if pol_prim == 'I': self._movdict = {'I': frames, 'Q': [], 'U': [], 'V': []} self._fundict = {'I': fun, 'Q': None, 'U': None, 'V': None} elif pol_prim == 'V': self._movdict = {'I': [], 'Q': [], 'U': [], 'V': frames} self._fundict = {'I': None, 'Q': None, 'U': None, 'V': fun} elif pol_prim == 'Q': self._movdict = {'I': [], 'Q': frames, 'U': [], 'V': []} self._fundict = {'I': None, 'Q': fun, 'U': None, 'V': None} elif pol_prim == 'U': self._movdict = {'I': [], 'Q': [], 'U': frames, 'V': []} self._fundict = {'I': None, 'Q': None, 'U': frames, 'V': None} else: raise Exception("for polrep=='stokes', pol_prim must be 'I','Q','U', or 'V'!") elif polrep == 'circ': if pol_prim is None: print("polrep is 'circ' and no pol_prim specified! Setting pol_prim='RR'") pol_prim = 'RR' if pol_prim == 'RR': self._movdict = {'RR': frames, 'LL': [], 'RL': [], 'LR': []} self._fundict = {'RR': fun, 'LL': None, 'RL': None, 'LR': None} elif pol_prim == 'LL': self._movdict = {'RR': [], 'LL': frames, 'RL': [], 'LR': []} self._fundict = {'RR': None, 'LL': fun, 'RL': None, 'LR': None} else: raise Exception("for polrep=='circ', pol_prim must be 'RR' or 'LL'!") self.pol_prim = pol_prim self.ra = float(ra) self.dec = float(dec) self.rf = float(rf) self.source = str(source) self.pa = 0.0 # TODO: The pa needs to be properly implemented in the movie object # TODO: What is this doing?? @property def frames(self): frames = self._movdict[self.pol_prim] return frames @frames.setter def frames(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("imvec size is not consistent with xdim*ydim!") # TODO -- more checks on consistency with the existing pol data??? frames = np.array(frames) self._movdict[self.pol_prim] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict[self.pol_prim] = fun @property def iframes(self): if self.polrep != 'stokes': raise Exception( "iframes is not defined unless self.polrep=='stokes' -- try self.switch_polrep()") frames = self._movdict['I'] return frames @iframes.setter def iframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['I'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['I'] = fun @property def qframes(self): if self.polrep != 'stokes': raise Exception( "qframes is not defined unless self.polrep=='stokes' -- try self.switch_polrep()") frames = self._movdict['Q'] return frames @qframes.setter def qframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['Q'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['Q'] = fun @property def uframes(self): if self.polrep != 'stokes': raise Exception( "uframes is not defined unless self.polrep=='stokes' -- try self.switch_polrep()") frames = self._movdict['U'] return frames @uframes.setter def uframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['U'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['U'] = fun @property def vframes(self): if self.polrep != 'stokes': raise Exception( "vframes is not defined unless self.polrep=='stokes' -- try self.switch_polrep()") frames = self._movdict['V'] return frames @vframes.setter def vframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['V'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['V'] = fun @property def rrframes(self): if self.polrep != 'circ': raise Exception( "rrframes is not defined unless self.polrep=='circ' -- try self.switch_polrep()") frames = self._movdict['RR'] return frames @rrframes.setter def rrframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['RR'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['RR'] = fun @property def llframes(self): if self.polrep != 'circ': raise Exception( "llframes is not defined unless self.polrep=='circ' -- try self.switch_polrep()") frames = self._movdict['LL'] return frames @llframes.setter def llframes(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['LL'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['LL'] = fun @property def rlvec(self): if self.polrep != 'circ': raise Exception( "rlframes is not defined unless self.polrep=='circ' -- try self.switch_polrep()") frames = self._movdict['RL'] return frames @rlvec.setter def rlvec(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['RL'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['RL'] = fun @property def lrvec(self): if self.polrep != 'circ': raise Exception( "lrframes is not defined unless self.polrep=='circ' -- try self.switch_polrep()") frames = self._movdict['LR'] return frames @lrvec.setter def lrvec(self, frames): if len(frames[0]) != self.xdim*self.ydim: raise Exception("vec size is not consistent with xdim*ydim!") # TODO -- more checks on the consistency of the imvec with the existing pol data??? frames = np.array(frames) self._movdict['LR'] = frames fill_value = (frames[0], frames[-1]) fun = scipy.interpolate.interp1d(self.times, frames.T, kind=self.interp, bounds_error=self.bounds_error, fill_value=fill_value) self._fundict['LR'] = fun
[docs] def movie_args(self): """"Copy arguments for making a new Movie into a list and dictonary """ frames2D = self.frames.reshape((self.nframes, self.ydim, self.xdim)) arglist = [frames2D.copy(), self.times.copy(), self.psize, self.ra, self.dec] #arglist = [frames2D, self.times, self.psize, self.ra, self.dec] argdict = {'rf': self.rf, 'polrep': self.polrep, 'pol_prim': self.pol_prim, 'pulse': self.pulse, 'source': self.source, 'mjd': self.mjd, 'interp': self.interp, 'bounds_error': self.bounds_error} return (arglist, argdict)
[docs] def copy(self): """Return a copy of the Movie object. Args: Returns: (Movie): copy of the Image. """ arglist, argdict = self.movie_args() # Make new movie with primary polarization newmov = Movie(*arglist, **argdict) # Copy over all polarization movies for pol in list(self._movdict.keys()): if pol == self.pol_prim: continue polframes = self._movdict[pol] if len(polframes): polframes = polframes.reshape((self.nframes, self.ydim, self.xdim)) newmov.add_pol_movie(polframes, pol) return newmov
[docs] def reset_interp(self, interp=None, bounds_error=None): """Reset the movie interpolation function to change the interp. type or change the frames Args: interp (str): Interpolation method, input to scipy.interpolate.interp1d kind keyword bounds_error (bool): if False, return nearest frame outside [start_hr, stop_hr] """ if interp is None: interp = self.interp if bounds_error is None: bounds_error = self.bounds_error # Copy over all polarization movies for pol in list(self._movdict.keys()): polframes = self._movdict[pol] if len(polframes): fill_value = (polframes[0], polframes[-1]) fun = scipy.interpolate.interp1d(self.times, polframes.T, kind=interp, fill_value=fill_value, bounds_error=bounds_error) self._fundict[pol] = fun else: self._fundict[pol] = None self.interp = interp self.bounds_error = bounds_error return
[docs] def offset_time(self, t_offset): """Offset the movie in time by t_offset Args: t_offset (float): offset time in hours Returns: """ mov = self.copy() mov.start_hr += t_offset mov.stop_hr += t_offset mov.times += t_offset mov.reset_interp(interp=mov.interp, bounds_error=mov.bounds_error) return mov
[docs] def add_pol_movie(self, movie, pol): """Add another movie polarization. Args: movie (list): list of 2D frames (possibly complex) in a Jy/pixel array pol (str): The image type: 'I','Q','U','V' for stokes, 'RR','LL','RL','LR' for circ """ if not(len(movie) == self.nframes): raise Exception("new pol movies must have same length as primary movie!") if pol == self.pol_prim: raise Exception("new pol in add_pol_movie is the same as pol_prim!") if np.any(np.array([image.shape != (self.ydim, self.xdim) for image in movie])): raise Exception("add_pol_movie image shapes incompatible with primary image!") if not (pol in list(self._movdict.keys())): raise Exception("for polrep==%s, pol in add_pol_movie must be in " % self.polrep + ",".join(list(self._movdict.keys()))) if self.polrep == 'stokes': if pol == 'I': self.iframes = [image.flatten() for image in movie] elif pol == 'Q': self.qframes = [image.flatten() for image in movie] elif pol == 'U': self.uframes = [image.flatten() for image in movie] elif pol == 'V': self.vframes = [image.flatten() for image in movie] if len(self.iframes) > 0: fill_value = (self.iframes[0], self.iframes[-1]) ifun = scipy.interpolate.interp1d(self.times, self.iframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: ifun = None if len(self.vframes) > 0: fill_value = (self.vframes[0], self.vframes[-1]) vfun = scipy.interpolate.interp1d(self.times, self.vframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: vfun = None if len(self.qframes) > 0: fill_value = (self.qframes[0], self.qframes[-1]) qfun = scipy.interpolate.interp1d(self.times, self.qframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: qfun = None if len(self.uframes) > 0: fill_value = (self.uframes[0], self.uframes[-1]) ufun = scipy.interpolate.interp1d(self.times, self.uframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: ufun = None self._movdict = {'I': self.iframes, 'Q': self.qframes, 'U': self.uframes, 'V': self.vframes} self._fundict = {'I': ifun, 'Q': qfun, 'U': ufun, 'V': vfun} elif self.polrep == 'circ': if pol == 'RR': self.rrframes = [image.flatten() for image in movie] elif pol == 'LL': self.llframes = [image.flatten() for image in movie] elif pol == 'RL': self.rlframes = [image.flatten() for image in movie] elif pol == 'LR': self.lrframes = [image.flatten() for image in movie] if len(self.rrframes) > 0: fill_value = (self.rrframes[0], self.rrframes[-1]) rrfun = scipy.interpolate.interp1d(self.times, self.rrframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: rrfun = None if len(self.llframes) > 0: fill_value = (self.llframes[0], self.llframes[-1]) llfun = scipy.interpolate.interp1d(self.times, self.llframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: llfun = None if len(self.rlframes) > 0: fill_value = (self.rlframes[0], self.rlframes[-1]) rlfun = scipy.interpolate.interp1d(self.times, self.rlframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: rlfun = None if len(self.lrframes) > 0: fill_value = (self.lrframes[0], self.lrframes[-1]) lrfun = scipy.interpolate.interp1d(self.times, self.lrframes.T, kind=self.interp, fill_value=fill_value, bounds_error=self.bounds_error) else: lrfun = None self._movdict = {'RR': self.rrframes, 'LL': self.llframes, 'RL': self.rlframes, 'LR': self.lrframes} self._fundict = {'RR': rrfun, 'LL': llfun, 'RL': rlfun, 'LR': lrfun} return
# TODO deprecated -- replace with generic add_pol_movie
[docs] def add_qu(self, qmovie, umovie): """Add Stokes Q and U movies. self.polrep must be 'stokes' Args: qmovie (list): list of 2D Stokes Q frames in Jy/pixel array umovie (list): list of 2D Stokes U frames in Jy/pixel array Returns: """ if self.polrep != 'stokes': raise Exception("polrep must be 'stokes' for add_qu() !") self.add_pol_movie(qmovie, 'Q') self.add_pol_movie(umovie, 'U') return
# TODO deprecated -- replace with generic add_pol_movie
[docs] def add_v(self, vmovie): """Add Stokes V movie. self.polrep must be 'stokes' Args: vmovie (list): list of 2D Stokes Q frames in Jy/pixel array Returns: """ if self.polrep != 'stokes': raise Exception("polrep must be 'stokes' for add_v() !") self.add_pol_movie(vmovie, 'V') return
[docs] def switch_polrep(self, polrep_out='stokes', pol_prim_out=None): """Return a new movie with the polarization representation changed Args: polrep_out (str): the polrep of the output data pol_prim_out (str): The default movie: I,Q,U or V for Stokes, RR,LL,LR,RL for Circular Returns: (Movie): new movie object with potentially different polrep """ if polrep_out not in ['stokes', 'circ']: raise Exception("polrep_out must be either 'stokes' or 'circ'") if pol_prim_out is None: if polrep_out == 'stokes': pol_prim_out = 'I' elif polrep_out == 'circ': pol_prim_out = 'RR' # Simply copy if the polrep is unchanged if polrep_out == self.polrep and pol_prim_out == self.pol_prim: return self.copy() # Assemble a dictionary of new polarization vectors framedim = (self.nframes, self.ydim, self.xdim) if polrep_out == 'stokes': if self.polrep == 'stokes': movdict = {'I': self.iframes, 'Q': self.qframes, 'U': self.uframes, 'V': self.vframes} else: if len(self.rrframes) == 0 or len(self.llframes) == 0: iframes = [] vframes = [] else: iframes = 0.5*(self.rrframes.reshape(framedim) + self.llframes.reshape(framedim)) vframes = 0.5*(self.rrframes.reshape(framedim) - self.llframes.reshape(framedim)) if len(self.rlframes) == 0 or len(self.lrframes) == 0: qframes = [] uframes = [] else: qframes = np.real(0.5*(self.lrframes.reshape(framedim) + self.rlframes.reshape(framedim))) uframes = np.real(0.5j*(self.lrframes.reshape(framedim) - self.rlframes.reshape(framedim))) movdict = {'I': iframes, 'Q': qframes, 'U': uframes, 'V': vframes} elif polrep_out == 'circ': if self.polrep == 'circ': movdict = {'RR': self.rrframes, 'LL': self.llframes, 'RL': self.rlframes, 'LR': self.lrframes} else: if len(self.iframes) == 0 or len(self.vframes) == 0: rrframes = [] llframes = [] else: rrframes = (self.iframes.reshape(framedim) + self.vframes.reshape(framedim)) llframes = (self.iframes.reshape(framedim) - self.vframes.reshape(framedim)) if len(self.qframes) == 0 or len(self.uframes) == 0: rlframes = [] lrframes = [] else: rlframes = (self.qframes.reshape(framedim) + 1j*self.uframes.reshape(framedim)) lrframes = (self.qframes.reshape(framedim) - 1j*self.uframes.reshape(framedim)) movdict = {'RR': rrframes, 'LL': llframes, 'RL': rlframes, 'LR': lrframes} # Assemble the new movie frames = movdict[pol_prim_out] if len(frames) == 0: raise Exception("switch_polrep to " + "%s with pol_prim_out=%s, \n" % (polrep_out, pol_prim_out) + "output movie is not defined") # Make new movie with primary polarization arglist, argdict = self.movie_args() arglist[0] = frames argdict['polrep'] = polrep_out argdict['pol_prim'] = pol_prim_out newmov = Movie(*arglist, **argdict) # Add in any other polarizations for pol in list(movdict.keys()): if pol == pol_prim_out: continue polframes = movdict[pol] if len(polframes): polframes = polframes.reshape((self.nframes, self.ydim, self.xdim)) newmov.add_pol_movie(polframes, pol) return newmov
[docs] def orth_chi(self): """Rotate the EVPA 90 degrees Args: Returns: (Image): movie with rotated EVPA """ mov = self.copy() if mov.polrep == 'stokes': mov.qframes *= -1 mov.uframes *= -1 elif mov.polrep == 'circ': mov.lrframes *= -1 mov.rlframes *= -1 return mov
[docs] def fovx(self): """Return the movie fov in x direction in radians. Args: Returns: (float) : movie fov in x direction (radian) """ return self.psize * self.xdim
[docs] def fovy(self): """Returns the movie fov in y direction in radians. Args: Returns: (float) : movie fov in y direction (radian) """ return self.psize * self.ydim
@property def lightcurve(self): """Return the total flux over time of the image in Jy. Args: Returns: (numpy.Array) : image total flux (Jy) over time """ if self.polrep == 'stokes': flux = [np.sum(ivec) for ivec in self.iframes] elif self.polrep == 'circ': flux = [0.5*(np.sum(self.rrframes[i])+np.sum(self.llframes[i])) for i in range(self.nframes)] return np.array(flux)
[docs] def lin_polfrac_curve(self): """Return the total fractional linear polarized flux over time Args: Returns: (numpy.ndarray) : image fractional linear polarized flux per frame """ if self.polrep == 'stokes': frac = [np.abs(np.sum(self.qframes[i] + 1j*self.uframes[i])) / np.abs(np.sum(self.iframes[i])) for i in range(self.nframes)] elif self.polrep == 'circ': frac = [2*np.abs(np.sum(self.rlframes[i])) / np.abs(np.sum(self.rrframes[i]+self.llframes[i])) for i in range(self.nframes)] return np.array(frac)
[docs] def circ_polfrac_curve(self): """Return the (signed) total fractional circular polarized flux over time Args: Returns: (numpy.ndarray) : image fractional circular polarized flux per frame """ if self.polrep == 'stokes': frac = [np.sum(self.vframes[i]) / np.abs(np.sum(self.iframes[i])) for i in range(self.nframes)] elif self.polrep == 'circ': frac = [np.sum(self.rrframes[i]-self.llframes[i]) / np.abs(np.sum(self.rrframes[i] + self.llframes[i])) for i in range(self.nframes)] return np.array(frac)
[docs] def get_image(self, time): """Return an Image at time Args: time (float): the time in hours Returns: (Image): the Image object at the given time """ if (time < self.start_hr): if not(self.bounds_error): pass # print ("time %f before movie start time %f" % (time, self.start_hr)) # print ("returning constant frame 0! \n") else: raise Exception("time %f must be in the range %f - %f" % (time, self.start_hr, self.stop_hr)) if (time > self.stop_hr): if not(self.bounds_error): pass # print ("time %f after movie stop time %f" % (time, self.stop_hr)) # print ("returning constant frame -1! \n") else: raise Exception("time %f must be in the range %f - %f" % (time, self.start_hr, self.stop_hr)) # interpolate the imvec to the given time imvec = self._fundict[self.pol_prim](time) # Make the primary image imarr = imvec.reshape(self.ydim, self.xdim) outim = ehtim.image.Image(imarr, self.psize, self.ra, self.dec, self.pa, polrep=self.polrep, pol_prim=self.pol_prim, time=time, rf=self.rf, source=self.source, mjd=self.mjd, pulse=self.pulse) # Copy over the rest of the polarizations for pol in list(self._movdict.keys()): if pol == self.pol_prim: continue polframes = self._movdict[pol] if len(polframes): polvec = self._fundict[pol](time) polarr = polvec.reshape(self.ydim, self.xdim).copy() outim.add_pol_image(polarr, pol) return outim
[docs] def get_frame(self, n): """Return an Image of the nth frame Args: n (int): the frame number Returns: (Image): the Image object of the nth frame """ if n < 0 or n >= len(self.frames): raise Exception("n must be in the range 0 - %i" % self.nframes) time = self.times[n] # Make the primary image imarr = self.frames[n].reshape(self.ydim, self.xdim) outim = ehtim.image.Image(imarr, self.psize, self.ra, self.dec, self.pa, polrep=self.polrep, pol_prim=self.pol_prim, time=time, rf=self.rf, source=self.source, mjd=self.mjd, pulse=self.pulse) # Copy over the rest of the polarizations for pol in list(self._movdict.keys()): if pol == self.pol_prim: continue polframes = self._movdict[pol] if len(polframes): polvec = polframes[n] polarr = polvec.reshape(self.ydim, self.xdim).copy() outim.add_pol_image(polarr, pol) return outim
[docs] def im_list(self): """Return a list of the movie frames Args: Returns: (list): list of Image objects """ return [self.get_frame(j) for j in range(self.nframes)]
[docs] def avg_frame(self): """Coherently Average the movie frames into a single image. Returns: (Image) : averaged image of all frames """ # Make the primary image avg_imvec = np.mean(np.array(self.frames), axis=0) avg_imarr = avg_imvec.reshape(self.ydim, self.xdim) outim = ehtim.image.Image(avg_imarr, self.psize, self.ra, self.dec, self.pa, polrep=self.polrep, pol_prim=self.pol_prim, time=self.start_hr, rf=self.rf, source=self.source, mjd=self.mjd, pulse=self.pulse) # Copy over the rest of the average polarizations for pol in list(self._movdict.keys()): if pol == self.pol_prim: continue polframes = self._movdict[pol] if len(polframes): avg_polvec = np.mean(np.array(polframes), axis=0) avg_polarr = avg_polvec.reshape(self.ydim, self.xdim) outim.add_pol_image(avg_polarr, pol) return outim
[docs] def blur_circ(self, fwhm_x, fwhm_t, fwhm_x_pol=0): """Apply a Gaussian filter to a list of images. Args: fwhm_x (float): circular beam size for spatial blurring in radians fwhm_t (float): temporal blurring in frames fwhm_x_pol (float): circular beam size for Stokes Q,U,V spatial blurring in radians Returns: (Image): output image list """ # Unpack the frames frames = self.im_list() # Blur Stokes I sigma_x = fwhm_x / self.psize / (2. * np.sqrt(2. * np.log(2.))) sigma_t = fwhm_t / (2. * np.sqrt(2. * np.log(2.))) sigma_x_pol = fwhm_x_pol / self.psize / (2. * np.sqrt(2. * np.log(2.))) arr = np.array([im.imvec.reshape(self.ydim, self.xdim) for im in frames]) arr = filt.gaussian_filter(arr, (sigma_t, sigma_x, sigma_x)) # Make a new blurred movie arglist, argdict = self.movie_args() arglist[0] = arr movie_blur = Movie(*arglist, **argdict) # Process the remaining polarizations for pol in list(self._movdict.keys()): if pol == self.pol_prim: continue polframes = self._movdict[pol] if len(polframes): arr = np.array([imvec.reshape(self.ydim, self.xdim) for imvec in polframes]) arr = filt.gaussian_filter(arr, (sigma_t, sigma_x_pol, sigma_x_pol)) movie_blur.add_pol_movie(arr, pol) return movie_blur
[docs] def observe_same_nonoise(self, obs, repeat=False, sgrscat=False, ttype="nfft", fft_pad_factor=2, zero_empty_pol=True, verbose=True): """Observe the movie on the same baselines as an existing observation without adding noise. Args: obs (Obsdata): existing observation with baselines where the FT will be sampled repeat (bool): if True, repeat the movie to fill up the observation interval sgrscat (bool): if True, the visibilites are blurred by the Sgr A* scattering kernel ttype (str): if "fast", use FFT to produce visibilities. Else "direct" for DTFT fft_pad_factor (float): zero pad the image to fft_pad_factor * image size in FFT zero_empty_pol (bool): if True, returns zero vec if the polarization doesn't exist. Otherwise return None verbose (bool): Boolean value controls output prints. Returns: (Obsdata): an observation object """ # Check for agreement in coordinates and frequency tolerance = 1e-8 if (np.abs(self.ra - obs.ra) > tolerance) or (np.abs(self.dec - obs.dec) > tolerance): raise Exception("Movie coordinates are not the same as observtion coordinates!") if (np.abs(self.rf - obs.rf)/obs.rf > tolerance): raise Exception("Movie frequency is not the same as observation frequency!") if ttype == 'direct' or ttype == 'fast' or ttype == 'nfft': if verbose: print("Producing clean visibilities from movie with " + ttype + " FT . . . ") else: raise Exception("ttype=%s, options for ttype are 'direct', 'fast', 'nfft'" % ttype) # Get data obslist = obs.tlist() obstimes = np.array([obsdata[0]['time'] for obsdata in obslist]) if (obstimes < self.start_hr).any(): if repeat: print("Some observation times before movie start time %f" % self.start_hr) print("Looping movie before start\n") elif not(self.bounds_error): print("Some observation times before movie start time %f" % self.start_hr) print("bounds_error is False: using constant frame 0 before start_hr! \n") else: raise Exception("Some observation times before movie start time %f" % self.start_hr) if (obstimes > self.stop_hr).any(): if repeat: print("Some observation times after movie stop time %f" % self.stop_hr) print("Looping movie after stop\n") elif not(self.bounds_error): print("Some observation times after movie stop time %f" % self.stop_hr) print("bounds_error is False: using constant frame -1 after stop_hr! \n") else: raise Exception("Some observation times after movie stop time %f" % self.stop_hr) # Observe nearest frame obsdata_out = [] for i in range(len(obslist)): obsdata = obslist[i] # Frame number time = obsdata[0]['time'] if self.bounds_error: if (time < self.start_hr or time > self.stop_hr): if repeat: time = self.start_hr + np.mod(time - self.start_hr, self.duration) else: raise Exception("Obs time %f outside movie range %f--%f" % (time, self.start_hr, self.stop_hr)) # Get the frame visibilities uv = obsh.recarr_to_ndarr(obsdata[['u', 'v']], 'f8') try: im = self.get_image(time) except ValueError: raise Exception("Interpolation error for time %f: movie range %f--%f" % (time, self.start_hr, self.stop_hr)) data = simobs.sample_vis(im, uv, sgrscat=sgrscat, polrep_obs=obs.polrep, ttype=ttype, fft_pad_factor=fft_pad_factor, zero_empty_pol=zero_empty_pol, verbose=verbose) verbose = False # only print for one frame # Put visibilities into the obsdata if obs.polrep == 'stokes': obsdata['vis'] = data[0] if not(data[1] is None): obsdata['qvis'] = data[1] obsdata['uvis'] = data[2] obsdata['vvis'] = data[3] elif obs.polrep == 'circ': obsdata['rrvis'] = data[0] if not(data[1] is None): obsdata['llvis'] = data[1] if not(data[2] is None): obsdata['rlvis'] = data[2] obsdata['lrvis'] = data[3] if len(obsdata_out): obsdata_out = np.hstack((obsdata_out, obsdata)) else: obsdata_out = obsdata obsdata_out = np.array(obsdata_out, dtype=obs.poltype) obs_no_noise = ehtim.obsdata.Obsdata(self.ra, self.dec, self.rf, obs.bw, obsdata_out, obs.tarr, source=self.source, mjd=np.floor(obs.mjd), polrep=obs.polrep, ampcal=True, phasecal=True, opacitycal=True, dcal=True, frcal=True, timetype=obs.timetype, scantable=obs.scans) return obs_no_noise
[docs] def observe_same(self, obs_in, repeat=False, ttype='nfft', fft_pad_factor=2, sgrscat=False, add_th_noise=True, jones=False, inv_jones=False, opacitycal=True, ampcal=True, phasecal=True, frcal=True, dcal=True, rlgaincal=True, stabilize_scan_phase=False, stabilize_scan_amp=False, neggains=False, taup=ehc.GAINPDEF, gain_offset=ehc.GAINPDEF, gainp=ehc.GAINPDEF, dterm_offset=ehc.DTERMPDEF, rlratio_std=0.,rlphase_std=0., caltable_path=None, seed=False, sigmat=None, verbose=True): """Observe the image on the same baselines as an existing observation object and add noise. Args: obs_in (Obsdata): existing observation with baselines where the FT will be sampled repeat (bool): if True, repeat the movie to fill up the observation interval ttype (str): "fast" or "nfft" or "direct" fft_pad_factor (float): zero pad the image to fft_pad_factor * image size in FFT sgrscat (bool): if True, the visibilites will be blurred by the Sgr A* kernel add_th_noise (bool): if True, baseline-dependent thermal noise is added jones (bool): if True, uses Jones matrix to apply mis-calibration effects inv_jones (bool): if True, applies estimated inverse Jones matrix (not including random terms) to a priori calibrate data opacitycal (bool): if False, time-dependent gaussian errors are added to opacities ampcal (bool): if False, time-dependent gaussian errors are added to station gains phasecal (bool): if False, time-dependent station-based random phases are added frcal (bool): if False, feed rotation angle terms are added to Jones matrices. dcal (bool): if False, time-dependent gaussian errors added to D-terms. rlgaincal (bool): if False, time-dependent gains are not equal for R and L pol stabilize_scan_phase (bool): if True, random phase errors are constant over scans stabilize_scan_amp (bool): if True, random amplitude errors are constant over scans neggains (bool): if True, force the applied gains to be <1 taup (float): the fractional std. dev. of the random error on the opacities gainp (float): the fractional std. dev. of the random error on the gains or a dict giving one std. dev. per site gain_offset (float): the base gain offset at all sites, or a dict giving one gain offset per site dterm_offset (float): the base std. dev. of random additive error at all sites, or a dict giving one std. dev. per site rlratio_std (float): the fractional std. dev. of the R/L gain offset or a dict giving one std. dev. per site rlphase_std (float): std. dev. of R/L phase offset, or a dict giving one std. dev. per site a negative value samples from uniform caltable_path (string): If not None, path and prefix for saving the applied caltable seed (int): seeds the random component of the noise terms. DO NOT set to 0! sigmat (float): temporal std for a Gaussian Process used to generate gain noise. if sigmat=None then an iid gain noise is applied. verbose (bool): print updates and warnings Returns: (Obsdata): an observation object """ if seed: np.random.seed(seed=seed) # print("Producing clean visibilities from movie . . . ") obs = self.observe_same_nonoise(obs_in, repeat=repeat, sgrscat=sgrscat, ttype=ttype, fft_pad_factor=fft_pad_factor, zero_empty_pol=True, verbose=verbose) # Jones Matrix Corruption & Calibration if jones: obsdata = simobs.add_jones_and_noise(obs, add_th_noise=add_th_noise, opacitycal=opacitycal, ampcal=ampcal, phasecal=phasecal, frcal=frcal, dcal=dcal, rlgaincal=rlgaincal, stabilize_scan_phase=stabilize_scan_phase, stabilize_scan_amp=stabilize_scan_amp, neggains=neggains, taup=taup, gain_offset=gain_offset, gainp=gainp, dterm_offset=dterm_offset, rlratio_std=rlratio_std, rlphase_std=rlphase_std, caltable_path=caltable_path, seed=seed, sigmat=sigmat, verbose=verbose) obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr, source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep, ampcal=ampcal, phasecal=phasecal, opacitycal=opacitycal, dcal=dcal, frcal=frcal, timetype=obs.timetype, scantable=obs.scans) if inv_jones: obsdata = simobs.apply_jones_inverse(obs, opacitycal=opacitycal, dcal=dcal, frcal=frcal, verbose=verbose) obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr, source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep, ampcal=ampcal, phasecal=phasecal, opacitycal=True, dcal=True, frcal=True, timetype=obs.timetype, scantable=obs.scans) # No Jones Matrices, Add noise the old way # TODO There is an asymmetry here - in the old way, we don't offer the ability to # *not* unscale estimated noise. else: if caltable_path: print('WARNING: the caltable is only saved if you apply noise with a Jones Matrix') obsdata = simobs.add_noise(obs, add_th_noise=add_th_noise, opacitycal=opacitycal, ampcal=ampcal, phasecal=phasecal, stabilize_scan_phase=stabilize_scan_phase, stabilize_scan_amp=stabilize_scan_amp, neggains=neggains, taup=taup, gain_offset=gain_offset, gainp=gainp, caltable_path=caltable_path, seed=seed, sigmat=sigmat, verbose=verbose) obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr, source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep, ampcal=ampcal, phasecal=phasecal, opacitycal=True, dcal=True, frcal=True, timetype=obs.timetype, scantable=obs.scans) return obs
[docs] def observe(self, array, tint, tadv, tstart, tstop, bw, repeat=False, mjd=None, timetype='UTC', polrep_obs=None, elevmin=ehc.ELEV_LOW, elevmax=ehc.ELEV_HIGH, no_elevcut_space=False, ttype='nfft', fft_pad_factor=2, fix_theta_GMST=False, sgrscat=False, add_th_noise=True, jones=False, inv_jones=False, opacitycal=True, ampcal=True, phasecal=True, frcal=True, dcal=True, rlgaincal=True, stabilize_scan_phase=False, stabilize_scan_amp=False, neggains=False, tau=ehc.TAUDEF, taup=ehc.GAINPDEF, gain_offset=ehc.GAINPDEF, gainp=ehc.GAINPDEF, dterm_offset=ehc.DTERMPDEF, rlratio_std=0.,rlphase_std=0., caltable_path=None, seed=False, sigmat=None, verbose=True): """Generate baselines from an array object and observe the movie. Args: array (Array): an array object containing sites with which to generate baselines tint (float): the scan integration time in seconds tadv (float): the uniform cadence between scans in seconds tstart (float): the start time of the observation in hours tstop (float): the end time of the observation in hours bw (float): the observing bandwidth in Hz repeat (bool): if True, repeat the movie to fill up the observation interval mjd (int): the mjd of the observation, if set as different from the image mjd timetype (str): how to interpret tstart and tstop; either 'GMST' or 'UTC' polrep_obs (str): 'stokes' or 'circ' sets the data polarimetric representation elevmin (float): station minimum elevation in degrees elevmax (float): station maximum elevation in degrees no_elevcut_space (bool): if True, do not apply elevation cut to orbiters ttype (str): "fast", "nfft" or "dtft" fft_pad_factor (float): zero pad the image to fft_pad_factor * image size in the FFT fix_theta_GMST (bool): if True, stops earth rotation to sample fixed u,v sgrscat (bool): if True, the visibilites will be blurred by the Sgr A* kernel add_th_noise (bool): if True, baseline-dependent thermal noise is added jones (bool): if True, uses Jones matrix to apply mis-calibration effects otherwise uses old formalism without D-terms inv_jones (bool): if True, applies estimated inverse Jones matrix (not including random terms) to calibrate data opacitycal (bool): if False, time-dependent gaussian errors are added to opacities ampcal (bool): if False, time-dependent gaussian errors are added to station gains phasecal (bool): if False, time-dependent station-based random phases are added frcal (bool): if False, feed rotation angle terms are added to Jones matrix. dcal (bool): if False, time-dependent gaussian errors added to Jones matrix D-terms. rlgaincal (bool): if False, time-dependent gains are not equal for R and L pol stabilize_scan_phase (bool): if True, random phase errors are constant over scans stabilize_scan_amp (bool): if True, random amplitude errors are constant over scans neggains (bool): if True, force the applied gains to be <1 tau (float): the base opacity at all sites, or a dict giving one opacity per site taup (float): the fractional std. dev. of the random error on the opacities gainp (float): the fractional std. dev. of the random error on the gains or a dict giving one std. dev. per site gain_offset (float): the base gain offset at all sites, or a dict giving one gain offset per site dterm_offset (float): the base std. dev. of random additive error at all sites, or a dict giving one std. dev. per site rlratio_std (float): the fractional std. dev. of the R/L gain offset or a dict giving one std. dev. per site rlphase_std (float): std. dev. of R/L phase offset, or a dict giving one std. dev. per site a negative value samples from uniform caltable_path (string): If not None, path and prefix for saving the applied caltable seed (int): seeds the random component of the noise terms. DO NOT set to 0! sigmat (float): temporal std for a Gaussian Process used to generate gain noise. if sigmat=None then an iid gain noise is applied. verbose (bool): print updates and warnings Returns: (Obsdata): an observation object """ # Generate empty observation if verbose: print("Generating empty observation file . . . ") if mjd is None: mjd = self.mjd if polrep_obs is None: polrep_obs = self.polrep obs = array.obsdata(self.ra, self.dec, self.rf, bw, tint, tadv, tstart, tstop, mjd=mjd, polrep=polrep_obs, tau=tau, timetype=timetype, elevmin=elevmin, elevmax=elevmax, no_elevcut_space=no_elevcut_space, fix_theta_GMST=fix_theta_GMST) # Observe on the same baselines as the empty observation and add noise obs = self.observe_same(obs, repeat=repeat, ttype=ttype, fft_pad_factor=fft_pad_factor, sgrscat=sgrscat, add_th_noise=add_th_noise, jones=jones, inv_jones=inv_jones, opacitycal=opacitycal, ampcal=ampcal, phasecal=phasecal, dcal=dcal, frcal=frcal, rlgaincal=rlgaincal, stabilize_scan_phase=stabilize_scan_phase, stabilize_scan_amp=stabilize_scan_amp, neggains=neggains, taup=taup, gain_offset=gain_offset, gainp=gainp, dterm_offset=dterm_offset, rlratio_std=rlratio_std,rlphase_std=rlphase_std, caltable_path=caltable_path, seed=seed, sigmat=sigmat, verbose=verbose) return obs
[docs] def observe_vex(self, vex, source, synchronize_start=True, t_int=0.0, polrep_obs=None, ttype='nfft', fft_pad_factor=2, fix_theta_GMST=False, sgrscat=False, add_th_noise=True, jones=False, inv_jones=False, opacitycal=True, ampcal=True, phasecal=True, frcal=True, dcal=True, rlgaincal=True, stabilize_scan_phase=False, stabilize_scan_amp=False, neggains=False, tau=ehc.TAUDEF, taup=ehc.GAINPDEF, gain_offset=ehc.GAINPDEF, gainp=ehc.GAINPDEF, dterm_offset=ehc.DTERMPDEF, caltable_path=None, seed=False, sigmat=None, verbose=True): """Generate baselines from a vex file and observe the movie. Args: vex (Vex): an vex object containing sites and scan information source (str): the source to observe synchronize_start (bool): if True, the start of the movie is defined as the start of the observations t_int (float): if not zero, overrides the vex scan lengths polrep_obs (str): 'stokes' or 'circ' sets the data polarimetric representation ttype (str): "fast" or "nfft" or "dtft" fft_pad_factor (float): zero pad the image to fft_pad_factor * image size in FFT fix_theta_GMST (bool): if True, stops earth rotation to sample fixed u,v sgrscat (bool): if True, the visibilites will be blurred by the Sgr A* kernel add_th_noise (bool): if True, baseline-dependent thermal noise is added jones (bool): if True, uses Jones matrix to apply mis-calibration effects otherwise uses old formalism without D-terms inv_jones (bool): if True, applies estimated inverse Jones matrix (not including random terms) to calibrate data opacitycal (bool): if False, time-dependent gaussian errors are added to opacities ampcal (bool): if False, time-dependent gaussian errors are added to station gains phasecal (bool): if False, time-dependent station-based random phases are added frcal (bool): if False, feed rotation angle terms are added to Jones matrix. dcal (bool): if False, time-dependent gaussian errors added to Jones matrix D-terms. rlgaincal (bool): if False, time-dependent gains are not equal for R and L pol stabilize_scan_phase (bool): if True, random phase errors are constant over scans stabilize_scan_amp (bool): if True, random amplitude errors are constant over scans neggains (bool): if True, force the applied gains to be <1 tau (float): the base opacity at all sites, or a dict giving one opacity per site taup (float): the fractional std. dev. of the random error on the opacities gain_offset (float): the base gain offset at all sites, or a dict giving one gain offset per site gainp (float): the fractional std. dev. of the random error on the gains dterm_offset (float): the base dterm offset at all sites, or a dict giving one dterm offset per site caltable_path (string): If not None, path and prefix for saving the applied caltable seed (int): seeds the random component of the noise terms. DO NOT set to 0! sigmat (float): temporal std for a Gaussian Process used to generate gain noise. if sigmat=None then an iid gain noise is applied. verbose (bool): print updates and warnings Returns: (Obsdata): an observation object """ if polrep_obs is None: polrep_obs = self.polrep obs_List = [] movie = self.copy() if synchronize_start: movie.mjd = vex.sched[0]['mjd_floor'] movie.start_hr = vex.sched[0]['start_hr'] movie_start = float(movie.mjd) + movie.start_hr/24.0 movie_end = float(movie.mjd) + movie.stop_hr/24.0 print("Movie MJD Range: ", movie_start, movie_end) snapshot = 1.0 if t_int > 0.0: snapshot = 0.0 for i_scan in range(len(vex.sched)): if vex.sched[i_scan]['source'] != source: continue scankeys = list(vex.sched[i_scan]['scan'].keys()) subarray = vex.array.make_subarray([vex.sched[i_scan]['scan'][key]['site'] for key in scankeys]) if snapshot == 1.0: t_int = np.max(np.array([vex.sched[i_scan]['scan'][site] ['scan_sec'] for site in scankeys])) print(t_int) vex_scan_start_mjd = float(vex.sched[i_scan]['mjd_floor']) vex_scan_start_mjd += vex.sched[i_scan]['start_hr']/24.0 vex_scan_length_mjd = vex.sched[i_scan]['scan'][0]['scan_sec']/3600.0/24.0 vex_scan_stop_mjd = vex_scan_start_mjd + vex_scan_length_mjd print("Scan MJD Range: ", vex_scan_start_mjd, vex_scan_stop_mjd) if vex_scan_start_mjd < movie_start or vex_scan_stop_mjd > movie_end: continue t_start = vex.sched[i_scan]['start_hr'] t_stop = t_start + vex.sched[i_scan]['scan'][0]['scan_sec']/3600.0 - ehc.EP mjd = vex.sched[i_scan]['mjd_floor'] obs = subarray.obsdata(movie.ra, movie.dec, movie.rf, vex.bw_hz, t_int, t_int, t_start, t_stop, mjd=mjd, polrep=polrep_obs, tau=tau, elevmin=.01, elevmax=89.99, timetype='UTC', fix_theta_GMST=fix_theta_GMST) obs_List.append(obs) if len(obs_List) == 0: raise Exception("Movie has no overlap with the vex file") obs = ehtim.obsdata.merge_obs(obs_List) obsout = movie.observe_same(obs, repeat=False, ttype=ttype, fft_pad_factor=fft_pad_factor, sgrscat=sgrscat, add_th_noise=add_th_noise, jones=jones, inv_jones=inv_jones, opacitycal=opacitycal, ampcal=ampcal, phasecal=phasecal, frcal=frcal, dcal=dcal, rlgaincal=rlgaincal, stabilize_scan_phase=stabilize_scan_phase, stabilize_scan_amp=stabilize_scan_amp, neggains=neggains, taup=taup, gain_offset=gain_offset, gainp=gainp, dterm_offset=dterm_offset, caltable_path=caltable_path, seed=seed,sigmat=sigmat, verbose=verbose) return obsout
[docs] def save_txt(self, fname): """Save the Movie data to individual text files with filenames basename + 00001, etc. Args: fname (str): basename of output files Returns: """ ehtim.io.save.save_mov_txt(self, fname) return
[docs] def save_fits(self, fname): """Save the Movie data to individual fits files with filenames basename + 00001, etc. Args: fname (str): basename of output files Returns: """ ehtim.io.save.save_mov_fits(self, fname) return
[docs] def save_hdf5(self, fname): """Save the Movie data to a single hdf5 file. Args: fname (str): output file name Returns: """ ehtim.io.save.save_mov_hdf5(self, fname) return
[docs] def export_mp4(self, out='movie.mp4', fps=10, dpi=120, interp='gaussian', scale='lin', dynamic_range=1000.0, cfun='afmhot', nvec=20, pcut=0.01, plotp=False, gamma=0.5, frame_pad_factor=1, label_time=False, verbose=False): """Save the Movie to an mp4 file """ import matplotlib matplotlib.use('agg') import matplotlib.pyplot as plt import matplotlib.animation as animation if self.polrep != 'stokes': raise Exception("export_mp4 requires self.polrep=='stokes' -- try self.switch_polrep()") if (interp in ['gauss', 'gaussian', 'Gaussian', 'Gauss']): interp = 'gaussian' else: interp = 'nearest' if scale == 'lin': unit = 'Jy/pixel' elif scale == 'log': unit = 'log(Jy/pixel)' elif scale == 'gamma': unit = '(Jy/pixel)^gamma' else: raise Exception("Scale not recognized!") fig = plt.figure() maxi = np.max(np.concatenate([im for im in self.frames])) if len(self.qframes) and plotp: thin = self.xdim//nvec mask = (self.frames[0]).reshape(self.ydim, self.xdim) > pcut * np.max(self.frames[0]) mask2 = mask[::thin, ::thin] x = (np.array([[i for i in range(self.xdim)] for j in range(self.ydim)])[::thin, ::thin])[mask2] y = (np.array([[j for i in range(self.xdim)] for j in range(self.ydim)])[::thin, ::thin])[mask2] a = (-np.sin(np.angle(self.qframes[0]+1j*self.uframes[0]) / 2).reshape(self.ydim, self.xdim)[::thin, ::thin])[mask2] b = (np.cos(np.angle(self.qframes[0]+1j*self.uframes[0]) / 2).reshape(self.ydim, self.xdim)[::thin, ::thin])[mask2] m = (np.abs(self.qframes[0] + 1j*self.uframes[0]) / self.frames[0]).reshape(self.ydim, self.xdim) m[np.logical_not(mask)] = 0 Q1 = plt.quiver(x, y, a, b, headaxislength=20, headwidth=1, headlength=.01, minlength=0, minshaft=1, width=.01*self.xdim, units='x', pivot='mid', color='k', angles='uv', scale=1.0/thin) Q2 = plt.quiver(x, y, a, b, headaxislength=20, headwidth=1, headlength=.01, minlength=0, minshaft=1, width=.005*self.xdim, units='x', pivot='mid', color='w', angles='uv', scale=1.1/thin) def im_data(n): n_data = int((n-n % frame_pad_factor)/frame_pad_factor) if len(self.qframes) and plotp: a = (-np.sin(np.angle(self.qframes[n_data]+1j*self.uframes[n_data] )/2).reshape(self.ydim, self.xdim)[::thin, ::thin])[mask2] b = (np.cos(np.angle(self.qframes[n_data]+1j*self.uframes[n_data] )/2).reshape(self.ydim, self.xdim)[::thin, ::thin])[mask2] Q1.set_UVC(a, b) Q2.set_UVC(a, b) if scale == 'lin': return self.frames[n_data].reshape((self.ydim, self.xdim)) elif scale == 'log': return np.log(self.frames[n_data].reshape( (self.ydim, self.xdim)) + maxi/dynamic_range) elif scale == 'gamma': return (self.frames[n_data]**(gamma)).reshape((self.ydim, self.xdim)) plt_im = plt.imshow(im_data(0), cmap=plt.get_cmap(cfun), interpolation=interp) plt.colorbar(plt_im, fraction=0.046, pad=0.04, label=unit) if scale == 'lin': plt_im.set_clim([0, maxi]) else: plt_im.set_clim([np.log(maxi/dynamic_range), np.log(maxi)]) xticks = obsh.ticks(self.xdim, self.psize/ehc.RADPERAS/1e-6) yticks = obsh.ticks(self.ydim, self.psize/ehc.RADPERAS/1e-6) plt.xticks(xticks[0], xticks[1]) plt.yticks(yticks[0], yticks[1]) plt.xlabel(r'Relative RA ($\mu$as)') plt.ylabel(r'Relative Dec ($\mu$as)') fig.set_size_inches([5, 5]) plt.tight_layout() def update_img(n): if verbose: print("processing frame {0} of {1}".format(n, len(self.frames)*frame_pad_factor)) plt_im.set_data(im_data(n)) if label_time: time = self.times[n] time_str = ("%02d:%02d:%02d" % (int(time), (time*60) % 60, (time*3600) % 60)) fig.suptitle(time_str) return plt_im ani = animation.FuncAnimation(fig, update_img, len( self.frames)*frame_pad_factor, interval=1e3/fps) writer = animation.writers['ffmpeg'](fps=fps, bitrate=1e6) ani.save(out, writer=writer, dpi=dpi)
################################################################################################## # Movie creation functions ##################################################################################################
[docs]def export_multipanel_mp4(input_list, out='movie.mp4', start_hr=None, stop_hr=None, nframes=100, fov=None, npix=None, nrows=1, fps=10, dpi=120, verbose=False, titles=None, panel_size=4.0, common_scale=False, scale='linear', label_type='scale', has_cbar=False, **kwargs): """Export a movie comparing multiple movies in a grid. Args: input_list (list): The list of input Movies or Images out (string): The output filename start_hr (float): The start time in hours. If None, defaults to first start time end_hr (float): The end time in hours. If None, defaults to last start time nframes (int): The number of frames in the output movie fov (float): If specified, use this field of view for all panels npix (int): If specified, use this linear pixel dimension for all panels nrows (int): Number of rows in movie fps (int): Frames per second titles (list): List of panel titles for input_list panel_size (float): Size of individual panels (inches) """ import matplotlib matplotlib.use('agg') import matplotlib.pyplot as plt import matplotlib.animation as animation if start_hr is None: try: start_hr = np.min([x.start_hr for x in input_list if hasattr(x, 'start_hr')]) except ValueError: raise Exception("no movies in input_list!") if stop_hr is None: try: stop_hr = np.max([x.stop_hr for x in input_list if hasattr(x, 'stop_hr')]) except ValueError: raise Exception("no movies in input_list!") print("%s will have %i frames in the range %f-%f hr" % (out, nframes, start_hr, stop_hr)) ncols = int(np.ceil(len(input_list)/nrows)) suptitle_space = 0.6 # inches w = panel_size*ncols h = panel_size*nrows + suptitle_space tgap = suptitle_space / h bgap = .1 rgap = .1 lgap = .1 subw = (1-lgap-rgap)/ncols subh = (1-tgap-bgap)/nrows print("Rows: " + str(nrows)) print("Cols: " + str(ncols)) fig = plt.figure(figsize=(w, h)) ax_all = [[] for j in range(nrows)] for y in range(nrows): for x in range(ncols): ax = fig.add_axes([lgap+subw*x, bgap+subh*(nrows-y-1), subw, subh]) ax_all[y].append(ax) times = np.linspace(start_hr, stop_hr, nframes) hr_step = times[1]-times[0] mjd_step = hr_step/24. im_List_Set = [[x.get_image(time) if hasattr(x, 'get_image') else x.copy() for time in times] for x in input_list] if fov and npix: im_List_Set = [[x.regrid_image(fov, npix) for x in y] for y in im_List_Set] else: print('not rescaling images to common fov and npix!') maxi = [np.max([im.imvec for im in im_List_Set[j]]) for j in range(len(im_List_Set))] if common_scale: maxi = np.max(maxi) + 0.0*maxi i = 0 for y in range(nrows): for x in range(ncols): if i >= len(im_List_Set): ax_all[y][x].set_visible(False) else: kwargs.get('ttype', 'nfft') if (y == nrows-1 and x == 0) or fov is None: label_type_cur = label_type else: label_type_cur = 'none' im_List_Set[i][0].display(axis=ax_all[y][x], scale=scale, label_type=label_type_cur, has_cbar=has_cbar, **kwargs) if y == nrows-1 and x == 0: plt.xlabel(r'Relative RA ($\mu$as)') plt.ylabel(r'Relative Dec ($\mu$as)') else: plt.xlabel('') plt.ylabel('') if not titles: ax_all[y][x].set_title('') else: ax_all[y][x].set_title(titles[i]) i = i+1 def im_data(i, n): if scale == 'linear': return im_List_Set[i][n].imvec.reshape((im_List_Set[i][n].ydim, im_List_Set[i][n].xdim)) else: return np.log(im_List_Set[i][n].imvec.reshape( (im_List_Set[i][n].ydim, im_List_Set[i][n].xdim)) + 1e-20) def update_img(n): if verbose: print("processing frame {0} of {1}".format(n, len(im_List_Set[0]))) i = 0 for y in range(nrows): for x in range(ncols): ax_all[y][x].images[0].set_data(im_data(i, n)) i = i+1 if i >= len(im_List_Set): break if mjd_step > 0.1: # , verticalalignment=verticalalignment) fig.suptitle('MJD: ' + str(im_List_Set[0][n].mjd)) else: time = im_List_Set[0][n].time time_str = ("%d:%02d.%02d" % (int(time), (time*60) % 60, (time*3600) % 60)) fig.suptitle(time_str) return ani = animation.FuncAnimation(fig, update_img, len(im_List_Set[0]), interval=1e3/fps) writer = animation.writers['ffmpeg'](fps=fps, bitrate=1e6) ani.save(out, writer=writer, dpi=dpi)
[docs]def merge_im_list(imlist, framedur=-1, interp=ehc.INTERP_DEFAULT, bounds_error=ehc.BOUNDS_ERROR): """Merge a list of image objects into a movie object. Args: imlist (list): list of Image objects framedur (float): duration of a movie frame in seconds use to override times in the individual movies interp (str): Interpolation method, input to scipy.interpolate.interp1d kind keyword bounds_error (bool): if False, return nearest frame outside interval [start_hr, stop_hr] Returns: (Movie): a Movie object assembled from the images """ framelist = [] nframes = len(imlist) print("\nMerging %i frames from MJD %i %.2f hr to MJD %i %.2f hr" % ( nframes, imlist[0].mjd, imlist[0].time, imlist[-1].mjd, imlist[-1].time)) for i in range(nframes): im = imlist[i] if i == 0: polrep0 = im.polrep pol_prim0 = im.pol_prim movdict = {key: [] for key in list(im._imdict.keys())} psize0 = im.psize xdim0 = im.xdim ydim0 = im.ydim ra0 = im.ra dec0 = im.dec rf0 = im.rf src0 = im.source mjd0 = im.mjd hour0 = im.time pulse = im.pulse times = [hour0] else: if (im.polrep != polrep0): raise Exception("polrep of image %i != polrep of image 0!" % i) if (im.psize != psize0): raise Exception("psize of image %i != psize of image 0!" % i) if (im.xdim != xdim0): raise Exception("xdim of image %i != xdim of image 0!" % i) if (im.ydim != ydim0): raise Exception("ydim of image %i != ydim of image 0!" % i) if (im.ra != ra0): raise Exception("RA of image %i != RA of image 0!" % i) if (im.dec != dec0): raise Exception("DEC of image %i != DEC of image 0!" % i) if (im.rf != rf0): raise Exception("rf of image %i != rf of image 0!" % i) if (im.source != src0): raise Exception("source of image %i != src of image 0!" % i) if (im.mjd < mjd0): raise Exception("mjd of image %i < mjd of image 0!" % i) hour = im.time if im.mjd > mjd0: hour += 24*(im.mjd - mjd0) times.append(hour) imarr = im.imvec.reshape(ydim0, xdim0) framelist.append(imarr) # Look for other polarizations for pol in list(movdict.keys()): polvec = im._imdict[pol] if len(polvec): polarr = polvec.reshape(ydim0, xdim0) movdict[pol].append(polarr) else: if movdict[pol]: raise Exception("all frames in merge_im_list must have the same pol layout: " + "error in frame %i" % i) # assume equispaced with a given framedur instead of reading the individual image times if framedur != -1: framedur_hr = framedur/3600. tstart = hour0 tstop = hour0 + framedur_hr*nframes times = np.linspace(tstart, tstop, nframes) elif len(set(times)) < len(framelist): raise Exception("image times have duplicates!") # Make new movie with primary polarization newmov = Movie(framelist, times, psize0, ra0, dec0, interp=interp, bounds_error=bounds_error, polrep=polrep0, pol_prim=pol_prim0, rf=rf0, source=src0, mjd=mjd0, pulse=pulse) # Copy over all polarization movies for pol in list(movdict.keys()): if pol == newmov.pol_prim: continue polframes = np.array(movdict[pol]) if len(polframes): polframes = polframes.reshape((newmov.nframes, newmov.ydim, newmov.xdim)) newmov.add_pol_movie(polframes, pol) return newmov
[docs]def load_hdf5(file_name, pulse=ehc.PULSE_DEFAULT, interp=ehc.INTERP_DEFAULT, bounds_error=ehc.BOUNDS_ERROR): """Read in a movie from an hdf5 file and create a Movie object. Args: file_name (str): The name of the hdf5 file. pulse (function): The function convolved with the pixel values for continuous image interp (str): Interpolation method, input to scipy.interpolate.interp1d kind keyword bounds_error (bool): if False, return nearest frame outside interval [start_hr, stop_hr] Returns: Movie: a Movie object """ return ehtim.io.load.load_movie_hdf5(file_name, pulse=pulse, interp=interp, bounds_error=bounds_error)
[docs]def load_txt(basename, nframes, framedur=-1, pulse=ehc.PULSE_DEFAULT, polrep='stokes', pol_prim=None, zero_pol=True, interp=ehc.INTERP_DEFAULT, bounds_error=ehc.BOUNDS_ERROR): """Read in a movie from text files and create a Movie object. Args: basename (str): The base name of individual movie frames. Files should have names basename + 00001, etc. nframes (int): The total number of frames framedur (float): The frame duration in seconds if famedur==-1, frame duration taken from file headers pulse (function): The function convolved with the pixel values for continuous image polrep (str): polarization representation, either 'stokes' or 'circ' pol_prim (str): The default image: I,Q,U or V for Stokes, RR,LL,LR,RL for Circular zero_pol (bool): If True, loads any missing polarizations as zeros interp (str): Interpolation method, input to scipy.interpolate.interp1d kind keyword bounds_error (bool): if False, return nearest frame outside interval [start_hr, stop_hr] Returns: Movie: a Movie object """ return ehtim.io.load.load_movie_txt(basename, nframes, framedur=framedur, pulse=pulse, polrep=polrep, pol_prim=pol_prim, zero_pol=zero_pol, interp=interp, bounds_error=bounds_error)
[docs]def load_fits(basename, nframes, framedur=-1, pulse=ehc.PULSE_DEFAULT, polrep='stokes', pol_prim=None, zero_pol=True, interp=ehc.INTERP_DEFAULT, bounds_error=ehc.BOUNDS_ERROR): """Read in a movie from fits files and create a Movie object. Args: basename (str): The base name of individual movie frames. Files should have names basename + 00001, etc. nframes (int): The total number of frames framedur (float): The frame duration in seconds. if famedur==-1, frame duration taken from file headers pulse (function): The function convolved with the pixel values for continuous image polrep (str): polarization representation, either 'stokes' or 'circ' pol_prim (str): The default image: I,Q,U or V for Stokes, RR,LL,LR,RL for Circular zero_pol (bool): If True, loads any missing polarizations as zeros interp (str): Interpolation method, input to scipy.interpolate.interp1d kind keyword bounds_error (bool): if False, return nearest frame outside interval [start_hr, stop_hr] Returns: Movie: a Movie object """ return ehtim.io.load.load_movie_fits(basename, nframes, framedur=framedur, pulse=pulse, polrep=polrep, pol_prim=pol_prim, zero_pol=zero_pol, interp=interp, bounds_error=bounds_error)