Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def save_txt(self, fname):
"""Save visibility data to a text file.
Args:
fname (str): path to output text file
"""
ehtim.io.save.save_obs_txt(self,fname)
return
data = obs.unpack(['u','v','amp','phase', 'sigma', 'time', 't1', 't2', 'tint'])
biarr = obs.bispectra(mode="all", count="min")
# extract the telescope names and parameters
antennaNames = obs.tarr['site']
sefd = obs.tarr['sefdr']
antennaX = obs.tarr['x']
antennaY = obs.tarr['y']
antennaZ = obs.tarr['z']
#antennaDiam = -np.ones(antennaX.shape) #todo: this is incorrect and there is just a dummy variable here
antennaDiam = sefd # replace antennaDiam with SEFD for radio observtions
# create dictionary
union = {};
union = ehtim.io.writeData.arrayUnion(antennaNames, union)
# extract the integration time
intTime = data['tint'][0]
if not all(data['tint'][0] == item for item in np.reshape(data['tint'], (-1)) ):
raise TypeError("The time integrations for each visibility are different")
# get visibility information
amp = data['amp']
phase = data['phase']
viserror = data['sigma']
u = data['u']
v = data['v']
# convert antenna name strings to number identifiers
ant1 = ehtim.io.writeData.convertStrings(data['t1'], union)
ant2 = ehtim.io.writeData.convertStrings(data['t2'], union)
data.target = np.append(data.target, ehtim.io.oifits.OI_TARGET(name, RA, DEC, veltyp='LSR') )
#calulate wavelength and bandpass
wavelength = speedoflight/frequency
bandlow = speedoflight/(frequency+(0.5*bandWidth))
bandhigh = speedoflight/(frequency-(0.5*bandWidth))
bandpass = bandhigh-bandlow
# put in the wavelength information - only using a single frequency
data.wavelength['WAVELENGTH_NAME'] = ehtim.io.oifits.OI_WAVELENGTH(wavelength, eff_band=bandpass)
# put in information about the telescope stations in the array
stations = [];
for i in range(0, len(antennaNames)):
stations.append( (antennaNames[i], antennaNames[i], i+1, antennaDiam[i], [antennaX[i], antennaY[i], antennaZ[i]]) )
data.array['ARRAY_NAME'] = ehtim.io.oifits.OI_ARRAY('GEOCENTRIC', [0, 0, 0], stations);
print('Warning: set cflux and cfluxerr = False because otherwise problems were being generated...are they the total flux density?')
print('Warning: are there any true flags?')
# put in the visibility information - note this does not include phase errors!
for i in range(0, len(u)):
station_curr = (data.array['ARRAY_NAME'].station[ int(ant1[i] - 1) ] , data.array['ARRAY_NAME'].station[ int(ant2[i] - 1) ]);
currVis = ehtim.io.oifits.OI_VIS(timeobs[i], intTime, visamp[i], visamperr[i], visphi[i], visphierr[i], flagVis, u[i]*wavelength, v[i]*wavelength, data.wavelength['WAVELENGTH_NAME'], data.target[0], array=data.array['ARRAY_NAME'], station=station_curr, cflux=False, cfluxerr=False);
data.vis = np.append( data.vis, currVis );
# put in bispectrum information
for j in range(0, len(uClosure)):
station_curr = (data.array['ARRAY_NAME'].station[ int(antOrder[j][0] - 1) ] , data.array['ARRAY_NAME'].station[ int(antOrder[j][1] - 1) ], data.array['ARRAY_NAME'].station[ int(antOrder[j][2] - 1) ]);
currT3 = ehtim.io.oifits.OI_T3(timeClosure[j], intTime, t3amp[j], t3amperr[j], t3phi[j], t3phierr[j], flagVis, uClosure[j][0]*wavelength, vClosure[j][0]*wavelength, uClosure[j][1]*wavelength, vClosure[j][1]*wavelength, data.wavelength['WAVELENGTH_NAME'], data.target[0], array=data.array['ARRAY_NAME'], station=station_curr);
data.t3 = np.append(data.t3, currT3 );
def save_txt(self, fname):
"""Save the Movie data to individual text files with filenames basename + 00001, etc.
Args:
fname (str): basename of output files
Returns:
"""
ehtim.io.save.save_mov_txt(self, fname)
return
def save_txt(self, fname):
"""Save image data to text file.
Args:
fname (str): path to output text file
Returns:
"""
ehtim.io.save.save_im_txt(self, fname)
return
def load_caltable(obs, datadir, sqrt_gains=False ):
"""Load apriori Caltable object from text files in the given directory
Args:
obs (Obsdata): The observation object associated with the Caltable
datadir (str): directory to save caltable in
sqrt_gains (bool): If True, we take the sqrt of table gains before loading.
Returns:
(Caltable): a caltable object
"""
tarr = obs.tarr
array_filename = datadir + '/array.txt'
if os.path.exists(array_filename):
tarr = ehtim.io.load.load_array_txt(array_filename).tarr
datatables = {}
for s in range(0, len(tarr)):
site = tarr[s]['site']
filename = datadir + obs.source + '_' + site + '.txt'
try:
data = np.loadtxt(filename, dtype=bytes).astype(str)
except IOError:
continue
#print ("filename)
datatable = []
for row in data:
def writeOIFITS(filename, RA, DEC, frequency, bandWidth, intTime,
visamp, visamperr, visphi, visphierr, u, v, ant1, ant2, timeobs,
t3amp, t3amperr, t3phi, t3phierr, uClosure, vClosure, antOrder, timeClosure,
antennaNames, antennaDiam, antennaX, antennaY, antennaZ):
speedoflight = C;
flagVis = False; # do not flag any data
# open a new oifits file
data = ehtim.io.oifits.oifits();
# put in the target information - RA and DEC should be in degrees
name = 'TARGET_NAME';
data.target = np.append(data.target, ehtim.io.oifits.OI_TARGET(name, RA, DEC, veltyp='LSR') )
#calulate wavelength and bandpass
wavelength = speedoflight/frequency
bandlow = speedoflight/(frequency+(0.5*bandWidth))
bandhigh = speedoflight/(frequency-(0.5*bandWidth))
bandpass = bandhigh-bandlow
# put in the wavelength information - only using a single frequency
data.wavelength['WAVELENGTH_NAME'] = ehtim.io.oifits.OI_WAVELENGTH(wavelength, eff_band=bandpass)
# put in information about the telescope stations in the array
stations = [];
def load_obs_oifits(filename, flux=1.0):
"""Load data from an oifits file. Does NOT currently support polarization.
Args:
fname (str): path to input text file
flux (float): normalization total flux
Returns:
obs (Obsdata): Obsdata object loaded from file
"""
print('Warning: load_obs_oifits does NOT currently support polarimetric data!')
# open oifits file and get visibilities
oidata=ehtim.io.oifits.open(filename)
vis_data = oidata.vis
# get source info
src = oidata.target[0].target
ra = oidata.target[0].raep0.angle
dec = oidata.target[0].decep0.angle
# get annena info
nAntennas = len(oidata.array[list(oidata.array.keys())[0]].station)
sites = np.array([oidata.array[list(oidata.array.keys())[0]].station[i].sta_name for i in range(nAntennas)])
arrayX = oidata.array[list(oidata.array.keys())[0]].arrxyz[0]
arrayY = oidata.array[list(oidata.array.keys())[0]].arrxyz[1]
arrayZ = oidata.array[list(oidata.array.keys())[0]].arrxyz[2]
x = np.array([arrayX + oidata.array[list(oidata.array.keys())[0]].station[i].staxyz[0] for i in range(nAntennas)])
y = np.array([arrayY + oidata.array[list(oidata.array.keys())[0]].station[i].staxyz[1] for i in range(nAntennas)])
z = np.array([arrayZ + oidata.array[list(oidata.array.keys())[0]].station[i].staxyz[2] for i in range(nAntennas)])