Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# add edge tile
tiles.append(tile_pyramid.tile(8, 0, 0))
for tile in tiles:
width, height = tile.shape
for band in read_raster_window(dummy1_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in range(1, 4):
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in [None, [1, 2, 3]]:
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.ndim == 3
assert band.shape == (3, width, height)
config = MapcheteConfig(minmax_zoom.path)
rasterfile = config.params_at_zoom(zoom)["input"]["file1"]
dummy1_bbox = rasterfile.bbox()
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# add edge tile
tiles.append(tile_pyramid.tile(8, 0, 0))
for tile in tiles:
width, height = tile.shape
for band in read_raster_window(dummy1_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in range(1, 4):
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in [None, [1, 2, 3]]:
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.ndim == 3
assert band.shape == (3, width, height)
def test_read_raster_window_resampling(cleantopo_br_tif):
    """Resampling methods must not all produce identical output."""
    methods = (
        "nearest", "bilinear", "cubic", "cubic_spline", "lanczos",
        "average", "mode",
    )
    pyramid = BufferedTilePyramid("geodetic")
    # the raster is only opened to look up its bounds
    with rasterio.open(cleantopo_br_tif, "r") as src:
        tiles = pyramid.tiles_from_bounds(src.bounds, 4)
    for tile in tiles:
        outputs = []
        for method in methods:
            outputs.append(
                read_raster_window(cleantopo_br_tif, tile, resampling=method)
            )
        # at least one pair of neighbouring results has to differ
        differs = [
            not np.array_equal(later, earlier)
            for earlier, later in zip(outputs[:-1], outputs[1:])
        ]
        assert any(differs)
def test_read_raster_window_mask(s2_band):
    """Cubic resampling must not leave artefact values on mask edges."""
    pyramid = BufferedTilePyramid("geodetic")
    tile = pyramid.tile(zoom=13, row=1918, col=8905)
    data = read_raster_window(
        s2_band,
        tile,
        resampling="cubic",
        src_nodata=0,
        dst_nodata=0,
    )
    # the window has to contain some data ...
    assert data.any()
    # ... but no pixel may carry the value 1
    equals_one = np.where(data == 1, True, False)
    assert not equals_one.any()
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# target window out of CRS bounds
band = read_raster_window(
dummy1_3857_tif, tile_pyramid.tile(12, 0, 0))
assert isinstance(band, ma.MaskedArray)
assert band.mask.all()
# not intersecting tile
tiles.append(tile_pyramid.tile(zoom, 1, 1)) # out of CRS bounds
tiles.append(tile_pyramid.tile(zoom, 16, 1)) # out of file bbox
for tile in tiles:
for band in read_raster_window(dummy1_3857_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == tile.shape
bands = read_raster_window(dummy1_3857_tif, tile, [1])
assert isinstance(bands, ma.MaskedArray)
assert bands.shape == tile.shape
# errors
with pytest.raises(IOError):
read_raster_window("nonexisting_path", tile)
def test_read_raster_window_partly_overlapping(cleantopo_br_tif):
"""Read array with read_raster_window where window is bigger than file."""
tile = BufferedTilePyramid("geodetic").tile(4, 15, 31)
data = read_raster_window(cleantopo_br_tif, tile)
assert isinstance(data, ma.MaskedArray)
assert data.mask.any()
rasterfile = config.params_at_zoom(zoom)["input"]["file1"]
dummy1_bbox = rasterfile.bbox()
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# target window out of CRS bounds
band = read_raster_window(
dummy1_3857_tif, tile_pyramid.tile(12, 0, 0))
assert isinstance(band, ma.MaskedArray)
assert band.mask.all()
# not intersecting tile
tiles.append(tile_pyramid.tile(zoom, 1, 1)) # out of CRS bounds
tiles.append(tile_pyramid.tile(zoom, 16, 1)) # out of file bbox
for tile in tiles:
for band in read_raster_window(dummy1_3857_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == tile.shape
bands = read_raster_window(dummy1_3857_tif, tile, [1])
assert isinstance(bands, ma.MaskedArray)
assert bands.shape == tile.shape
# errors
with pytest.raises(IOError):
read_raster_window("nonexisting_path", tile)
def test_read_raster_window_reproject(dummy1_3857_tif, minmax_zoom):
    """Read a raster into geodetic tiles with read_raster_window."""
    zoom = 8
    # with reproject: point the "file1" input at the fixture raster
    raw = minmax_zoom.dict
    raw["input"].update(file1=dummy1_3857_tif)
    config = MapcheteConfig(raw)
    input_file = config.params_at_zoom(zoom)["input"]["file1"]
    input_bbox = input_file.bbox()

    pyramid = BufferedTilePyramid("geodetic", pixelbuffer=5)
    tiles = list(pyramid.tiles_from_geom(input_bbox, zoom))

    # target window out of CRS bounds: everything must be masked
    outside = read_raster_window(dummy1_3857_tif, pyramid.tile(12, 0, 0))
    assert isinstance(outside, ma.MaskedArray)
    assert outside.mask.all()

    # also cover tiles which do not intersect
    tiles.append(pyramid.tile(zoom, 1, 1))  # out of CRS bounds
    tiles.append(pyramid.tile(zoom, 16, 1))  # out of file bbox
    for tile in tiles:
        # iterating over the result yields one array per band
        for band in read_raster_window(dummy1_3857_tif, tile):
            assert isinstance(band, ma.MaskedArray)
            assert band.shape == tile.shape
        # explicitly requesting band 1 as a list
        single = read_raster_window(dummy1_3857_tif, tile, [1])
        assert isinstance(single, ma.MaskedArray)
        assert single.shape == tile.shape

    # errors: a missing file has to raise (reuses the last tile of the loop)
    with pytest.raises(IOError):
        read_raster_window("nonexisting_path", tile)
def test_read_raster_window(dummy1_tif, minmax_zoom):
    """Read single and multiple bands with read_raster_window."""
    zoom = 8
    # without reproject
    config = MapcheteConfig(minmax_zoom.path)
    input_file = config.params_at_zoom(zoom)["input"]["file1"]
    input_bbox = input_file.bbox()

    pyramid = BufferedTilePyramid("geodetic", pixelbuffer=5)
    tiles = list(pyramid.tiles_from_geom(input_bbox, zoom))
    # add edge tile
    tiles.append(pyramid.tile(8, 0, 0))

    for tile in tiles:
        dim0, dim1 = tile.shape
        # iterating over the default result yields 2D bands
        for band in read_raster_window(dummy1_tif, tile):
            assert isinstance(band, ma.MaskedArray)
            assert band.shape == (dim0, dim1)
        # an integer index returns a single 2D band
        for band_index in (1, 2, 3):
            band = read_raster_window(dummy1_tif, tile, band_index)
            assert isinstance(band, ma.MaskedArray)
            assert band.shape == (dim0, dim1)
        # no index or a list of three indexes returns a 3D array
        for indexes in (None, [1, 2, 3]):
            stack = read_raster_window(dummy1_tif, tile, indexes)
            assert isinstance(stack, ma.MaskedArray)
            assert stack.ndim == 3
            assert stack.shape == (3, dim0, dim1)
def read(self, output_tile, **kwargs):
    """
    Return the existing process output for a tile.

    Parameters
    ----------
    output_tile : ``BufferedTile``
        must be member of output ``TilePyramid``
    **kwargs
        accepted but not used by this implementation

    Returns
    -------
    NumPy array
    """
    source = self.rio_file
    return read_raster_window(source, output_tile)