Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class OsmSplitter(AreaSplitter):
""" A tool that splits the given area into smaller parts. For the splitting it uses Open Street Map (OSM) grid on
the specified zoom level. It calculates bounding boxes of all OSM tiles that intersect the area. If specified by
user it can also reduce the sizes of the remaining bounding boxes to best fit the area.
:param shape_list: A list of geometrical shapes describing the area of interest
:type shape_list: list(shapely.geometry.multipolygon.MultiPolygon or shapely.geometry.polygon.Polygon)
:param crs: Coordinate reference system of the shapes in `shape_list`
:type crs: CRS
:param zoom_level: A zoom level defined by OSM. Level 0 is entire world, level 1 splits the world into 4 parts, etc.
:type zoom_level: int
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit
the given area geometry from `shape_list`.
:type reduce_bbox_sizes: bool
"""
POP_WEB_MAX = transform_point((180, 0), CRS.WGS84, CRS.POP_WEB)[0]
def __init__(self, shape_list, crs, zoom_level, **kwargs):
super().__init__(shape_list, crs, **kwargs)
self.zoom_level = zoom_level
self._make_split()
def _make_split(self, ):
"""This method makes the split
"""
self.area_bbox = self.get_area_bbox(CRS.POP_WEB)
self._check_area_bbox()
self.bbox_list = []
self.info_list = []
def __init__(self, shape_list, crs, bbox_size):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:type shape_list: list(shapely.geometry.multipolygon.MultiPolygon or shapely.geometry.polygon.Polygon)
:param crs: Coordinate reference system of the shapes in `shape_list`
:type crs: CRS
:param bbox_size: Physical size in metres of generated bounding boxes. Could be a float or tuple of floats
:type bbox_size: int or (int, int) or float or (float, float)
"""
super().__init__(shape_list, crs)
self.bbox_size = self._parse_split_parameters(bbox_size, allow_float=True)
self.shape_geometry = Geometry(self.area_shape, self.crs).transform(CRS.WGS84)
self.utm_grid = self._get_utm_polygons()
self._make_split()
if CustomUrlParam.EVALSCRIPT.value in params:
evalscript = params[CustomUrlParam.EVALSCRIPT.value]
params[CustomUrlParam.EVALSCRIPT.value] = b64encode(evalscript.encode()).decode()
if CustomUrlParam.GEOMETRY.value in params:
geometry = params[CustomUrlParam.GEOMETRY.value]
crs = request.bbox.crs
if isinstance(geometry, Geometry):
if geometry.crs is not crs:
raise ValueError('Geometry object in custom_url_params should have the same CRS as given BBox')
else:
geometry = Geometry(geometry, crs)
if geometry.crs is CRS.WGS84:
geometry = geometry.reverse()
params[CustomUrlParam.GEOMETRY.value] = geometry.wkt
return params
:param tile_id: original tile identification string provided by ESA (e.g.
'S2A_OPER_MSI_L1C_TL_SGS__20160109T230542_A002870_T10UEV_N02.01')
:type tile_id: str
:param bbox: bounding box of requested area
:type bbox: geometry.BBox
:param start_date: beginning of time range in ISO8601 format
:type start_date: str
:param end_date: end of time range in ISO8601 format
:type end_date: str
:param absolute_orbit: An absolute orbit number of Sentinel-2 L1C products as defined by ESA
:type absolute_orbit: int
:return: An iterator returning dictionaries with info provided by Sentinel Hub OpenSearch REST service
:rtype: Iterator[dict]
"""
if bbox and bbox.crs is not CRS.WGS84:
bbox = bbox.transform(CRS.WGS84)
url_params = _prepare_url_params(tile_id, bbox, end_date, start_date, absolute_orbit)
url_params['maxRecords'] = SHConfig().max_opensearch_records_per_query
start_index = 1
while True:
url_params['index'] = start_index
url = '{}search.json?{}'.format(SHConfig().opensearch_url, urlencode(url_params))
LOGGER.debug("URL=%s", url)
response = get_json(url)
for tile_info in response["features"]:
yield tile_info
def wgs84_to_utm(lng, lat, utm_crs=None):
""" Convert WGS84 coordinates to UTM. If UTM CRS is not set it will be calculated automatically.
:param lng: longitude in WGS84 system
:type lng: float
:param lat: latitude in WGS84 system
:type lat: float
:param utm_crs: UTM coordinate reference system enum constants
:type utm_crs: constants.CRS or None
:return: east, north coordinates in UTM system
:rtype: float, float
"""
if utm_crs is None:
utm_crs = get_utm_crs(lng, lat)
return transform_point((lng, lat), CRS.WGS84, utm_crs)
for utm_cell in self.utm_grid:
utm_cell_geom, utm_cell_prop = utm_cell
# the UTM MGRS grid definition contains four 0 zones at the poles (0A, 0B, 0Y, 0Z)
if utm_cell_prop['zone'] == 0:
continue
utm_crs = self._get_utm_from_props(utm_cell_prop)
intersection = utm_cell_geom.intersection(self.shape_geometry.geometry)
if not intersection.is_empty and isinstance(intersection, GeometryCollection):
intersection = MultiPolygon(geo_object for geo_object in intersection
if isinstance(geo_object, (Polygon, MultiPolygon)))
if not intersection.is_empty:
intersection = Geometry(intersection, CRS.WGS84).transform(utm_crs)
bbox_partition = self._align_bbox_to_size(intersection.bbox).get_partition(size_x=size_x, size_y=size_y)
columns, rows = len(bbox_partition), len(bbox_partition[0])
for i, j in itertools.product(range(columns), range(rows)):
if bbox_partition[i][j].geometry.intersects(intersection.geometry):
self.bbox_list.append(bbox_partition[i][j])
self.info_list.append(dict(crs=utm_crs.name,
utm_zone=str(utm_cell_prop['zone']).zfill(2),
utm_row=utm_cell_prop['row'],
direction=utm_cell_prop['direction'],
index=index,
index_x=i,
index_y=j))
index += 1
def get_utm_crs(lng, lat, source_crs=CRS.WGS84):
""" Get CRS for UTM zone in which (lat, lng) is contained.
:param lng: longitude
:type lng: float
:param lat: latitude
:type lat: float
:param source_crs: source CRS
:type source_crs: constants.CRS
:return: CRS of the zone containing the lat,lon point
:rtype: constants.CRS
"""
if source_crs is not CRS.WGS84:
lng, lat = transform_point((lng, lat), source_crs, CRS.WGS84)
return CRS.get_utm_from_wgs84(lng, lat)
def get_utm_crs(lng, lat, source_crs=CRS.WGS84):
""" Get CRS for UTM zone in which (lat, lng) is contained.
:param lng: longitude
:type lng: float
:param lat: latitude
:type lat: float
:param source_crs: source CRS
:type source_crs: constants.CRS
:return: CRS of the zone containing the lat,lon point
:rtype: constants.CRS
"""
if source_crs is not CRS.WGS84:
lng, lat = transform_point((lng, lat), source_crs, CRS.WGS84)
return CRS.get_utm_from_wgs84(lng, lat)
:param tile_id: original tile identification string provided by ESA (e.g.
'S2A_OPER_MSI_L1C_TL_SGS__20160109T230542_A002870_T10UEV_N02.01')
:type tile_id: str
:param bbox: bounding box of requested area
:type bbox: geometry.BBox
:param start_date: beginning of time range in ISO8601 format
:type start_date: str
:param end_date: end of time range in ISO8601 format
:type end_date: str
:param absolute_orbit: An absolute orbit number of Sentinel-2 L1C products as defined by ESA
:type absolute_orbit: int
:return: An iterator returning dictionaries with info provided by Sentinel Hub OpenSearch REST service
:rtype: Iterator[dict]
"""
if bbox and bbox.crs is not CRS.WGS84:
bbox = bbox.transform(CRS.WGS84)
url_params = _prepare_url_params(tile_id, bbox, end_date, start_date, absolute_orbit)
url_params['maxRecords'] = SHConfig().max_opensearch_records_per_query
start_index = 1
while True:
url_params['index'] = start_index
url = '{}search.json?{}'.format(SHConfig().opensearch_url, urlencode(url_params))
LOGGER.debug("URL=%s", url)
response = get_json(url)
for tile_info in response["features"]:
yield tile_info