Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_mosaic_errors():
"""Check error handling of create_mosaic()."""
tp_geo = BufferedTilePyramid("geodetic")
tp_mer = BufferedTilePyramid("mercator")
geo_tile = tp_geo.tile(1, 0, 0)
geo_tile_data = np.ndarray(geo_tile.shape)
mer_tile = tp_mer.tile(1, 1, 0)
mer_tile_data = np.ndarray(mer_tile.shape)
# tiles error
with pytest.raises(TypeError):
create_mosaic("invalid tiles")
with pytest.raises(TypeError):
create_mosaic(["invalid tiles"])
# CRS error
with pytest.raises(ValueError):
create_mosaic([
(geo_tile, geo_tile_data), (mer_tile, mer_tile_data)
])
# zoom error
with pytest.raises(ValueError):
diff_zoom = tp_geo.tile(2, 1, 0)
diff_zoom_data = np.ndarray(diff_zoom.shape)
create_mosaic([
(geo_tile, geo_tile_data), (diff_zoom, diff_zoom_data)
])
# tile data error
with pytest.raises(TypeError):
with pytest.raises(TypeError):
# for one tile
create_mosaic([(geo_tile, None)])
with pytest.raises(TypeError):
# for multiple tiles
create_mosaic([(geo_tile, None), (geo_tile, None)])
# tile data type error
with pytest.raises(TypeError):
diff_type = tp_geo.tile(1, 1, 0)
diff_type_data = np.ndarray(diff_zoom.shape).astype("int")
create_mosaic([
(geo_tile, geo_tile_data), (diff_type, diff_type_data)
])
# no tiles
with pytest.raises(ValueError):
create_mosaic(tiles=[])
def test_processing(mp_tmpdir, cleantopo_br, cleantopo_tl):
"""Test correct processing (read and write) outputs."""
for cleantopo_process in [cleantopo_br.path, cleantopo_tl.path]:
with mapchete.open(cleantopo_process) as mp:
for zoom in range(6):
tiles = []
for tile in mp.get_process_tiles(zoom):
output = mp.execute(tile)
tiles.append((tile, output))
assert isinstance(output, ma.MaskedArray)
assert output.shape == output.shape
assert not ma.all(output.mask)
mp.write(tile, output)
mosaic = create_mosaic(tiles)
try:
temp_vrt = os.path.join(mp_tmpdir, str(zoom)+".vrt")
gdalbuildvrt = "gdalbuildvrt %s %s/%s/*/*.tif > /dev/null" % (
temp_vrt, mp.config.output.path, zoom)
os.system(gdalbuildvrt)
with rasterio.open(temp_vrt, "r") as testfile:
for file_item, mosaic_item in zip(
testfile.meta["transform"], mosaic.affine
):
assert file_item == mosaic_item
band = testfile.read(1, masked=True)
assert band.shape == mosaic.data.shape
assert ma.allclose(band, mosaic.data)
assert ma.allclose(band.mask, mosaic.data.mask)
finally:
shutil.rmtree(mp_tmpdir, ignore_errors=True)
except:
pass
mapchete_file = os.path.join(scriptdir, cleantopo_process)
process = Mapchete(MapcheteConfig(mapchete_file))
for zoom in range(6):
tiles = []
for tile in process.get_process_tiles(zoom):
output = process.execute(tile)
tiles.append(output)
assert isinstance(output, BufferedTile)
assert isinstance(output.data, ma.MaskedArray)
assert output.data.shape == output.shape
assert not ma.all(output.data.mask)
process.write(output)
tilenum += 1
mosaic, mosaic_affine = create_mosaic(tiles)
try:
temp_vrt = os.path.join(out_dir, str(zoom)+".vrt")
gdalbuildvrt = "gdalbuildvrt %s %s/%s/*/*.tif > /dev/null" % (
temp_vrt, out_dir, zoom)
os.system(gdalbuildvrt)
with rasterio.open(temp_vrt, "r") as testfile:
for file_item, mosaic_item in zip(
testfile.meta["transform"], mosaic_affine
):
try:
assert file_item == mosaic_item
except AssertionError:
raise ValueError(
"%s zoom %s: Affine items do not match %s %s"
% (
cleantopo_process, zoom, file_item,
def test_create_mosaic_antimeridian():
"""Create mosaic using tiles on opposing antimeridian sides."""
zoom = 5
row = 0
pixelbuffer = 5
tp = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
west = tp.tile(zoom, row, 0)
east = tp.tile(zoom, row, tp.matrix_width(zoom) - 1)
mosaic = create_mosaic([
(west, np.ones(west.shape).astype("uint8")),
(east, np.ones(east.shape).astype("uint8") * 2)
])
assert isinstance(mosaic, ReferencedRaster)
# Huge array gets initialized because the two tiles are on opposing sides of the
# projection area. The below test should pass if the tiles are stitched together next
# to each other.
assert mosaic.data.shape == (1, west.height, west.width * 2 - 2 * pixelbuffer)
assert mosaic.data[0][0][0] == 2
assert mosaic.data[0][0][-1] == 1
# If tiles from opposing sides from Antimeridian are mosaicked it will happen that the
# output mosaic exceeds the CRS bounds (obviously). In such a case the mosaicking
# function shall make sure that the larger part of the output mosaic shall be inside
# the CRS bounds.
def _read_existing_output(self, tile, output_tiles):
if self.config.output.METADATA["data_type"] == "raster":
mosaic, affine = raster.create_mosaic([
(output_tile, self.read(output_tile))
for output_tile in output_tiles
])
return raster.extract_from_array(mosaic, affine, tile)
elif self.config.output.METADATA["data_type"] == "vector":
return list(chain.from_iterable([
self.read(output_tile) for output_tile in output_tiles
]))
# resample from children tiles
elif baselevel == "lower":
if self.output_reader.pyramid.pixelbuffer:
lower_tiles = set([
y for y in chain(*[
self.output_reader.pyramid.tiles_from_bounds(
x.bounds, x.zoom + 1
)
for x in output_tiles
])
])
else:
lower_tiles = [
y for y in chain(*[x.get_children() for x in output_tiles])
]
mosaic = raster.create_mosaic(
[
(lower_tile, self.output_reader.read(lower_tile))
for lower_tile in lower_tiles
],
nodata=self.output_reader.output_params["nodata"]
)
process_data = raster.resample_from_array(
in_raster=mosaic.data,
in_affine=mosaic.affine,
out_tile=self.tile,
resampling=self.config_baselevels["lower"],
nodata=self.output_reader.output_params["nodata"]
)
logger.debug((self.tile.id, "generated from baselevel", str(t)))
return process_data
def _interpolate_from_baselevel(self, tile=None, baselevel=None):
with Timer() as t:
# resample from parent tile
if baselevel == "higher":
parent_tile = tile.get_parent()
process_data = raster.resample_from_array(
in_raster=self.get_raw_output(parent_tile, _baselevel_readonly=True),
in_affine=parent_tile.affine,
out_tile=tile,
resampling=self.config.baselevels["higher"],
nodataval=self.config.output.nodata
)
# resample from children tiles
elif baselevel == "lower":
mosaic, mosaic_affine = raster.create_mosaic([
(
child_tile,
self.get_raw_output(child_tile, _baselevel_readonly=True)
)
for child_tile in self.config.baselevels["tile_pyramid"].tile(
*tile.id
).get_children()
])
process_data = raster.resample_from_array(
in_raster=mosaic,
in_affine=mosaic_affine,
out_tile=tile,
resampling=self.config.baselevels["lower"],
nodataval=self.config.output.nodata
)
logger.debug((tile.id, "generated from baselevel", t.elapsed))