Source code for ehtim.statistics.stats

# stats.py
# variety of statistical functions useful for 
#
#    Copyright (C) 2018 Maciek Wielgus
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
from __future__ import print_function
from builtins import str
from builtins import map
from builtins import range

import numpy as np
import numpy.random as npr
import sys

from ehtim.const_def import *

[docs]def circular_mean(theta, unit='deg'): '''circular mean for averaging angular quantities Args: theta: list/vector of angles to average unit: degrees ('deg') or radians (any other string) Returns: circular mean ''' theta = np.asarray(theta, dtype=np.float32) theta= theta.flatten() theta = theta[theta==theta] if unit=='deg': theta *= np.pi/180. if len(theta)==0: return None else: C = np.mean(np.cos(theta)) S = np.mean(np.sin(theta)) circ_mean = np.arctan2(S,C) if unit=='deg': circ_mean *= 180./np.pi return np.mod(circ_mean+180.,360.)-180. else: return np.mod(circ_mean+np.pi,2.*np.pi)-np.pi
[docs]def circular_std(theta,unit='deg'): '''standard deviation of a circular distribution Args: theta: list/vector of angles unit: degrees ('deg') or radians (any other string) Returns: circular standard deviation ''' theta = np.asarray(theta, dtype=np.float32) theta= theta.flatten() theta = theta[theta==theta] if unit=='deg': theta *= np.pi/180. if len(theta)<2: return None else: C = np.mean(np.cos(theta)) S = np.mean(np.sin(theta)) circ_std = np.sqrt(-2.*np.log(np.sqrt(C**2+S**2))) if unit=='deg': circ_std *= 180./np.pi return circ_std
[docs]def circular_std_of_mean(theta,unit='deg'): '''standard deviation of mean for a circular distribution Args: theta: list/vector of angles unit: degrees ('deg') or radians (any other string) Returns: circular standard deviation of mean ''' theta = np.asarray(theta, dtype=np.float32) theta= theta.flatten() theta = theta[theta==theta] return circular_std(theta,unit)/np.sqrt(len(theta))
[docs]def mean_incoh_amp(amp,sigma,debias=True,err_type='predicted',num_samples=int(1e3)): """amplitude from ensemble of Rice-distributed measurements with debiasing Args: amp: vector of (biased) amplitudes sigma: vector of errors debias: whether debiasing is applied Returns: amp0: estimator of unbiased amplitude """ if (not hasattr(amp, "__len__")): amp = [amp] amp = np.asarray(amp, dtype=np.float32) N = len(amp) if (not hasattr(sigma, "__len__")): sigma = sigma*np.ones(N) elif len(sigma)==1: sigma = sigma*np.ones(N) sigma = np.asarray(sigma, dtype=np.float32) if len(sigma)!=len(amp): print('Inconsistent length of amp and sigma') return None else: amp_clean=amp[(amp==amp)&(sigma==sigma)&(sigma>0)&(amp>0)] sigma_clean=sigma[(amp==amp)&(sigma==sigma)&(sigma>0)&(amp>0)] #eq. 9.86 from Thompson et al. if debias==True: amp0sq = ( np.mean(amp_clean**2 - (2. - 1./N)*sigma_clean**2) ) else: amp0sq = np.mean(amp_clean**2) amp0sq = np.maximum(amp0sq,0.) amp0 = np.sqrt(amp0sq) #getting errors if err_type=='predicted': sigma0 = np.sqrt(np.sum(sigma_clean**2)/len(sigma_clean)**2) elif err_type=='measured': ampfoo, ci = bootstrap(amp_clean, np.mean, num_samples=num_samples,wrapping_variable=False) sigma0 = 0.5*(ci[1]-ci[0]) return amp0,sigma0
[docs]def mean_incoh_amp_from_vis(vis,sigma,debias=True,err_type='predicted',num_samples=int(1e3)): """Amplitude from ensemble of visibility measurements with debiasing Args: amp: vector of (biased) amplitudes sigma: vector of errors debias: whether debiasing is applied Returns: amp0: estimator of unbiased amplitude """ if (not hasattr(vis, "__len__")): vis = [vis] vis= np.asarray(vis) vis= vis[vis==vis] amp=np.abs(vis) N = len(amp) if (not hasattr(sigma, "__len__")): sigma = sigma*np.ones(N) elif len(sigma)==1: sigma = sigma*np.ones(N) sigma = np.asarray(sigma, dtype=np.float32) if len(sigma)!=len(amp): print('Inconsistent length of amp and sigma') return None, None else: amp_clean=amp[(amp==amp)&(sigma==sigma)&(sigma>=0)&(amp>=0)] sigma_clean=sigma[(amp==amp)&(sigma==sigma)&(sigma>=0)&(amp>=0)] Nc=len(amp_clean) if Nc<1: return None, None else: #eq. 9.86 from Thompson et al. if debias==True: amp0sq = ( np.mean(amp_clean**2 - (2. - 1./Nc)*sigma_clean**2) ) else: amp0sq = np.mean(amp_clean**2) if (amp0sq!=amp0sq): amp0sq=0. amp0sq = np.maximum(amp0sq,0.) amp0 = np.sqrt(amp0sq) #getting errors if err_type=='predicted': #sigma0 = np.sqrt(np.sum(sigma_clean**2)/Nc**2) #Esigma = np.median(sigma_clean) #snr0 = amp0/Esigma #snrA = 1./(np.sqrt(1. + 2./np.sqrt(Nc)*(1./snr0)*np.sqrt(1.+1./snr0**2)) - 1.) #sigma0=amp0/snrA sigma0 = np.sqrt(np.sum(sigma_clean**2)/Nc**2) elif err_type=='measured': ampfoo, ci = bootstrap(amp_clean, np.mean, num_samples=num_samples,wrapping_variable=False,alpha='1sig') sigma0 = 0.5*(ci[1]-ci[0]) return amp0,sigma0
[docs]def bootstrap(data, statistic, num_samples=int(1e3), alpha='1sig',wrapping_variable=False): """bootstrap estimate of 100.0*(1-alpha) confidence interval for a given statistic Args: data: vector of data to estimate bootstrap statistic on statistic: function representing the statistic to be evaluated num_samples: number of bootstrap (re)samples alpha: parameter of the confidence interval, '1s' gets an analog of 1 sigma confidence for a normal variable wrapping_variable: True for circular variables, attempts to avoid problem related to estimating variability of wrapping variable Returns: bootstrap_value: bootstrap-estimated value of the statistic bootstrap_CI: bootstrap-estimated confidence interval """ if alpha=='1sig': alpha=0.3173 elif alpha=='2sig': alpha=0.0455 elif alpha=='3sig': alpha=0.0027 stat = np.zeros(num_samples) data = np.asarray(data) if wrapping_variable==True: m=statistic(data) else: m=0 data = data-m n = len(data) idx = npr.randint(0, n, (num_samples, n)) samples = data[idx] for cou in range(num_samples): stat[cou] = statistic(samples[cou,:]) stat = np.sort(stat) bootstrap_value = np.median(stat)+m bootstrap_CI = [stat[int((alpha/2.0)*num_samples)]+m, stat[int((1-alpha/2.0)*num_samples)]+m] return bootstrap_value, bootstrap_CI
def mean_incoh_avg(x,debias=True): amp = np.abs(np.asarray([y[0] for y in x])) sig = np.asarray([y[1] for y in x]) ampN = amp[(amp==amp)&(amp>=0)&(sig==sig)&(sig>=0)] sigN = sig[(amp==amp)&(amp>=0)&(sig==sig)&(sig>=0)] amp = ampN sig = sigN Nc = len(sig) if Nc==0: amp0 = -1 sig0 = -1 elif Nc==1: amp0 = amp[0] sig0 = sig[0] else: if debias==True: amp0 = deb_amp(amp,sig) else: amp0= np.sqrt(np.maximum(np.mean(amp**2),0.)) sig0 = inc_sig(amp,sig) #sig0 = coh_sig(amp,sig) return amp0,sig0 def deb_amp(amp,sig): #eq. 9.86 from Thompson et al. amp = np.abs(np.asarray(amp)) sig = np.asarray(sig) Nc = len(amp) amp0sq = ( np.mean(amp**2 - (2. - 1./Nc)*sig**2) ) amp0sq = np.maximum(amp0sq,0.) amp0 = np.sqrt(amp0sq) return amp0 def inc_sig(amp,sig): amp = np.abs(np.asarray(amp)) sig = np.asarray(sig) Nc = len(amp) amp0 = deb_amp(amp,sig) Esigma = np.median(sig) snr0 = amp0/Esigma snrA = 1./(np.sqrt(1. + 2./np.sqrt(Nc)*(1./snr0)*np.sqrt(1.+1./snr0**2)) - 1.) if snrA>0: sigma0=amp0/snrA else: sigma0=coh_sig(amp,sig) return sigma0 def coh_sig(amp,sig): amp = np.abs(np.asarray(amp)) sig = np.asarray(sig) Nc = len(amp) sigma0 = np.sqrt(np.sum(sig**2)/Nc**2) return sigma0
[docs]def dicts_TV_report(obs,snr_cut=2.): """Computes mean total variation reports Args: obs: ObsData object snr_cut: threshold for data snr Returns: amptv: dictionary of baseline mean TV cptv: dictionary of triangle mean TV lcatv: dictionary of quadrangle mean TV """ amp = obs.data baselines = list(set([(x[0],x[1]) for x in lca[['t1','t2']]])) amptv = {} for cou,quad in enumerate(baselines): amptv[quad] = np.mean(np.abs(np.diff(np.abs(amp[(amp['t1']==baselines[cou][0])&(amp['t2']==baselines[cou][1])]['vis'])))) cp = obs.c_phases() obs = obs.flag_low_snr(snr_cut=snr_cut) triangles = list(set([(x[0],x[1],x[2]) for x in cp[['t1','t2','t3']]])) cptv = {} for cou,tri in enumerate(triangles): cptv[tri] = np.mean(np.abs(np.diff(cp[(cp['t1']==triangles[cou][0])&(cp['t2']==triangles[cou][1])&(cp['t3']==triangles[cou][2])]['cphase']))) lca = obs.c_amplitudes(ctype='logcamp') quadrangles = list(set([(x[0],x[1],x[2],x[3]) for x in lca[['t1','t2','t3','t4']]])) lcatv = {} for cou,quad in enumerate(quadrangles): lcatv[quad] = np.mean(np.abs(np.diff(lca[(lca['t1']==quadrangles[cou][0])&(lca['t2']==quadrangles[cou][1])&(lca['t3']==quadrangles[cou][2])&(lca['t4']==quadrangles[cou][3])]['camp']))) return amptv, cptv, lcatv
[docs]def compare_TV(obs,obsref,snr_cut=2.,output=''): """Computes mean total variation reports Args: obs: ObsData object obref: ObsData object to use as a reference snr_cut: threshold for data snr output (str): if empty, returns median relative TV across all (baselines, triangles, quadrangles) if 'Full', returns full dictionaries comparing all (baselines, triangles, quadrangles) Returns: amprel / ampmed: dictionary of baseline relative differences in mean TV / median of baseline relative differences in mean TV cprel / cpmed: dictionary of triangle relative differences in mean TV / median of triangle relative differences in mean TV lcarel / lcamed: dictionary of quadrangle relative differences in mean TV / median of quadrangle relative differences in mean TV """ amptv, cptv, lcatv = dicts_TV(obs,snr_cut=snr_cut) ampref, cpref, lcaref = dicts_TV(obsref,snr_cut=snr_cut) cprel = {key: (cptv[key] - cpref[key])/cpref[key] for key in cptv.keys() if key in set(cpref.keys())} amprel = {key: (amptv[key] - ampref[key])/ampref[key] for key in amptv.keys() if key in set(ampref.keys())} lcarel = {key: (lcatv[key] - lcaref[key])/lcaref[key] for key in lcatv.keys() if key in set(lcaref.keys())} amprel = {key:amprel[key] for key in amprel.keys() if amprel[key]==amprel[key]} cprel = {key:cprel[key] for key in cprel.keys() if cprel[key]==cprel[key]} lcarel = {key:lcarel[key] for key in lcarel.keys() if lcarel[key]==lcarel[key]} if output=='Full': return amprel, cprel, lcarel else: ampmed =np.median([amprel[key] for key in amprel.keys() if amprel[key]==amprel[key]]) cpmed =np.median([cprel[key] for key in cprel.keys() if cprel[key]==cprel[key]]) lcamed =np.median([lcarel[key] for key in lcarel.keys() if lcarel[key]==lcarel[key]]) return ampmed,cpmed,lcamed