Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_product_metadata_name(self):
    """ Builds the file name of the product metadata file

    For products of the old ESA .SAFE type the name is derived from the product id with
    `_edit_name`, otherwise it is ``MTD_`` followed by the second underscore-separated
    part of the product id. The XML extension is appended in both cases.

    :return: name of product metadata file
    :rtype: str
    """
    is_old_safe = self.safe_type == EsaSafeType.OLD_TYPE
    if is_old_safe:
        stem = _edit_name(self.product_id, 'MTD', 'SAFL1C')
    else:
        id_parts = self.product_id.split('_')
        stem = 'MTD_{}'.format(id_parts[1])
    return '{}.{}'.format(stem, MimeType.XML.value)
def get_data_format(filename):
    """ Util function to guess format from filename extension

    The extension is taken to be the text after the last dot of the file name (or the
    whole name if it contains no dot).

    :param filename: name of file
    :type filename: str
    :return: file format matching the canonical form of the extension
    :rtype: MimeType
    """
    extension = filename.rsplit('.', 1)[-1]
    canonical = MimeType.canonical_extension(extension)
    return MimeType(canonical)
**{'datastrip/*/qi/{}'.format(qi_report): MimeType.XML for qi_report in QUALITY_REPORTS}}
# Tile files mapped to their formats. The groups below are merged in the order written,
# so if a key appears more than once the format of the last occurrence is kept.
TILE_FILES = {
    TILE_INFO: MimeType.JSON,
    PRODUCT_INFO: MimeType.JSON,
    METADATA: MimeType.XML,
    PREVIEW: MimeType.JPG,
    PREVIEW_JP2: MimeType.JP2,
    TCI: MimeType.JP2,
    QI_MSK_CLOUD: MimeType.GML,
    ECMWFT: MimeType.RAW,
    AUX_ECMWFT: MimeType.RAW,
    GIPP: MimeType.XML,
    # Keys of the form 'qi/<quality report>'
    **{'qi/{}'.format(report): MimeType.XML for report in QUALITY_REPORTS},
    # Keys of the form '<PREVIEW>/<band>' for every L1C band
    **{'{}/{}'.format(PREVIEW, band_name): MimeType.JP2 for band_name in S2_L1C_BANDS},
    # Keys of the form 'qi/MSK_<qi>_<band>' for every combination of QI_LIST and L1C bands
    **{'qi/MSK_{}_{}'.format(qi_name, band_name): MimeType.GML
       for qi_name in QI_LIST
       for band_name in S2_L1C_BANDS},
    # The band names themselves, L1C bands first and L2A bands second
    **{band_name: MimeType.JP2
       for band_group in (S2_L1C_BANDS, S2_L2A_BANDS)
       for band_name in band_group},
    # Keys of the form 'qi/<source id>_PVI'
    **{'qi/{}_PVI'.format(source): MimeType.JP2 for source in SOURCE_ID_LIST},
    # Keys of the form 'qi/<mask>_<resolution>', where leading 'R' characters are stripped
    # from the resolution name
    **{'qi/{}_{}'.format(mask_name, resolution.lstrip('R')): MimeType.JP2
       for mask_name in CLASS_MASKS
       for resolution in (R20m, R60m)},
}
# All product and tile files joined together. Every file is registered both under its full
# key and under the last '/'-separated part of that key; later entries override earlier ones.
AWS_FILES = {
    **PRODUCT_FILES,
    **{path.rpartition('/')[2]: file_format for path, file_format in PRODUCT_FILES.items()},
    **TILE_FILES,
    **{path.rpartition('/')[2]: file_format for path, file_format in TILE_FILES.items()},
}
class EsaSafeType(Enum):
""" Enum constants class for ESA .SAFE type.
:param data: image in its original format
:type data: any of possible image types
:param image_type: expected image format
:type image_type: constants.MimeType
:return: image as numpy array
:rtype: numpy array
:raises: ImageDecodingError
"""
bytes_data = BytesIO(data)
if image_type.is_tiff_format():
image = tiff.imread(bytes_data)
else:
image = np.array(Image.open(bytes_data))
if image_type is MimeType.JP2:
try:
bit_depth = get_jp2_bit_depth(bytes_data)
image = fix_jp2_image(image, bit_depth)
except ValueError:
pass
if image is None:
raise ImageDecodingError('Unable to decode image')
return image
geometry_string = geometry.wkt if isinstance(geometry, Geometry) else str(geometry)
filename = '_'.join([
str(request.service_type.value),
request.layer,
geometry_string,
CRS.ogc_string(geometry.crs),
'{}_{}'.format(date_interval[0], date_interval[1]),
request.resolution,
str(request.bins) if request.bins else '',
request.histogram_type.value if request.histogram_type else ''
])
filename = OgcImageService.filename_add_custom_url_params(filename, request)
return OgcImageService.finalize_filename(filename, MimeType.JSON)
['qi/{}_PVI'.format(source_id) for source_id in SOURCE_ID_LIST] +\
['qi/{}_{}'.format(mask, res.lstrip('R')) for mask, res in it.product(CLASS_MASKS,
[R20m, R60m])] +\
['qi/MSK_{}_{}'.format(qi, band) for qi, band in it.product(QI_LIST, S2_L1C_BANDS)] +\
[QI_MSK_CLOUD] +\
['qi/{}'.format(qi_report) for qi_report in QUALITY_REPORTS] +\
[ECMWFT, AUX_ECMWFT, GIPP]
# Product files mapped to their formats. The quality report entries are merged in last,
# under keys of the form 'datastrip/*/qi/<quality report>'.
PRODUCT_FILES = {
    PRODUCT_INFO: MimeType.JSON,
    METADATA: MimeType.XML,
    INSPIRE: MimeType.XML,
    MANIFEST: MimeType.SAFE,
    L2A_MANIFEST: MimeType.XML,
    REPORT: MimeType.XML,
    DATASTRIP_METADATA: MimeType.XML,
    **{'datastrip/*/qi/{}'.format(report): MimeType.XML for report in QUALITY_REPORTS},
}
# Tile files with formats:
TILE_FILES = {**{TILE_INFO: MimeType.JSON,
PRODUCT_INFO: MimeType.JSON,
METADATA: MimeType.XML,
PREVIEW: MimeType.JPG,
PREVIEW_JP2: MimeType.JP2,
TCI: MimeType.JP2,
QI_MSK_CLOUD: MimeType.GML,
ECMWFT: MimeType.RAW,
AUX_ECMWFT: MimeType.RAW,
GIPP: MimeType.XML},
**{'qi/{}'.format(qi_report): MimeType.XML for qi_report in QUALITY_REPORTS},
**{'{}/{}'.format(preview, band): MimeType.JP2
for preview, band in it.zip_longest([], S2_L1C_BANDS, fillvalue=PREVIEW)},
**{'qi/MSK_{}_{}'.format(qi, band): MimeType.GML for qi, band in it.product(QI_LIST, S2_L1C_BANDS)},
:param post_values: form encoded data to send in POST request. Default is `None`
:type post_values: dict
:param headers: add HTTP headers to request. Default is `None`
:type headers: dict
:return: request response as JSON instance
:rtype: JSON instance or None
:raises: RunTimeError
"""
json_headers = {} if headers is None else headers.copy()
if post_values is None:
request_type = RequestType.GET
else:
request_type = RequestType.POST
json_headers = {**json_headers, **{'Content-Type': MimeType.get_string(MimeType.JSON)}}
request = DownloadRequest(url=url, headers=json_headers, request_type=request_type, post_values=post_values,
save_response=False, return_data=True, data_type=MimeType.JSON)
return execute_download_request(request)
LOGGER.debug('data_type=%s', data_type)
if data_type is MimeType.JSON:
if isinstance(entire_response, requests.Response):
return entire_response.json()
return json.loads(response_content.decode('utf-8'))
if MimeType.is_image_format(data_type):
return decode_image(response_content, data_type)
if data_type is MimeType.XML or data_type is MimeType.GML or data_type is MimeType.SAFE:
return ElementTree.fromstring(response_content)
try:
return {
MimeType.RAW: response_content,
MimeType.TXT: response_content,
MimeType.REQUESTS_RESPONSE: entire_response,
MimeType.ZIP: BytesIO(response_content)
}[data_type]
except KeyError:
raise ValueError('Unknown response data type {}'.format(data_type))
def is_image_format(self):
    """ Tells whether the file format is one of the image formats

    The image formats are the TIFF variants, PNG, JP2 and JPG.

    Example: ``MimeType.PNG.is_image_format()`` or ``MimeType.is_image_format(MimeType.PNG)``

    :param self: File format
    :type self: MimeType
    :return: `True` if file is in image format, `False` otherwise
    :rtype: bool
    """
    image_formats = frozenset((
        MimeType.TIFF,
        MimeType.TIFF_d8,
        MimeType.TIFF_d16,
        MimeType.TIFF_d32f,
        MimeType.PNG,
        MimeType.JP2,
        MimeType.JPG,
    ))
    return self in image_formats
def _fetch_features(self):
""" Collects data from WFS service
:return: dictionary containing info about product tiles
:rtype: dict
"""
if self.feature_offset is None:
return
main_url = '{}{}/{}?'.format(self.base_url, ServiceType.WFS.value, self.instance_id)
params = {'SERVICE': ServiceType.WFS.value,
'REQUEST': 'GetFeature',
'TYPENAMES': DataSource.get_wfs_typename(self.data_source),
'BBOX': str(self.bbox.reverse()) if self.bbox.crs is CRS.WGS84 else str(self.bbox),
'OUTPUTFORMAT': MimeType.get_string(MimeType.JSON),
'SRSNAME': CRS.ogc_string(self.bbox.crs),
'TIME': '{}/{}'.format(self.time_interval[0], self.time_interval[1]),
'MAXCC': 100.0 * self.maxcc,
'MAXFEATURES': SHConfig().max_wfs_records_per_query,
'FEATURE_OFFSET': self.feature_offset}
url = main_url + urlencode(params)
LOGGER.debug("URL=%s", url)
response = get_json(url)
is_sentinel1 = self.data_source.is_sentinel1()
for tile_info in response["features"]:
if is_sentinel1:
if self._sentinel1_product_check(tile_info['properties']['id'], self.data_source) and \
self.data_source.contains_orbit_direction(tile_info['properties']['orbitDirection']):