Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def pixshapemap(shape, wcs, bsize=1000, separable="auto", signed=False):
"""Returns the physical width and heigh of each pixel in the map in radians.
Heavy for big maps. Much faster approaches are possible for known pixelizations."""
if wcsutils.is_plain(wcs):
cdelt = wcs.wcs.cdelt
pshape = np.zeros([2])
pshape[0] = wcs.wcs.cdelt[1]*get_unit(wcs)
pshape[1] = wcs.wcs.cdelt[0]*get_unit(wcs)
if not signed: pshape = np.abs(pshape)
pshape = np.broadcast_to(pshape[:,None,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
pshape = pixshapes_cyl(shape, wcs, signed=signed)
pshape = np.broadcast_to(pshape[:,:,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
else:
pshape = zeros((2,)+shape[-2:], wcs)
# Loop over blocks in y to reduce memory usage
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
def box(shape, wcs, npoint=10, corner=True):
"""Compute a bounding box for the given geometry."""
# Because of wcs's wrapping, we need to evaluate several
# extra pixels to make our unwinding unambiguous
pix = np.array([np.linspace(0,shape[-2],num=npoint,endpoint=True),
np.linspace(0,shape[-1],num=npoint,endpoint=True)])
if corner: pix -= 0.5
coords = wcsutils.nobcheck(wcs).wcs_pix2world(pix[1],pix[0],0)[::-1]
if wcsutils.is_plain(wcs):
return np.array(coords).T[[0,-1]]*get_unit(wcs)
else:
return utils.unwind(np.array(coords)*get_unit(wcs)).T[[0,-1]]
def distance_from(shape, wcs, points, omap=None, odomains=None, domains=False, method="bubble", rmax=None, step=1024):
"""Find the distance from each pixel in the geometry (shape, wcs) to the
nearest of the points[{dec,ra},npoint], returning a [ny,nx] map of distances.
If domains==True, then it will also return a [ny,nx] map of the index of the point
that was closest to each pixel. If rmax is specified and the method is "bubble", then
distances will only be computed up to rmax. Beyond that distance will be set to rmax
and domains to -1. This can be used to speed up the calculation when one only cares
about nearby areas."""
from pixell import distances
if wcsutils.is_plain(wcs): warnings.warn("Distance functions are not tested on plain coordinate systems.")
if omap is None: omap = empty(shape[-2:], wcs)
if domains and odomains is None: odomains = empty(shape[-2:], wcs, np.int32)
if wcsutils.is_cyl(wcs):
dec, ra = posaxes(shape, wcs)
if method == "bubble":
point_pix = utils.nint(sky2pix(shape, wcs, points))
return distances.distance_from_points_bubble_separable(dec, ra, points, point_pix, rmax=rmax, omap=omap, odomains=odomains, domains=domains)
elif method == "simple":
return distances.distance_from_points_simple_separable(dec, ra, points, omap=omap, odomains=odomains, domains=domains)
else: raise ValueError("Unknown method '%s'" % str(method))
else:
# We have a general geometry, so we need the full posmap. But to avoid wasting memory we
# can loop over chunks of the posmap.
if method == "bubble":
# Not sure how to slice bubble. Just do it in one go for now
pos = posmap(shape, wcs, safe=False)
def area(shape, wcs, nsamp=1000, method="auto"):
"""Returns the area of a patch with the given shape
and wcs, in steradians."""
if method == "auto":
if wcsutils.is_plain(wcs): method = "intermediate"
elif wcsutils.is_cyl(wcs): method = "cylindrical"
else: method = "contour"
if method in ["inter","intermediate"]: return area_intermediate(shape, wcs)
elif method in ["cyl", "cylindrical"]: return area_cyl(shape, wcs)
elif method in ["cont", "contour"]: return area_contour(shape, wcs, nsamp=nsamp)
else: raise ValueError("Unrecognized method '%s' in area()" % method)
def extract_pixbox(map, pixbox, omap=None, wrap="auto", op=lambda a,b:b, cval=0, iwcs=None, reverse=False):
"""This function extracts a rectangular area from an enmap based on the
given pixbox[{from,to,[stride]},{y,x}]. The difference between this function
and plain slicing of the enmap is that this one supports wrapping around the
sky. This is necessary to make things like fast thumbnail or tile extraction
at the edge of a (horizontally) fullsky map work."""
if iwcs is None: iwcs = map.wcs
pixbox = np.asarray(pixbox)
if omap is None:
oshape, owcs = slice_geometry(map.shape, iwcs, (slice(*pixbox[:,-2]),slice(*pixbox[:,-1])), nowrap=True)
omap = full(map.shape[:-2]+tuple(oshape[-2:]), owcs, cval, map.dtype)
nphi = utils.nint(360/np.abs(iwcs.wcs.cdelt[0]))
# If our map is wider than the wrapping length, assume we're a lower-spin field
nphi *= (nphi+map.shape[-1]-1)//nphi
if utils.streq(wrap, "auto"):
wrap = [0,0] if wcsutils.is_plain(iwcs) else [0,nphi]
else: wrap = np.zeros(2,int)+wrap
for ibox, obox in utils.sbox_wrap(pixbox.T, wrap=wrap, cap=map.shape[-2:]):
islice = utils.sbox2slice(ibox)
oslice = utils.sbox2slice(obox)
if reverse: map [islice] = op(map[islice], omap[oslice])
else: omap[oslice] = op(omap[oslice], map[islice])
return omap
def extent(shape, wcs, nsub=None, signed=False, method="auto"):
"""Returns the area of a patch with the given shape
and wcs, in steradians."""
if method == "auto":
if wcsutils.is_plain(wcs): method = "intermediate"
elif wcsutils.is_cyl(wcs): method = "cylindrical"
else: method = "subgrid"
if method in ["inter","intermediate"]: return extent_intermediate(shape, wcs, signed=signed)
elif method in ["cyl", "cylindrical"]: return extent_cyl(shape, wcs, signed=signed)
elif method in ["sub", "subgrid"]: return extent_subgrid(shape, wcs, nsub=nsub, signed=signed)
else: raise ValueError("Unrecognized method '%s' in extent()" % method)
def neighborhood_pixboxes(shape, wcs, poss, r):
"""Given a set of positions poss[npos,2] in radians and a distance r in radians,
return pixboxes[npos][{from,to},{y,x}] corresponding to the regions within a
distance of r from each entry in poss."""
if wcsutils.is_plain(wcs):
rpix = r/pixsize(shape, wcs)
centers = sky2pix(poss.T).T
return np.moveaxis([centers-rpix,center+rpix+1],0,1)
poss = np.asarray(poss)
res = np.zeros([len(poss),2,2])
for i, pos in enumerate(poss):
# Find the coordinate box we need
dec, ra = pos[:2]
dec1, dec2 = max(dec-r,-np.pi/2), min(dec+r,np.pi/2)
with utils.nowarn():
scale = 1/min(np.cos(dec1), np.cos(dec2))
dra = min(r*scale, np.pi)
ra1, ra2 = ra-dra, ra+dra
box = np.array([[dec1,ra1],[dec2,ra2]])
# And get the corresponding pixbox
res[i] = skybox2pixbox(shape, wcs, box)