Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Get the pixel area. We assume a rectangular pixelization, so this is just
# a function of y
ipixsize = 4 * np.pi / (12 * nside ** 2)
opixsize = get_pixsize_rect(shape, wcs)
nblock = (shape[-2] + rstep - 1) // rstep
for bi in range(comm.rank, nblock, comm.size):
if bi % comm.size != comm.rank:
continue
i = bi * rstep
rdec = dec[i : i + rstep]
opos = np.zeros((2, len(rdec), len(ra)))
opos[0] = rdec[:, None]
opos[1] = ra[None, :]
if rot:
# This is unreasonably slow
ipos = coordinates.transform("equ", "gal", opos[::-1], pol=True)
else:
ipos = opos[::-1]
pix[i : i + rstep, :] = hp.ang2pix(nside, np.pi / 2 - ipos[1], ipos[0])
del ipos, opos
for i in range(0, shape[-2], rstep):
pix[i : i + rstep] = utils.allreduce(pix[i : i + rstep], comm)
omap = enmap.zeros((1,) + shape, wcs, dtype)
imap = np.array(hmap).astype(dtype)
imap = imap[None]
if do_mask:
bad = hp.mask_bad(imap)
bad |= imap <= 0
imap[bad] = 0
del bad
# Read off the nearest neighbor values
omap[:] = imap[:, pix]
print("P -> alm")
sht.map2alm(m[1:3], alm[1:3], spin=2)
del m
else:
alm = hp_map
if f_ell is not None: alm = curvedsky.almxfl(alm,f_ell)
if rot is not None:
# Rotate by displacing coordinates and then fixing the polarization
print("Computing pixel positions")
pmap = enmap.posmap(shape, wcs)
if rot:
print("Computing rotated positions")
s1, s2 = rot.split(",")
opos = coordinates.transform(s2, s1, pmap[::-1], pol=ncomp == 3)
pmap[...] = opos[1::-1]
if len(opos) == 3:
psi = -opos[2].copy()
del opos
print("Projecting")
res = curvedsky.alm2map_pos(alm, pmap)
if rot and ncomp == 3:
print("Rotating polarization vectors")
res[1:3] = enmap.rotate_pol(res[1:3], psi)
else:
print("Projecting")
res = enmap.zeros((len(alm),) + shape[-2:], wcs, dtype)
res = curvedsky.alm2map(alm, res)
if return_alm: return res,alm
return res
If recenter is True, then the mean of the position shift will be subtracted. This
only makes sense for transformations of points covering only a part of the sky.
It is intended for visualization purposes."""
# Flatten, so we're [{ra,dec,phi?},:]
pos = np.asarray(pos)
res = pos.copy().reshape(pos.shape[0],-1)
# First transform to a coordinate system where we're moving in the z direction.
res = coordinates.transform("equ",["equ",[dir,False]],res,pol=pol)
if recenter: before = np.mean(res[1,::10])
# Apply aberration, which is a pure change of z in this system
z = np.cos(np.pi/2-res[1])
z_obs, A = calc_boost_1d(z, beta)
res[1] = np.pi/2-np.arccos(z_obs)
if recenter: res[1] -= np.mean(res[1,::10])-before
res = coordinates.transform(["equ",[dir,False]],"equ",res,pol=pol)
# Reshape to original shape
res = res.reshape(res.shape[:1]+pos.shape[1:])
A = A.reshape(pos.shape[1:])
return res, A
def distortion(pos, dir, beta):
"""Returns the local aberration distortion, defined as the
second derivative of the aberration displacement."""
pos = coordinates.transform("equ",["equ",[dir,False]],pos,pol=True)
return aber_deriv(np.pi/2-pos[1], -beta)-1
def remap(pos, dir, beta, pol=True, modulation=True, recenter=False):
"""Given a set of coordinates "pos[{ra,dec]}", computes the aberration
deflected positions for a speed beta in units of c in the
direction dir. If pol=True (the default), then the output
will have three columns, with the third column being
the aberration-induced rotation of the polarization angle."""
pos = coordinates.transform("equ",["equ",[dir,False]],pos,pol=pol)
if recenter: before = np.mean(pos[1,::10])
# Use -beta here to get the original position from the deflection,
# instead of getting the deflection from the original one as
# aber_angle normally computes.
pos[1] = np.pi/2-aber_angle(np.pi/2-pos[1], -beta)
if recenter:
after = np.mean(pos[1,::10])
pos[1] -= after-before
res = coordinates.transform(["equ",[dir,False]],"equ",pos,pol=pol)
if modulation:
amp = mod_amplitude(np.pi/2-pos[1], beta)
res = np.concatenate([res,[amp]])
return res
def remap(pos, dir, beta, pol=True, modulation=True, recenter=False):
"""Given a set of coordinates "pos[{ra,dec]}", computes the aberration
deflected positions for a speed beta in units of c in the
direction dir. If pol=True (the default), then the output
will have three columns, with the third column being
the aberration-induced rotation of the polarization angle."""
pos = coordinates.transform("equ",["equ",[dir,False]],pos,pol=pol)
if recenter: before = np.mean(pos[1,::10])
# Use -beta here to get the original position from the deflection,
# instead of getting the deflection from the original one as
# aber_angle normally computes.
pos[1] = np.pi/2-aber_angle(np.pi/2-pos[1], -beta)
if recenter:
after = np.mean(pos[1,::10])
pos[1] -= after-before
res = coordinates.transform(["equ",[dir,False]],"equ",pos,pol=pol)
if modulation:
amp = mod_amplitude(np.pi/2-pos[1], beta)
res = np.concatenate([res,[amp]])
return res
returns the aberrated pos_obs[{ra,dec,[phi]}], and the corresponding modulation
amplitude A[...] as a tuple. To get the inverse transform, from observed to
rest-frame coordinates, pass in -beta instead of beta.
phi is the optional local basis rotation, which will be computed if
pol=True (the default). This angle needs to be taken into account for vector fields
like polarization.
If recenter is True, then the mean of the position shift will be subtracted. This
only makes sense for transformations of points covering only a part of the sky.
It is intended for visualization purposes."""
# Flatten, so we're [{ra,dec,phi?},:]
pos = np.asarray(pos)
res = pos.copy().reshape(pos.shape[0],-1)
# First transform to a coordinate system where we're moving in the z direction.
res = coordinates.transform("equ",["equ",[dir,False]],res,pol=pol)
if recenter: before = np.mean(res[1,::10])
# Apply aberration, which is a pure change of z in this system
z = np.cos(np.pi/2-res[1])
z_obs, A = calc_boost_1d(z, beta)
res[1] = np.pi/2-np.arccos(z_obs)
if recenter: res[1] -= np.mean(res[1,::10])-before
res = coordinates.transform(["equ",[dir,False]],"equ",res,pol=pol)
# Reshape to original shape
res = res.reshape(res.shape[:1]+pos.shape[1:])
A = A.reshape(pos.shape[1:])
return res, A
# Make the pixbox fft-friendly
for i in range(2):
pixbox[1,i] = pixbox[0,i] + fft.fft_len(pixbox[1,i]-pixbox[0,i], direction="above", factors=[2,3,5])
ithumb = imap.extract_pixbox(pixbox)
if extensive: ithumb /= ithumb.pixsizemap()
ithumb = ithumb.apod(apod_pix, fill="median")
if filter is not None: ithumb = filter(ithumb)
if verbose:
print("%4d/%d %6.2f %6.2f %8.2f %dx%d" % (si+1, nsrc, coords[si,0]/utils.degree, coords[si,1]/utils.degree, np.max(ithumb), ithumb.shape[-2], ithumb.shape[-1]))
# Oversample using fourier if requested. We do this because fourier
# interpolation is better than spline interpolation overall
if oversample > 1:
fshape = utils.nint(np.array(oshape[-2:])*oversample)
ithumb = ithumb.resample(fshape, method="fft")
# I apologize for the syntax. There should be a better way of doing this
ipos = coordinates.transform("cel", ["cel",[[0,0,coords[si,1],coords[si,0]],False]], opos[::-1], pol=pol)
ipos, rest = ipos[1::-1], ipos[2:]
omaps[si] = ithumb.at(ipos, order=order)
# Apply the polarization rotation. The sign is flipped because we computed the
# rotation from the output to the input
if pol: omaps[si] = enmap.rotate_pol(omaps[si], -rest[0])
if extensive: omaps *= omaps.pixsizemap()
# Restore original dimension
omaps = omaps.reshape(ishape + omaps.shape[1:])
return omaps