Source code for ehtim.scattering.stochastic_optics

# Michael Johnson, 2/15/2017
# See http://adsabs.harvard.edu/abs/2016ApJ...833...74J for details about this module

from __future__ import print_function
from builtins import range
from builtins import object
import numpy as np
import scipy.signal
import scipy.special as sps
import scipy.integrate as integrate
from scipy.optimize import minimize

import matplotlib.pyplot as plt

import ehtim.image as image
import ehtim.movie as movie
import ehtim.obsdata as obsdata
from ehtim.observing.obs_helpers import *
from ehtim.const_def import * #Note: C is m/s rather than cm/s.

from multiprocessing import cpu_count
from multiprocessing import Pool

import math
import cmath

################################################################################
# The class ScatteringModel enscompasses a generic scattering model, determined by the power spectrum Q and phase structure function Dphi
################################################################################

[docs]class ScatteringModel(object): """A scattering model based on a thin-screen approximation. Models include: ('von_Mises', 'boxcar', 'dipole'): These scattering models are motivated by observations of Sgr A*. Each gives a Gaussian at long wavelengths that matches the model defined by {theta_maj_mas_ref, theta_min_mas_ref, POS_ANG} at the reference wavelength wavelength_reference_cm with a lambda^2 scaling. The source sizes {theta_maj, theta_min} are the image FWHM in milliarcseconds at the reference wavelength. Note that this may not match the ensemble-average kernel at the reference wavelength, if the reference wavelength is short enough to be beyond the lambda^2 regime! This model also includes an inner and outer scale and will thus transition to scattering with scatt_alpha at shorter wavelengths Note: This model *requires* a finite inner scale 'power-law': This scattering model gives a pure power law at all wavelengths. There is no inner scale, but there can be an outer scale. The ensemble-average image is given by {theta_maj_mas_ref, theta_min_mas_ref, POS_ANG} at the reference wavelength wavelength_reference_cm. The ensemble-average image size is proportional to wavelength^(1+2/scatt_alpha) = wavelength^(11/5) for Kolmogorov Attributes: model (string): The type of scattering model (determined by the power spectrum of phase fluctuations). scatt_alpha (float): The power-law index of the phase fluctuations (Kolmogorov is 5/3). observer_screen_distance (float): The distance from the observer to the scattering screen in cm. source_screen_distance (float): The distance from the source to the scattering screen in cm. theta_maj_mas_ref (float): FWHM in mas of the major axis angular broadening at the specified reference wavelength. theta_min_mas_ref (float): FWHM in mas of the minor axis angular broadening at the specified reference wavelength. POS_ANG (float): The position angle of the major axis of the scattering. wavelength_reference_cm (float): The reference wavelength for the scattering model in cm. r_in (float): The inner scale of the scattering screen in cm. r_out (float): The outer scale of the scattering screen in cm. rF (function): The Fresnel scale of the scattering screen at the specific wavelength. """ def __init__(self, model = 'dipole', scatt_alpha = 1.38, observer_screen_distance = 2.82 * 3.086e21, source_screen_distance = 5.53 * 3.086e21, theta_maj_mas_ref = 1.380, theta_min_mas_ref = 0.703, POS_ANG = 81.9, wavelength_reference_cm = 1.0, r_in = 800e5, r_out = 1e20): """To initialize the scattering model, specify: Attributes: model (string): The type of scattering model (determined by the power spectrum of phase fluctuations). Options are 'von_Mises', 'boxcar', 'dipole', and 'power-law' scatt_alpha (float): The power-law index of the phase fluctuations (Kolmogorov is 5/3). observer_screen_distance (float): The distance from the observer to the scattering screen in cm. source_screen_distance (float): The distance from the source to the scattering screen in cm. theta_maj_mas_ref (float): FWHM in mas of the major axis angular broadening at the specified reference wavelength. theta_min_mas_ref (float): FWHM in mas of the minor axis angular broadening at the specified reference wavelength. POS_ANG (float): The position angle of the major axis of the scattering. wavelength_reference_cm (float): The reference wavelength for the scattering model in cm. r_in (float): The inner scale of the scattering screen in cm. r_out (float): The outer scale of the scattering screen in cm. """ self.model = model self.POS_ANG = POS_ANG #Major axis position angle [degrees, east of north] self.observer_screen_distance = observer_screen_distance #cm self.source_screen_distance = source_screen_distance #cm M = observer_screen_distance/source_screen_distance self.wavelength_reference = wavelength_reference_cm #Reference wavelength [cm] self.r_in = r_in #inner scale [cm] self.r_out = r_out #outer scale [cm] self.scatt_alpha = scatt_alpha FWHM_fac = (2.0 * np.log(2.0))**0.5/np.pi self.Qbar = 2.0/sps.gamma((2.0 - self.scatt_alpha)/2.0) * (self.r_in**2*(1.0 + M)/(FWHM_fac*(self.wavelength_reference/(2.0*np.pi))**2) )**2 * ( (theta_maj_mas_ref**2 + theta_min_mas_ref**2)*(1.0/1000.0/3600.0*np.pi/180.0)**2) self.C_scatt_0 = (self.wavelength_reference/(2.0*np.pi))**2 * self.Qbar*sps.gamma(1.0 - self.scatt_alpha/2.0)/(8.0*np.pi**2*self.r_in**2) A = theta_maj_mas_ref/theta_min_mas_ref # Anisotropy, >=1, as lambda->infinity self.phi0 = (90 - self.POS_ANG) * np.pi/180.0 # Parameters for the approximate phase structure function theta_maj_rad_ref = theta_maj_mas_ref/1000.0/3600.0*np.pi/180.0 theta_min_rad_ref = theta_min_mas_ref/1000.0/3600.0*np.pi/180.0 self.Amaj_0 = ( self.r_in*(1.0 + M) * theta_maj_rad_ref/(FWHM_fac * (self.wavelength_reference/(2.0*np.pi)) * 2.0*np.pi ))**2 self.Amin_0 = ( self.r_in*(1.0 + M) * theta_min_rad_ref/(FWHM_fac * (self.wavelength_reference/(2.0*np.pi)) * 2.0*np.pi ))**2 if model == 'von_Mises': def avM_Anisotropy(kzeta): return np.abs( (kzeta*sps.i0(kzeta)/sps.i1(kzeta) - 1.0)**0.5 - A ) self.kzeta = minimize(avM_Anisotropy, A**2, method='nelder-mead', options={'xtol': 1e-8, 'disp': False}).x self.P_phi_prefac = 1.0/(2.0*np.pi*sps.i0(self.kzeta)) elif model == 'boxcar': def boxcar_Anisotropy(kzeta): return np.abs( np.sin(np.pi/(1.0 + kzeta))/(np.pi/(1.0 + kzeta)) - (theta_maj_mas_ref**2 - theta_min_mas_ref**2)/(theta_maj_mas_ref**2 + theta_min_mas_ref**2) ) self.kzeta = minimize(boxcar_Anisotropy, A, method='nelder-mead', options={'xtol': 1e-8, 'disp': False}).x self.P_phi_prefac = (1.0 + self.kzeta)/(2.0*np.pi) elif model == 'dipole': def dipole_Anisotropy(kzeta): return np.abs( sps.hyp2f1((self.scatt_alpha + 2.0)/2.0, 0.5, 2.0, -kzeta)/sps.hyp2f1((self.scatt_alpha + 2.0)/2.0, 1.5, 2.0, -kzeta) - A**2 ) self.kzeta = minimize(dipole_Anisotropy, A, method='nelder-mead', options={'xtol': 1e-8, 'disp': False}).x self.P_phi_prefac = 1.0/(2.0*np.pi*sps.hyp2f1((self.scatt_alpha + 2.0)/2.0, 0.5, 1.0, -self.kzeta)) else: print("Scattering Model Not Recognized!") # More parameters for the approximate phase structure function int_maj = integrate.quad(lambda phi_q: np.abs( np.cos( self.phi0 - phi_q ) )**self.scatt_alpha * self.P_phi(phi_q), 0, 2.0*np.pi, limit=250)[0] int_min = integrate.quad(lambda phi_q: np.abs( np.sin( self.phi0 - phi_q ) )**self.scatt_alpha * self.P_phi(phi_q), 0, 2.0*np.pi, limit=250)[0] B_prefac = self.C_scatt_0 * 2.0**(2.0 - self.scatt_alpha) * np.pi**0.5/(self.scatt_alpha * sps.gamma((self.scatt_alpha + 1.0)/2.0)) self.Bmaj_0 = B_prefac*int_maj self.Bmin_0 = B_prefac*int_min #Check normalization: #print("Checking Normalization:",integrate.quad(lambda phi_q: self.P_phi(phi_q), 0, 2.0*np.pi)[0]) return def P_phi(self, phi): if self.model == 'von_Mises': return self.P_phi_prefac * np.cosh(self.kzeta*np.cos(phi - self.phi0)) elif self.model == 'boxcar': return self.P_phi_prefac * (1.0 - ((np.pi/(2.0*(1.0 + self.kzeta)) < (phi - self.phi0) % np.pi) & ((phi - self.phi0) % np.pi < np.pi - np.pi/(2.0*(1.0 + self.kzeta))))) elif self.model == 'dipole': return self.P_phi_prefac * (1.0 + self.kzeta*np.sin(phi - self.phi0)**2)**(-(self.scatt_alpha + 2.0)/2.0)
[docs] def rF(self, wavelength): """Returns the Fresnel scale [cm] of the scattering screen at the specified wavelength [cm]. Args: wavelength (float): The desired wavelength [cm] Returns: rF (float): The Fresnel scale [cm] """ return (self.source_screen_distance*self.observer_screen_distance/(self.source_screen_distance + self.observer_screen_distance)*wavelength/(2.0*np.pi))**0.5
[docs] def Mag(self): """Returns the effective magnification the scattering screen: (observer-screen distance)/(source-screen distance). Returns: M (float): The effective magnification of the scattering screen. """ return self.observer_screen_distance/self.source_screen_distance
[docs] def dDphi_dz(self, r, phi, phi_q, wavelength): """differential contribution to the phase structure function """ return 4.0 * (wavelength/self.wavelength_reference)**2 * self.C_scatt_0/self.scatt_alpha * (sps.hyp1f1(-self.scatt_alpha/2.0, 0.5, -r**2/(4.0*self.r_in**2)*np.cos(phi - phi_q)**2) - 1.0)
def Dphi_exact(self, x, y, wavelength_cm): r = (x**2 + y**2)**0.5 phi = np.arctan2(y, x) return integrate.quad(lambda phi_q: self.dDphi_dz(r, phi, phi_q, wavelength_cm)*self.P_phi(phi_q), 0, 2.0*np.pi)[0] def Dmaj(self, r, wavelength_cm): return (wavelength_cm/self.wavelength_reference)**2 * self.Bmaj_0 * (2.0 * self.Amaj_0/(self.scatt_alpha * self.Bmaj_0))**(-self.scatt_alpha/(2.0 - self.scatt_alpha)) * ((1.0 + (2.0*self.Amaj_0/(self.scatt_alpha * self.Bmaj_0))**(2.0/(2.0 - self.scatt_alpha)) * (r/self.r_in)**2 )**(self.scatt_alpha/2.0) - 1.0) def Dmin(self, r, wavelength_cm): return (wavelength_cm/self.wavelength_reference)**2 * self.Bmin_0 * (2.0 * self.Amin_0/(self.scatt_alpha * self.Bmin_0))**(-self.scatt_alpha/(2.0 - self.scatt_alpha)) * ((1.0 + (2.0*self.Amin_0/(self.scatt_alpha * self.Bmin_0))**(2.0/(2.0 - self.scatt_alpha)) * (r/self.r_in)**2 )**(self.scatt_alpha/2.0) - 1.0) def Dphi_approx(self, x, y, wavelength_cm): r = (x**2 + y**2)**0.5 phi = np.arctan2(y, x) Dmaj_eval = self.Dmaj(r, wavelength_cm) Dmin_eval = self.Dmin(r, wavelength_cm) return (Dmaj_eval + Dmin_eval)/2.0 + (Dmaj_eval - Dmin_eval)/2.0*np.cos(2.0*(phi - self.phi0))
[docs] def Q(self, qx, qy): """Computes the power spectrum of the scattering model at a wavenumber {qx,qy} (in 1/cm). The power spectrum is part of what defines the scattering model (along with Dphi). Q(qx,qy) is independent of the observing wavelength. Args: qx (float): x coordinate of the wavenumber in 1/cm. qy (float): y coordinate of the wavenumber in 1/cm. Returns: (float): The power spectrum Q(qx,qy) """ q = (qx**2 + qy**2)**0.5 + 1e-12/self.r_in #Add a small offset to avoid division by zero phi_q = np.arctan2(qy, qx) return self.Qbar * (q*self.r_in)**(-(self.scatt_alpha + 2.0)) * np.exp(-(q * self.r_in)**2) * self.P_phi(phi_q)
[docs] def sqrtQ_Matrix(self, Reference_Image, Vx_km_per_s=50.0, Vy_km_per_s=0.0, t_hr=0.0): """Computes the square root of the power spectrum on a discrete grid. Because translation of the screen is done most conveniently in Fourier space, a screen translation can also be included. Args: Reference_Image (Image): Reference image to determine image and pixel dimensions and wavelength. Vx_km_per_s (float): Velocity of the scattering screen in the x direction (toward East) in km/s. Vy_km_per_s (float): Velocity of the scattering screen in the y direction (toward North) in km/s. t_hr (float): The current time of the scattering in hours. Returns: sqrtQ (2D complex ndarray): The square root of the power spectrum of the screen with an additional phase for rotation of the screen. """ #Derived parameters FOV = Reference_Image.psize * Reference_Image.xdim * self.observer_screen_distance #Field of view, in cm, at the scattering screen N = Reference_Image.xdim dq = 2.0*np.pi/FOV #this is the spacing in wavenumber screen_x_offset_pixels = (Vx_km_per_s * 1.e5) * (t_hr*3600.0) / (FOV/float(N)) screen_y_offset_pixels = (Vy_km_per_s * 1.e5) * (t_hr*3600.0) / (FOV/float(N)) s, t = np.meshgrid(np.fft.fftfreq(N, d=1.0/N), np.fft.fftfreq(N, d=1.0/N)) sqrtQ = np.sqrt(self.Q(dq*s, dq*t)) * np.exp(2.0*np.pi*1j*(s*screen_x_offset_pixels + t*screen_y_offset_pixels)/float(N)) sqrtQ[0][0] = 0.0 #A DC offset doesn't affect scattering return sqrtQ
[docs] def Ensemble_Average_Kernel(self, Reference_Image, wavelength_cm = None, use_approximate_form=True): """The ensemble-average convolution kernel for images; returns a 2D array corresponding to the image dimensions of the reference image Args: Reference_Image (Image): Reference image to determine image and pixel dimensions and wavelength. wavelength_cm (float): The observing wavelength for the scattering kernel in cm. If unspecified, this will default to the wavelength of the Reference image. Returns: ker (2D ndarray): The ensemble-average scattering kernel in the image domain. """ if wavelength_cm == None: wavelength_cm = C/Reference_Image.rf*100.0 #Observing wavelength [cm] uvlist = np.fft.fftfreq(Reference_Image.xdim)/Reference_Image.psize # assume square kernel. FIXME: create ulist and vlist, and construct u_grid and v_grid with the correct dimension if use_approximate_form == True: u_grid, v_grid = np.meshgrid(uvlist, uvlist) ker_uv = self.Ensemble_Average_Kernel_Visibility(u_grid, v_grid, wavelength_cm, use_approximate_form=use_approximate_form) else: ker_uv = np.array([[self.Ensemble_Average_Kernel_Visibility(u, v, wavelength_cm, use_approximate_form=use_approximate_form) for u in uvlist] for v in uvlist]) ker = np.real(np.fft.fftshift(np.fft.fft2(ker_uv))) ker = ker / np.sum(ker) # normalize to 1 return ker
[docs] def Ensemble_Average_Kernel_Visibility(self, u, v, wavelength_cm, use_approximate_form=True): """The ensemble-average multiplicative scattering kernel for visibilities at a particular {u,v} coordinate Args: u (float): u baseline coordinate (dimensionless) v (float): v baseline coordinate (dimensionless) wavelength_cm (float): The observing wavelength for the scattering kernel in cm. Returns: float: The ensemble-average kernel at the specified {u,v} point and observing wavelength. """ if use_approximate_form == True: return np.exp(-0.5*self.Dphi_approx(u*wavelength_cm/(1.0+self.Mag()), v*wavelength_cm/(1.0+self.Mag()), wavelength_cm)) else: return np.exp(-0.5*self.Dphi_exact(u*wavelength_cm/(1.0+self.Mag()), v*wavelength_cm/(1.0+self.Mag()), wavelength_cm))
[docs] def Ensemble_Average_Blur(self, im, wavelength_cm = None, ker = None, use_approximate_form=True): """Blurs an input Image with the ensemble-average scattering kernel. Args: im (Image): The unscattered image. wavelength_cm (float): The observing wavelength for the scattering kernel in cm. If unspecified, this will default to the wavelength of the input image. ker (2D ndarray): The user can optionally pass a pre-computed ensemble-average blurring kernel. Returns: out (Image): The ensemble-average scattered image. """ # Inputs an unscattered image and an ensemble-average blurring kernel (2D array); returns the ensemble-average image # The pre-computed kernel can optionally be specified (ker) if wavelength_cm == None: wavelength_cm = C/im.rf*100.0 #Observing wavelength [cm] if ker is None: ker = self.Ensemble_Average_Kernel(im, wavelength_cm, use_approximate_form) Iim = Wrapped_Convolve((im.imvec).reshape(im.ydim, im.xdim), ker) out = image.Image(Iim, im.psize, im.ra, im.dec, rf=C/(wavelength_cm/100.0), source=im.source, mjd=im.mjd, pulse=im.pulse) if len(im.qvec): Qim = Wrapped_Convolve((im.qvec).reshape(im.ydim, im.xdim), ker) Uim = Wrapped_Convolve((im.uvec).reshape(im.ydim, im.xdim), ker) out.add_qu(Qim, Uim) if len(im.vvec): Vim = Wrapped_Convolve((im.vvec).reshape(im.ydim, im.xdim), ker) out.add_v(Vim) return out
[docs] def Deblur_obs(self, obs, use_approximate_form=True): """Deblurs the observation obs by dividing visibilities by the ensemble-average scattering kernel. See Fish et al. (2014): arXiv:1409.4690. Args: obs (Obsdata): The observervation data (including scattering). Returns: obsdeblur (Obsdata): The deblurred observation. """ # make a copy of observation data datatable = (obs.copy()).data vis = datatable['vis'] qvis = datatable['qvis'] uvis = datatable['uvis'] vvis = datatable['vvis'] sigma = datatable['sigma'] qsigma = datatable['qsigma'] usigma = datatable['usigma'] vsigma = datatable['vsigma'] u = datatable['u'] v = datatable['v'] # divide visibilities by the scattering kernel for i in range(len(vis)): ker = self.Ensemble_Average_Kernel_Visibility(u[i], v[i], wavelength_cm = C/obs.rf*100.0, use_approximate_form=use_approximate_form) vis[i] = vis[i] / ker qvis[i] = qvis[i] / ker uvis[i] = uvis[i] / ker vvis[i] = vvis[i] / ker sigma[i] = sigma[i] / ker qsigma[i] = qsigma[i] / ker usigma[i] = usigma[i] / ker vsigma[i] = vsigma[i] / ker datatable['vis'] = vis datatable['qvis'] = qvis datatable['uvis'] = uvis datatable['vvis'] = vvis datatable['sigma'] = sigma datatable['qsigma'] = qsigma datatable['usigma'] = usigma datatable['vsigma'] = vsigma obsdeblur = obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, datatable, obs.tarr, source=obs.source, mjd=obs.mjd, ampcal=obs.ampcal, phasecal=obs.phasecal, opacitycal=obs.opacitycal, dcal=obs.dcal, frcal=obs.frcal) return obsdeblur
[docs] def MakePhaseScreen(self, EpsilonScreen, Reference_Image, obs_frequency_Hz=0.0, Vx_km_per_s=50.0, Vy_km_per_s=0.0, t_hr=0.0, sqrtQ_init=None): """Create a refractive phase screen from standardized Fourier components (the EpsilonScreen). All lengths should be specified in centimeters If the observing frequency (obs_frequency_Hz) is not specified, then it will be taken to be equal to the frequency of the Reference_Image Note: an odd image dimension is required! Args: EpsilonScreen (2D ndarray): Optionally, the scattering screen can be specified. If none is given, a random one will be generated. Reference_Image (Image): The reference image. obs_frequency_Hz (float): The observing frequency, in Hz. By default, it will be taken to be equal to the frequency of the Unscattered_Image. Vx_km_per_s (float): Velocity of the scattering screen in the x direction (toward East) in km/s. Vy_km_per_s (float): Velocity of the scattering screen in the y direction (toward North) in km/s. t_hr (float): The current time of the scattering in hours. ea_ker (2D ndarray): The used can optionally pass a precomputed array of the ensemble-average blurring kernel. sqrtQ_init (2D ndarray): The used can optionally pass a precomputed array of the square root of the power spectrum. Returns: phi_Image (Image): The phase screen. """ #Observing wavelength if obs_frequency_Hz == 0.0: obs_frequency_Hz = Reference_Image.rf wavelength = C/obs_frequency_Hz*100.0 #Observing wavelength [cm] wavelengthbar = wavelength/(2.0*np.pi) #lambda/(2pi) [cm] #Derived parameters FOV = Reference_Image.psize * Reference_Image.xdim * self.observer_screen_distance #Field of view, in cm, at the scattering screen rF = self.rF(wavelength) Nx = EpsilonScreen.shape[1] Ny = EpsilonScreen.shape[0] # if Nx%2 == 0: # print("The image dimension should really be odd...") #Now we'll calculate the power spectrum for each pixel in Fourier space screen_x_offset_pixels = (Vx_km_per_s*1.e5) * (t_hr*3600.0) / (FOV/float(Nx)) screen_y_offset_pixels = (Vy_km_per_s*1.e5) * (t_hr*3600.0) / (FOV/float(Nx)) if sqrtQ_init is None: sqrtQ = self.sqrtQ_Matrix(Reference_Image, Vx_km_per_s=Vx_km_per_s, Vy_km_per_s=Vy_km_per_s, t_hr=t_hr) else: #If a matrix for sqrtQ_init is passed, we still potentially need to rotate it if screen_x_offset_pixels != 0.0 or screen_y_offset_pixels != 0.0: s, t = np.meshgrid(np.fft.fftfreq(Nx, d=1.0/Nx), np.fft.fftfreq(Ny, d=1.0/Ny)) sqrtQ = sqrtQ_init * np.exp(2.0*np.pi*1j*(s*screen_x_offset_pixels + t*screen_y_offset_pixels)/float(Nx)) else: sqrtQ = sqrtQ_init #Now calculate the phase screen phi = np.real(wavelengthbar/FOV*EpsilonScreen.shape[0]*EpsilonScreen.shape[1]*np.fft.ifft2(sqrtQ*EpsilonScreen)) phi_Image = image.Image(phi, Reference_Image.psize, Reference_Image.ra, Reference_Image.dec, rf=Reference_Image.rf, source=Reference_Image.source, mjd=Reference_Image.mjd) return phi_Image
[docs] def Scatter2(self, args, kwargs): """Call self.Scatter with expanded args and kwargs.""" return self.Scatter(*args, **kwargs)
[docs] def Scatter(self, Unscattered_Image, Epsilon_Screen=np.array([]), obs_frequency_Hz=0.0, Vx_km_per_s=50.0, Vy_km_per_s=0.0, t_hr=0.0, ea_ker=None, sqrtQ=None, Linearized_Approximation=False, DisplayImage=False, Force_Positivity=False, use_approximate_form=True): """Scatter an image using the specified epsilon screen. All lengths should be specified in centimeters If the observing frequency (obs_frequency_Hz) is not specified, then it will be taken to be equal to the frequency of the Unscattered_Image Note: an odd image dimension is required! Args: Unscattered_Image (Image): The unscattered image. Epsilon_Screen (2D ndarray): Optionally, the scattering screen can be specified. If none is given, a random one will be generated. obs_frequency_Hz (float): The observing frequency, in Hz. By default, it will be taken to be equal to the frequency of the Unscattered_Image. Vx_km_per_s (float): Velocity of the scattering screen in the x direction (toward East) in km/s. Vy_km_per_s (float): Velocity of the scattering screen in the y direction (toward North) in km/s. t_hr (float): The current time of the scattering in hours. ea_ker (2D ndarray): The used can optionally pass a precomputed array of the ensemble-average blurring kernel. sqrtQ (2D ndarray): The used can optionally pass a precomputed array of the square root of the power spectrum. Linearized_Approximation (bool): If True, uses a linearized approximation for the scattering (Eq. 10 of Johnson & Narayan 2016). If False, uses Eq. 9 of that paper. DisplayImage (bool): If True, show a plot of the unscattered, ensemble-average, and scattered images as well as the phase screen. Force_Positivity (bool): If True, eliminates negative flux from the scattered image from the linearized approximation. Return_Image_List (bool): If True, returns a list of the scattered frames. If False, returns a movie object. Returns: AI_Image (Image): The scattered image. """ #Observing wavelength if obs_frequency_Hz == 0.0: obs_frequency_Hz = Unscattered_Image.rf wavelength = C/obs_frequency_Hz*100.0 #Observing wavelength [cm] wavelengthbar = wavelength/(2.0*np.pi) #lambda/(2pi) [cm] #Derived parameters FOV = Unscattered_Image.psize * Unscattered_Image.xdim * self.observer_screen_distance #Field of view, in cm, at the scattering screen rF = self.rF(wavelength) Nx = Unscattered_Image.xdim Ny = Unscattered_Image.ydim #First we need to calculate the ensemble-average image by blurring the unscattered image with the correct kernel EA_Image = self.Ensemble_Average_Blur(Unscattered_Image, wavelength, ker = ea_ker, use_approximate_form=use_approximate_form) # If no epsilon screen is specified, then generate a random realization if Epsilon_Screen.shape[0] == 0: Epsilon_Screen = MakeEpsilonScreen(Nx, Ny) #We'll now calculate the phase screen. phi_Image = self.MakePhaseScreen(Epsilon_Screen, Unscattered_Image, obs_frequency_Hz, Vx_km_per_s=Vx_km_per_s, Vy_km_per_s=Vy_km_per_s, t_hr=t_hr, sqrtQ_init=sqrtQ) phi = phi_Image.imvec.reshape(Ny,Nx) #Next, we need the gradient of the ensemble-average image phi_Gradient = Wrapped_Gradient(phi/(FOV/Nx)) #The gradient signs don't actually matter, but let's make them match intuition (i.e., right to left, bottom to top) phi_Gradient_x = -phi_Gradient[1] phi_Gradient_y = -phi_Gradient[0] if Linearized_Approximation == True: #Use Equation 10 of Johnson & Narayan (2016) #Calculate the gradient of the ensemble-average image EA_Gradient = Wrapped_Gradient((EA_Image.imvec/(FOV/Nx)).reshape(EA_Image.ydim, EA_Image.xdim)) #The gradient signs don't actually matter, but let's make them match intuition (i.e., right to left, bottom to top) EA_Gradient_x = -EA_Gradient[1] EA_Gradient_y = -EA_Gradient[0] #Now we can patch together the average image AI = (EA_Image.imvec).reshape(Ny,Nx) + rF**2.0 * ( EA_Gradient_x*phi_Gradient_x + EA_Gradient_y*phi_Gradient_y ) if len(Unscattered_Image.qvec): # Scatter the Q image EA_Gradient = Wrapped_Gradient((EA_Image.qvec/(FOV/Nx)).reshape(EA_Image.ydim, EA_Image.xdim)) EA_Gradient_x = -EA_Gradient[1] EA_Gradient_y = -EA_Gradient[0] AI_Q = (EA_Image.qvec).reshape(Ny,Nx) + rF**2.0 * ( EA_Gradient_x*phi_Gradient_x + EA_Gradient_y*phi_Gradient_y ) # Scatter the U image EA_Gradient = Wrapped_Gradient((EA_Image.uvec/(FOV/Nx)).reshape(EA_Image.ydim, EA_Image.xdim)) EA_Gradient_x = -EA_Gradient[1] EA_Gradient_y = -EA_Gradient[0] AI_U = (EA_Image.uvec).reshape(Ny,Nx) + rF**2.0 * ( EA_Gradient_x*phi_Gradient_x + EA_Gradient_y*phi_Gradient_y ) if len(Unscattered_Image.vvec): # Scatter the V image EA_Gradient = Wrapped_Gradient((EA_Image.vvec/(FOV/Nx)).reshape(EA_Image.ydim, EA_Image.xdim)) EA_Gradient_x = -EA_Gradient[1] EA_Gradient_y = -EA_Gradient[0] AI_V = (EA_Image.vvec).reshape(Ny,Nx) + rF**2.0 * ( EA_Gradient_x*phi_Gradient_x + EA_Gradient_y*phi_Gradient_y ) else: #Use Equation 9 of Johnson & Narayan (2016) EA_im = (EA_Image.imvec).reshape(Ny,Nx) AI = np.copy((EA_Image.imvec).reshape(Ny,Nx)) if len(Unscattered_Image.qvec): AI_Q = np.copy((EA_Image.imvec).reshape(Ny,Nx)) AI_U = np.copy((EA_Image.imvec).reshape(Ny,Nx)) EA_im_Q = (EA_Image.qvec).reshape(Ny,Nx) EA_im_U = (EA_Image.uvec).reshape(Ny,Nx) if len(Unscattered_Image.vvec): AI_V = np.copy((EA_Image.imvec).reshape(Ny,Nx)) EA_im_V = (EA_Image.vvec).reshape(Ny,Nx) for rx in range(Nx): for ry in range(Ny): # Annoyingly, the signs here must be negative to match the other approximation. I'm not sure which is correct, but it really shouldn't matter anyway because -phi has the same power spectrum as phi. However, getting the *relative* sign for the x- and y-directions correct is important. rxp = int(np.round(rx - rF**2.0 * phi_Gradient_x[ry,rx]/self.observer_screen_distance/Unscattered_Image.psize))%Nx ryp = int(np.round(ry - rF**2.0 * phi_Gradient_y[ry,rx]/self.observer_screen_distance/Unscattered_Image.psize))%Ny AI[ry,rx] = EA_im[ryp,rxp] if len(Unscattered_Image.qvec): AI_Q[ry,rx] = EA_im_Q[ryp,rxp] AI_U[ry,rx] = EA_im_U[ryp,rxp] if len(Unscattered_Image.vvec): AI_V[ry,rx] = EA_im_V[ryp,rxp] #Optional: eliminate negative flux if Force_Positivity == True: AI = abs(AI) #Make it into a proper image format AI_Image = image.Image(AI, EA_Image.psize, EA_Image.ra, EA_Image.dec, rf=EA_Image.rf, source=EA_Image.source, mjd=EA_Image.mjd) if len(Unscattered_Image.qvec): AI_Image.add_qu(AI_Q, AI_U) if len(Unscattered_Image.vvec): AI_Image.add_v(AI_V) if DisplayImage: plot_scatt(Unscattered_Image.imvec, EA_Image.imvec, AI_Image.imvec, phi_Image.imvec, Unscattered_Image, 0, 0, ipynb=False) return AI_Image
[docs] def Scatter_Movie(self, Unscattered_Movie, Epsilon_Screen=np.array([]), obs_frequency_Hz=0.0, Vx_km_per_s=50.0, Vy_km_per_s=0.0, framedur_sec=None, N_frames = None, ea_ker=None, sqrtQ=None, Linearized_Approximation=False, Force_Positivity=False, Return_Image_List=False, processes=0): """Scatter a movie using the specified epsilon screen. The movie can either be a movie object, an image list, or a static image If scattering a list of images or static image, the frame duration in seconds (framedur_sec) must be specified If scattering a static image, the total number of frames must be specified (N_frames) All lengths should be specified in centimeters If the observing frequency (obs_frequency_Hz) is not specified, then it will be taken to be equal to the frequency of the Unscattered_Movie Note: an odd image dimension is required! Args: Unscattered_Movie: This can be a movie object, an image list, or a static image Epsilon_Screen (2D ndarray): Optionally, the scattering screen can be specified. If none is given, a random one will be generated. obs_frequency_Hz (float): The observing frequency, in Hz. By default, it will be taken to be equal to the frequency of the Unscattered_Movie. Vx_km_per_s (float): Velocity of the scattering screen in the x direction (toward East) in km/s. Vy_km_per_s (float): Velocity of the scattering screen in the y direction (toward North) in km/s. framedur_sec (float): Duration of each frame, in seconds. Only needed if Unscattered_Movie is not a movie object. N_frames (int): Total number of frames. Only needed if Unscattered_Movie is a static image object. ea_ker (2D ndarray): The used can optionally pass a precomputed array of the ensemble-average blurring kernel. sqrtQ (2D ndarray): The used can optionally pass a precomputed array of the square root of the power spectrum. Linearized_Approximation (bool): If True, uses a linearized approximation for the scattering (Eq. 10 of Johnson & Narayan 2016). If False, uses Eq. 9 of that paper. Force_Positivity (bool): If True, eliminates negative flux from the scattered image from the linearized approximation. Return_Image_List (bool): If True, returns a list of the scattered frames. If False, returns a movie object. processes (int): Number of cores to use in multiprocessing. Default value (0) means no multiprocessing. Uses all available cores if processes < 0. Returns: Scattered_Movie: Either a movie object or a list of images, depending on the flag Return_Image_List. """ print("Warning!! assuming a constant frame duration, but Movie objects now support unequally spaced frames!") if type(Unscattered_Movie) != movie.Movie and framedur_sec is None: print("If scattering a list of images or static image, the framedur must be specified!") return if type(Unscattered_Movie) == image.Image and N_frames is None: print("If scattering a static image, the total number of frames must be specified (N_frames)!") return # time list in hr if hasattr(Unscattered_Movie, 'times'): tlist_hr = Unscattered_Movie.times else: tlist_hr = [framedur_sec/3600.0*j for j in range(N_frames)] if type(Unscattered_Movie) == movie.Movie: N = Unscattered_Movie.xdim N_frames = len(Unscattered_Movie.frames) psize = Unscattered_Movie.psize ra = Unscattered_Movie.ra dec = Unscattered_Movie.dec rf = Unscattered_Movie.rf pulse=Unscattered_Movie.pulse source=Unscattered_Movie.source mjd=Unscattered_Movie.mjd start_hr=Unscattered_Movie.start_hr has_pol = len(Unscattered_Movie.qframes) has_circ_pol = len(Unscattered_Movie.vframes) elif type(Unscattered_Movie) == list: N = Unscattered_Movie[0].xdim N_frames = len(Unscattered_Movie) psize = Unscattered_Movie[0].psize ra = Unscattered_Movie[0].ra dec = Unscattered_Movie[0].dec rf = Unscattered_Movie[0].rf pulse=Unscattered_Movie[0].pulse source=Unscattered_Movie[0].source mjd=Unscattered_Movie[0].mjd start_hr=0.0 has_pol = len(Unscattered_Movie[0].qvec) has_circ_pol = len(Unscattered_Movie[0].vvec) else: N = Unscattered_Movie.xdim psize = Unscattered_Movie.psize ra = Unscattered_Movie.ra dec = Unscattered_Movie.dec rf = Unscattered_Movie.rf pulse=Unscattered_Movie.pulse source=Unscattered_Movie.source mjd=Unscattered_Movie.mjd start_hr=0.0 has_pol = len(Unscattered_Movie.qvec) has_circ_pol = len(Unscattered_Movie.vvec) def get_frame(j): if type(Unscattered_Movie) == movie.Movie: im = image.Image(Unscattered_Movie.frames[j].reshape((N,N)), psize=psize, ra=ra, dec=dec, rf=rf, pulse=pulse, source=source, mjd=mjd) if len(Unscattered_Movie.qframes) > 0: im.add_qu(Unscattered_Movie.qframes[j].reshape((N,N)), Unscattered_Movie.uframes[j].reshape((N,N))) if len(Unscattered_Movie.vframes) > 0: im.add_v(Unscattered_Movie.vframes[j].reshape((N,N))) return im elif type(Unscattered_Movie) == list: return Unscattered_Movie[j] else: return Unscattered_Movie #If it isn't specified, calculate the matrix sqrtQ for efficiency if sqrtQ is None: sqrtQ = self.sqrtQ_Matrix(get_frame(0)) # If no epsilon screen is specified, then generate a random realization if Epsilon_Screen.shape[0] == 0: Epsilon_Screen = MakeEpsilonScreen(N, N) # possibly parallelize if processes < 0: processes = cpu_count() processes = min(processes, N_frames) # generate scattered images if processes > 0: pool = Pool(processes=processes) args = [ ( [get_frame(j), Epsilon_Screen], dict(obs_frequency_Hz = obs_frequency_Hz, Vx_km_per_s = Vx_km_per_s, Vy_km_per_s = Vy_km_per_s, t_hr=tlist_hr[j], sqrtQ=sqrtQ, Linearized_Approximation=Linearized_Approximation, Force_Positivity=Force_Positivity) ) for j in range(N_frames) ] scattered_im_List = pool.starmap(self.Scatter2, args) pool.close() pool.join() else: scattered_im_List = [self.Scatter(get_frame(j), Epsilon_Screen, obs_frequency_Hz = obs_frequency_Hz, Vx_km_per_s = Vx_km_per_s, Vy_km_per_s = Vy_km_per_s, t_hr=tlist_hr[j], ea_ker=ea_ker, sqrtQ=sqrtQ, Linearized_Approximation=Linearized_Approximation, Force_Positivity=Force_Positivity) for j in range(N_frames)] if Return_Image_List == True: return scattered_im_List Scattered_Movie = movie.Movie( [im.imvec.reshape((im.xdim,im.ydim)) for im in scattered_im_List], times=tlist_hr, psize = psize, ra = ra, dec = dec, rf=rf, pulse=pulse, source=source, mjd=mjd) if has_pol: Scattered_Movie_Q = [im.qvec.reshape((im.xdim,im.ydim)) for im in scattered_im_List] Scattered_Movie_U = [im.uvec.reshape((im.xdim,im.ydim)) for im in scattered_im_List] Scattered_Movie.add_qu(Scattered_Movie_Q, Scattered_Movie_U) if has_circ_pol: Scattered_Movie_V = [im.vvec.reshape((im.xdim,im.ydim)) for im in scattered_im_List] Scattered_Movie.add_v(Scattered_Movie_V) return Scattered_Movie
################################################################################ # These are helper functions ################################################################################ def Wrapped_Convolve(sig,ker): N = sig.shape[0] return scipy.signal.fftconvolve(np.pad(sig,((N, N), (N, N)), 'wrap'), np.pad(ker,((N, N), (N, N)), 'constant'),mode='same')[N:(2*N),N:(2*N)] def Wrapped_Gradient(M): G = np.gradient(np.pad(M,((1, 1), (1, 1)), 'wrap')) Gx = G[0][1:-1,1:-1] Gy = G[1][1:-1,1:-1] return (Gx, Gy) def MakeEpsilonScreenFromList(EpsilonList, N): epsilon = np.zeros((N,N),dtype=np.complex) #If N is odd: there are (N^2-1)/2 real elements followed by their corresponding (N^2-1)/2 imaginary elements #If N is even: there are (N^2+2)/2 of each, although 3 of these must be purely real, also giving a total of N^2-1 degrees of freedom #This is because of conjugation symmetry in Fourier space to ensure a real Fourier transform #The first (N-1)/2 are the top row N_re = (N*N-1)//2 # FIXME: check logic if N is even i = 0 for x in range(1,(N+1)//2): # FIXME: check logic if N is even epsilon[0][x] = EpsilonList[i] + 1j * EpsilonList[i+N_re] epsilon[0][N-x] = np.conjugate(epsilon[0][x]) i=i+1 #The next N(N-1)/2 are filling the next N rows for y in range(1,(N+1)//2): # FIXME: check logic if N is even for x in range(N): epsilon[y][x] = EpsilonList[i] + 1j * EpsilonList[i+N_re] x2 = N - x y2 = N - y if x2 == N: x2 = 0 if y2 == N: y2 = 0 epsilon[y2][x2] = np.conjugate(epsilon[y][x]) i=i+1 return epsilon
[docs]def MakeEpsilonScreen(Nx, Ny, rngseed = 0): """Create a standardized Fourier representation of a scattering screen Args: Nx (int): Number of pixels in the x direction Ny (int): Number of pixels in the y direction rngseed (int): Seed for the random number generator Returns: epsilon: A 2D numpy ndarray. """ if rngseed != 0: np.random.seed( rngseed ) epsilon = np.random.normal(loc=0.0, scale=1.0/math.sqrt(2), size=(Ny,Nx)) + 1j * np.random.normal(loc=0.0, scale=1.0/math.sqrt(2), size=(Ny,Nx)) # The zero frequency doesn't affect scattering epsilon[0][0] = 0.0 #Now let's ensure that it has the necessary conjugation symmetry if Nx%2 == 0: epsilon[0][Nx//2] = np.real(epsilon[0][Nx//2]) if Ny%2 == 0: epsilon[Ny//2][0] = np.real(epsilon[Ny//2][0]) if Nx%2 == 0 and Ny%2 == 0: epsilon[Ny//2][Nx//2] = np.real(epsilon[Ny//2][Nx//2]) for x in range(Nx): if x > (Nx-1)//2: epsilon[0][x] = np.conjugate(epsilon[0][Nx-x]) for y in range((Ny-1)//2, Ny): x2 = Nx - x y2 = Ny - y if x2 == Nx: x2 = 0 if y2 == Ny: y2 = 0 epsilon[y][x] = np.conjugate(epsilon[y2][x2]) return epsilon
################################################################################################## # Plotting Functions ################################################################################################## def plot_scatt(im_unscatt, im_ea, im_scatt, im_phase, Prior, nit, chi2, ipynb=False): # Get vectors and ratio from current image x = np.array([[i for i in range(Prior.xdim)] for j in range(Prior.ydim)]) y = np.array([[j for i in range(Prior.xdim)] for j in range(Prior.ydim)]) # Create figure and title plt.ion() plt.clf() if chi2 > 0.0: plt.suptitle("step: %i $\chi^2$: %f " % (nit, chi2), fontsize=20) # Unscattered Image plt.subplot(141) plt.imshow(im_unscatt.reshape(Prior.ydim, Prior.xdim), cmap=plt.get_cmap('afmhot'), interpolation='gaussian', vmin=0) xticks = ticks(Prior.xdim, Prior.psize/RADPERAS/1e-6) yticks = ticks(Prior.ydim, Prior.psize/RADPERAS/1e-6) plt.xticks(xticks[0], xticks[1]) plt.yticks(yticks[0], yticks[1]) plt.xlabel('Relative RA ($\mu$as)') plt.ylabel('Relative Dec ($\mu$as)') plt.title('Unscattered') # Ensemble Average plt.subplot(142) plt.imshow(im_ea.reshape(Prior.ydim, Prior.xdim), cmap=plt.get_cmap('afmhot'), interpolation='gaussian', vmin=0) xticks = ticks(Prior.xdim, Prior.psize/RADPERAS/1e-6) yticks = ticks(Prior.ydim, Prior.psize/RADPERAS/1e-6) plt.xticks(xticks[0], xticks[1]) plt.yticks(yticks[0], yticks[1]) plt.xlabel('Relative RA ($\mu$as)') plt.ylabel('Relative Dec ($\mu$as)') plt.title('Ensemble Average') # Scattered plt.subplot(143) plt.imshow(im_scatt.reshape(Prior.ydim, Prior.xdim), cmap=plt.get_cmap('afmhot'), interpolation='gaussian', vmin=0) xticks = ticks(Prior.xdim, Prior.psize/RADPERAS/1e-6) yticks = ticks(Prior.ydim, Prior.psize/RADPERAS/1e-6) plt.xticks(xticks[0], xticks[1]) plt.yticks(yticks[0], yticks[1]) plt.xlabel('Relative RA ($\mu$as)') plt.ylabel('Relative Dec ($\mu$as)') plt.title('Average Image') # Phase plt.subplot(144) plt.imshow(im_phase.reshape(Prior.ydim, Prior.xdim), cmap=plt.get_cmap('afmhot'), interpolation='gaussian') xticks = ticks(Prior.xdim, Prior.psize/RADPERAS/1e-6) yticks = ticks(Prior.ydim, Prior.psize/RADPERAS/1e-6) plt.xticks(xticks[0], xticks[1]) plt.yticks(yticks[0], yticks[1]) plt.xlabel('Relative RA ($\mu$as)') plt.ylabel('Relative Dec ($\mu$as)') plt.title('Phase Screen') # Display plt.draw()