Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
info: initial metadata for the composite
md_list: list of metadata dictionaries for each input
composite_array: array representing the final data values of the
composite for valid min/max calculations
Returns: dict of overall metadata (same as `info`)
"""
if not all(x[Info.PROJ] == md_list[0][Info.PROJ] for x in md_list[1:]):
raise ValueError("Algebraic inputs must all be the same projection")
uuid = uuidgen()
info[Info.UUID] = uuid
for k in (Info.PLATFORM, Info.INSTRUMENT, Info.SCENE):
if md_list[0].get(k) is None:
continue
if all(x.get(k) == md_list[0].get(k) for x in md_list[1:]):
info.setdefault(k, md_list[0][k])
info.setdefault(Info.KIND, Kind.COMPOSITE)
info.setdefault(Info.SHORT_NAME, '')
info.setdefault(Info.DATASET_NAME, info[Info.SHORT_NAME])
info.setdefault(Info.UNITS, '1')
max_meta = max(md_list, key=lambda x: x[Info.SHAPE])
for k in (Info.PROJ, Info.ORIGIN_X, Info.ORIGIN_Y, Info.CELL_WIDTH, Info.CELL_HEIGHT, Info.SHAPE):
info[k] = max_meta[k]
info[Info.VALID_RANGE] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.CLIM] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.OBS_TIME] = min([x[Info.OBS_TIME] for x in md_list])
def instrument(self):
return self.get(Info.INSTRUMENT)
def time_siblings(self, uuid, sibling_infos=None):
"""
return time-ordered list of datasets which have the same band, in time order
:param uuid: focus UUID we're trying to build around
:param sibling_infos: dictionary of UUID -> Dataset Info to sort through
:return: sorted list of sibling uuids in time order, index of where uuid is in the list
"""
if sibling_infos is None:
sibling_infos = self._layer_with_uuid
it = sibling_infos.get(uuid, None)
if it is None:
return [], 0
sibs = [(x[Info.SCHED_TIME], x[Info.UUID]) for x in
self._filter(sibling_infos.values(), it,
{Info.SHORT_NAME, Info.STANDARD_NAME, Info.SCENE, Info.INSTRUMENT, Info.PLATFORM,
Info.KIND})]
# then sort it into time order
sibs.sort()
offset = [i for i, x in enumerate(sibs) if x[1] == uuid]
return [x[1] for x in sibs], offset[0]
# otherwise display information on the selected layer(s)
# figure out the info shared between all the layers currently selected
presentation_info = self.document.current_layer_set
for layer_uuid in selected_uuid_list:
layer_info = self.document.get_info(uuid=layer_uuid)
this_prez = None
for prez_tuple in presentation_info:
if prez_tuple.uuid == layer_uuid:
this_prez = prez_tuple
break
# compare our various values
self._update_if_different(shared_info, layer_info, Info.DISPLAY_NAME)
self._update_if_different(shared_info, layer_info, Info.DISPLAY_TIME)
self._update_if_different(shared_info, layer_info, Info.INSTRUMENT)
self._update_if_different(shared_info, this_prez, attr='colormap')
self._get_shared_color_limits(shared_info, layer_info, this_prez)
self._get_code_block(shared_info, layer_uuid)
# wavelength
wl = layer_info.get(Info.CENTRAL_WAVELENGTH)
fmt = "{:0.2f} µm"
if isinstance(wl, (tuple, list)):
wl = [fmt.format(x) if x is not None else '---' for x in wl]
wl = "(" + ", ".join(wl) + ")"
else:
wl = fmt.format(wl) if wl is not None else '---'
if Info.CENTRAL_WAVELENGTH not in shared_info:
shared_info[Info.CENTRAL_WAVELENGTH] = wl
else:
shared_info[Info.CENTRAL_WAVELENGTH] = "---" if shared_info[Info.CENTRAL_WAVELENGTH] != wl else wl
:return:
"""
# FUTURE: resolve dictionary-style into attribute-style uses
dep_info = [self.r, self.g, self.b]
display_time = self._default_display_time()
short_name = self._default_short_name()
name = self._default_display_name(short_name=short_name, display_time=display_time)
ds_info = {
Info.DATASET_NAME: short_name,
Info.SHORT_NAME: short_name,
Info.DISPLAY_NAME: name,
Info.DISPLAY_TIME: display_time,
Info.SCHED_TIME: self.sched_time,
Info.BAND: self.band,
Info.CENTRAL_WAVELENGTH: self.central_wavelength,
Info.INSTRUMENT: self.instrument,
Info.PLATFORM: self.platform,
Info.SCENE: self.scene,
Info.UNIT_CONVERSION: self._get_units_conversion(),
Info.UNITS: None,
Info.VALID_RANGE: [d[Info.VALID_RANGE] if d else (None, None) for d in dep_info],
}
if self.r is None and self.g is None and self.b is None:
ds_info.update({
Info.ORIGIN_X: None,
Info.ORIGIN_Y: None,
Info.CELL_WIDTH: None,
Info.CELL_HEIGHT: None,
Info.PROJ: None,
Info.CLIM: ((None, None), (None, None), (None, None)),
})
def _display_shared_info(self, shared_info: defaultdict):
self.name_text.setText("Name: " + shared_info[Info.DISPLAY_NAME])
self.time_text.setText("Time: " + shared_info[Info.DISPLAY_TIME])
self.instrument_text.setText("Instrument: " + shared_info[Info.INSTRUMENT])
self.wavelength_text.setText("Wavelength: " + shared_info[Info.CENTRAL_WAVELENGTH])
self.colormap_text.setText("Colormap: " + (shared_info["colormap"] or ""))
self.clims_text.setText("Color Limits: " + shared_info["climits"])
self.composite_codeblock.setText(shared_info['codeblock'])
# format colormap
if not shared_info["colormap"]:
self.cmap_vis.setHtml("")
else:
cmap_html = COLORMAP_MANAGER[shared_info["colormap"]]._repr_html_()
cmap_html = cmap_html.replace("height", "border-collapse: collapse;\nheight")
self.cmap_vis.setHtml(
"""<div>%s</div>""" % (cmap_html,))
def _default_display_name(self, ds_info, display_time=None):
# FUTURE: This can be customized by the user
sat = ds_info[Info.PLATFORM]
inst = ds_info[Info.INSTRUMENT]
name = ds_info.get(Info.SHORT_NAME, '-unknown-')
label = ds_info.get(Info.STANDARD_NAME, '')
if label == 'toa_bidirectional_reflectance':
label = 'Refl'
elif label == 'toa_brightness_temperature':
label = 'BT'
else:
label = ''
if display_time is None:
display_time = ds_info.get(Info.DISPLAY_TIME, self._default_display_time(ds_info))
name = "{sat} {inst} {name} {standard_name} {dtime}".format(
sat=sat.value, inst=inst.value, name=name, standard_name=label, dtime=display_time)
return name
def _create_filenames(self, uuids, base_filename):
if not uuids or uuids[0] is None:
return [None], [base_filename]
filenames = []
# only use the first uuid to fill in filename information
file_uuids = uuids[:1] if is_video_filename(base_filename) else uuids
for u in file_uuids:
layer_info = self.doc[u]
fn = base_filename.format(
start_time=layer_info[Info.SCHED_TIME],
scene=Info.SCENE,
instrument=Info.INSTRUMENT,
)
filenames.append(fn)
# check for duplicate filenames
if len(filenames) > 1 and all(filenames[0] == fn for fn in filenames):
ext = os.path.splitext(filenames[0])[-1]
filenames = [os.path.splitext(fn)[0] + "_{:03d}".format(i + 1) + ext for i, fn in enumerate(filenames)]
return uuids, filenames