Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
obsdata = simobs.add_jones_and_noise(obs, add_th_noise=add_th_noise,
opacitycal=opacitycal, ampcal=ampcal,
phasecal=phasecal, dcal=dcal, frcal=frcal, rlgaincal=rlgaincal,
stabilize_scan_phase=stabilize_scan_phase,
stabilize_scan_amp=stabilize_scan_amp, neggains=neggains,
gainp=gainp, taup=taup, gain_offset=gain_offset,
dterm_offset=dterm_offset,
caltable_path=caltable_path, seed=seed)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal, opacitycal=opacitycal, dcal=dcal, frcal=frcal,
timetype=obs.timetype, scantable=obs.scans)
if inv_jones:
obsdata = simobs.apply_jones_inverse(obs, opacitycal=opacitycal, dcal=dcal, frcal=frcal)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal,
opacitycal=True, dcal=True, frcal=True,
timetype=obs.timetype, scantable=obs.scans)
#these are always set to True after inverse jones call
# No Jones Matrices, Add noise the old way
# NOTE There is an asymmetry here - in the old way, we don't offer the ability to *not* unscale estimated noise.
else:
if caltable_path:
print('WARNING: the caltable is currently only saved if you apply noise with a Jones Matrix')
print(' D_R: {:.4f}'.format(D_fit[2*isite]))
print(' D_L: {:.4f}\n'.format(D_fit[2*isite+1]))
if const_fpol:
print('Source Fractional Polarization Magnitude: {:.4f}'.format(np.abs(D_fit[-1])))
print('Source Fractional Polarization EVPA [deg]: {:.4f}\n'.format(90./np.pi*np.angle(D_fit[-1])))
tstop = time.time()
print("\nleakage_cal time: %f s" % (tstop - tstart))
if not obs_apply==False:
obs_test = obs_apply.copy()
# Apply the solution
for isite in range(len(sites)):
obs_test.tarr['dr'][site_index[isite]] = D_fit[2*isite]
obs_test.tarr['dl'][site_index[isite]] = D_fit[2*isite+1]
obs_test.data = simobs.apply_jones_inverse(obs_test,dcal=False,verbose=False)
obs_test.dcal = True
else:
obs_test = obs_test.switch_polrep(obs.polrep)
if not const_fpol:
return obs_test
else:
return [obs_test, D_fit[-1]]
print("Applying Jones Matrices to data . . . ")
obsdata = simobs.add_jones_and_noise(obs, add_th_noise=add_th_noise,
opacitycal=opacitycal, ampcal=ampcal,
phasecal=phasecal, dcal=dcal, frcal=frcal,
stabilize_scan_phase=stabilize_scan_phase,
stabilize_scan_amp=stabilize_scan_amp,
gainp=gainp, taup=taup, gain_offset=gain_offset,
dtermp=dtermp,dterm_offset=dterm_offset)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal, opacitycal=opacitycal, dcal=dcal, frcal=frcal,
timetype=obs.timetype, scantable=obs.scans)
if inv_jones:
obsdata = simobs.apply_jones_inverse(obs, opacitycal=opacitycal, dcal=dcal, frcal=frcal)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal,
opacitycal=True, dcal=True, frcal=True,
timetype=obs.timetype, scantable=obs.scans)
#these are always set to True after inverse jones call
# No Jones Matrices, Add noise the old way
# TODO There is an asymmetry here - in the old way, we don't offer the ability to *not* unscale estimated noise.
elif add_th_noise:
obsdata = simobs.add_noise(obs, add_th_noise=add_th_noise,
ampcal=ampcal, phasecal=phasecal, opacitycal=opacitycal,
gainp=gainp, taup=taup, gain_offset=gain_offset)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
return chisq + chisq_D
# Now, we will minimize the total chi-squared. We need two complex leakage terms for each site
optdict = {'maxiter' : MAXIT} # minimizer params
Dpar_guess = np.zeros((len(sites) + const_fpol)*2, dtype=np.complex128).view(dtype=np.float64)
print("Minimizing...")
res = opt.minimize(errfunc, Dpar_guess, method=minimizer_method, options=optdict)
# get solution
D_fit = res.x.astype(np.float64).view(dtype=np.complex128) # all the D-terms (complex)
# Apply the solution
for isite in range(len(sites)):
obs_test.tarr['dr'][site_index[isite]] = D_fit[2*isite]
obs_test.tarr['dl'][site_index[isite]] = D_fit[2*isite+1]
obs_test.data = simobs.apply_jones_inverse(obs_test,dcal=False,verbose=False)
obs_test.dcal = True
if show_solution:
print("Original chi-squared: {:.4f}".format(chisq_total(obs.switch_polrep('circ').data, im_circ, D_fit)))
print("New chi-squared: {:.4f}\n".format(chisq_total(obs_test.data, im_circ, D_fit)))
for isite in range(len(sites)):
print(sites[isite])
print(' D_R: {:.4f}'.format(D_fit[2*isite]))
print(' D_L: {:.4f}\n'.format(D_fit[2*isite+1]))
if const_fpol:
print('Source Fractional Polarization Magnitude: {:.4f}'.format(np.abs(D_fit[-1])))
print('Source Fractional Polarization EVPA [deg]: {:.4f}\n'.format(90./np.pi*np.angle(D_fit[-1])))
tstop = time.time()
print("\nleakage_cal time: %f s" % (tstop - tstart))
rlgaincal=rlgaincal,
stabilize_scan_phase=stabilize_scan_phase,
stabilize_scan_amp=stabilize_scan_amp,
neggains=neggains,
gainp=gainp, taup=taup, gain_offset=gain_offset,
dterm_offset=dterm_offset,
caltable_path=caltable_path, seed=seed)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal, opacitycal=opacitycal,
dcal=dcal, frcal=frcal,
timetype=obs.timetype, scantable=obs.scans)
if inv_jones:
obsdata = simobs.apply_jones_inverse(obs,
opacitycal=opacitycal, dcal=dcal, frcal=frcal)
obs = ehtim.obsdata.Obsdata(obs.ra, obs.dec, obs.rf, obs.bw, obsdata, obs.tarr,
source=obs.source, mjd=obs.mjd, polrep=obs_in.polrep,
ampcal=ampcal, phasecal=phasecal,
opacitycal=True, dcal=True, frcal=True,
timetype=obs.timetype, scantable=obs.scans)
#these are always set to True after inverse jones call
# No Jones Matrices, Add noise the old way
# NOTE There is an asymmetry here - in the old way, we don't offer the ability to *not*
# unscale estimated noise.
else:
if caltable_path:
def errfunc(Dpar):
D = Dpar.astype(np.float64).view(dtype=np.complex128) # all the D-terms (complex). If const_fpol, fpol is the last parameter.
for isite in range(len(sites)):
obs_test.tarr['dr'][site_index[isite]] = D[2*isite]
obs_test.tarr['dl'][site_index[isite]] = D[2*isite+1]
data = simobs.apply_jones_inverse(obs_test,dcal=False,verbose=False)
# goodness-of-fit for the leakage-corrected data
chisq = chisq_total(data, im_circ, D)
# prior on the D terms
chisq_D = np.sum(np.abs(D/leakage_tol)**2)
return chisq + chisq_D