def test_remote(self):
    import s3fs
    # Public OBS endpoint of the Open Telekom Cloud; anonymous read access.
    endpoint_url = "http://obs.eu-de.otc.t-systems.com"
    s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(endpoint_url=endpoint_url))
    # Map the bucket/key prefix to a Zarr store (mapping object).
    s3_store = s3fs.S3Map(root="cyanoalert/cyanoalert-olci-lswe-l2c-v1.zarr", s3=s3, check=False)
    # Wrap the store so that every store access is logged to 'remote-cube.log'.
    diagnostic_store = DiagnosticStore(s3_store, logging_observer(log_path='remote-cube.log'))
    xr.open_zarr(diagnostic_store)
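DiagnosticStore and logging_observer are defined elsewhere and are not part of this snippet. As an illustration of the technique only, a minimal, hypothetical logging wrapper is sketched below: any MutableMapping that delegates to the underlying Zarr store can be handed to xr.open_zarr in the same way.

import collections.abc
import logging

class LoggingStore(collections.abc.MutableMapping):
    # Illustrative stand-in for DiagnosticStore: logs each store access
    # and delegates to the wrapped Zarr store (any mapping object).
    def __init__(self, store, log_path='store-access.log'):
        self._store = store
        self._logger = logging.getLogger('zarr-store')
        self._logger.addHandler(logging.FileHandler(log_path))
        self._logger.setLevel(logging.INFO)

    def __getitem__(self, key):
        self._logger.info('get %s', key)
        return self._store[key]

    def __setitem__(self, key, value):
        self._logger.info('set %s', key)
        self._store[key] = value

    def __delitem__(self, key):
        self._logger.info('del %s', key)
        del self._store[key]

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)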
If *path_or_url* is an object storage URL, return an object storage Zarr store (mapping object)
constructed using *client_kwargs* and *mode*, together with a flag indicating whether the Zarr dataset is consolidated.
Otherwise *path_or_url* is interpreted as a local file system path and returned as-is, again together with
a flag indicating whether the Zarr dataset is consolidated.
:param path_or_url: A path or a URL.
:param client_kwargs: Object storage client keyword arguments.
:param mode: "r" or "w"
:return: A tuple (path_or_obs_store, consolidated).
"""
if is_obs_url(path_or_url):
    root, obs_fs_kwargs, obs_fs_client_kwargs = parse_obs_url_and_kwargs(path_or_url, client_kwargs)
    s3 = s3fs.S3FileSystem(**obs_fs_kwargs, client_kwargs=obs_fs_client_kwargs)
    # A dataset counts as consolidated if consolidated metadata (.zmetadata) exists.
    consolidated = mode == "r" and s3.exists(f'{root}/.zmetadata')
    return s3fs.S3Map(root=root, s3=s3, check=False, create=mode == "w"), consolidated
else:
    consolidated = os.path.exists(os.path.join(path_or_url, '.zmetadata'))
    return path_or_url, consolidated
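A hypothetical caller of the helper above (its actual name is not part of this excerpt, so open_store_or_path is used as a placeholder, and the URL is only an example) would pass the returned store or path plus the consolidated flag straight on to xarray:

import xarray as xr

# 'open_store_or_path' is a placeholder name for the helper whose body is shown above.
store_or_path, consolidated = open_store_or_path(
    'http://obs.eu-de.otc.t-systems.com/cyanoalert/cyanoalert-olci-lswe-l2c-v1.zarr',
    client_kwargs=None,
    mode='r')
dataset = xr.open_zarr(store_or_path, consolidated=consolidated)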
def open_data(self, data_id: str, **open_params) -> xr.Dataset:
    s3_fs = self._s3_fs
    if s3_fs is None:
        # No file system configured on the store: build one from the open parameters.
        s3_fs, open_params = self.consume_s3fs_params(open_params)
    bucket_name, open_params = self.consume_bucket_name_param(open_params)
    try:
        root = f'{bucket_name}/{data_id}' if bucket_name else data_id
        return xr.open_zarr(s3fs.S3Map(root=root, s3=s3_fs, check=False), **open_params)
    except ValueError as e:
        raise DataStoreError(f'{e}') from e
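Usage might look like the sketch below; the owning class and the exact keyword arguments accepted by consume_s3fs_params() and consume_bucket_name_param() are not shown in this excerpt, so those names are assumptions.

# Hypothetical usage of the open_data() method above; class and parameter names are assumed.
store = SomeS3DataStore()  # placeholder for the class that defines open_data()
cube = store.open_data('cyanoalert-olci-lswe-l2c-v1.zarr',
                       bucket_name='cyanoalert')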
def write_dataset(self, dataset: xr.Dataset, dataset_id: str = None, **write_params) -> str:
    dataset_id = dataset_id or 'out.zarr'
    import s3fs
    s3, write_params = _get_s3_and_consume_params(write_params)
    # Write the dataset as Zarr directly to object storage; remaining params go to to_zarr().
    dataset.to_zarr(s3fs.S3Map(root=dataset_id, s3=s3, check=False), **write_params)
    return dataset_id
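A hedged usage sketch: the writer object and _get_s3_and_consume_params() are not shown, so only the call shape is illustrated. Keyword arguments left over after the S3 parameters are consumed are forwarded to Dataset.to_zarr(), so consolidated metadata can be requested, for example:

# Hypothetical usage of write_dataset(); 'writer' stands in for the owning object.
dataset_id = writer.write_dataset(dataset, 'cyanoalert/out.zarr', consolidated=True)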
Read the dataset for the level at the given *index*.
:param index: the level index
:param zarr_kwargs: keyword arguments passed to xr.open_zarr()
:return: the dataset for the level at *index*.
"""
ext, level_path = self._level_paths[index]
if ext == ".link":
    # A .link entry is a small text file whose contents are the path of the actual level dataset.
    with self._obs_file_system.open(level_path, "r") as fp:
        level_path = fp.read()
    # if level_path is a relative path, resolve it against the levels directory
    if not os.path.isabs(level_path):
        base_dir = os.path.dirname(self._dir_path)
        level_path = os.path.join(base_dir, level_path)
store = s3fs.S3Map(root=level_path, s3=self._obs_file_system, check=False)
# Cache up to 256 MiB of chunks locally to avoid repeated remote reads.
cached_store = zarr.LRUStoreCache(store, max_size=2 ** 28)
with measure_time(tag=f"opened remote dataset {level_path} for level {index}"):
    consolidated = self._obs_file_system.exists(f'{level_path}/.zmetadata')
    return assert_cube(xr.open_zarr(cached_store, consolidated=consolidated, **zarr_kwargs), name=level_path)
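The .link indirection above means a level entry can either hold the Zarr data itself or just a small text file pointing at it. A hypothetical layout and the manual resolution it implies (bucket and file names are examples only, not the project's actual conventions):

# Example layout (illustrative names):
#   my-bucket/cube.levels/0.zarr   -> level 0, stored in place
#   my-bucket/cube.levels/1.link   -> text file containing the path of level 1,
#                                     e.g. "shared/cube-level-1.zarr"
import os
import s3fs

fs = s3fs.S3FileSystem(anon=True)
with fs.open('my-bucket/cube.levels/1.link', 'r') as fp:
    target = fp.read()
if not os.path.isabs(target):
    # relative targets are resolved against the parent directory of the .levels directory
    target = os.path.join(os.path.dirname('my-bucket/cube.levels'), target)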
def write_data(self, data: xr.Dataset, data_id: str, replace=False, **write_params):
    assert_instance(data, xr.Dataset, 'data')
    s3_fs = self._s3_fs
    if s3_fs is None:
        # No file system configured on the store: build one from the write parameters.
        s3_fs, write_params = self.consume_s3fs_params(write_params)
    bucket_name, write_params = self.consume_bucket_name_param(write_params)
    try:
        root = f'{bucket_name}/{data_id}' if bucket_name else data_id
        data.to_zarr(s3fs.S3Map(root=root, s3=s3_fs, check=False),
                     mode='w' if replace else None,
                     **write_params)
    except ValueError as e:
        raise DataStoreError(f'{e}') from e
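Again a hypothetical call, since the owning store class is not named in this excerpt; the replace flag maps directly to Zarr's write mode:

# Hypothetical usage of write_data(); replace=True becomes mode='w' in to_zarr(),
# i.e. an existing dataset with the same id is overwritten.
store.write_data(cube, 'cyanoalert/cyanoalert-olci-lswe-l2c-v1-copy.zarr', replace=True)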