Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_fs_and_path(path, creds=None, session_creds=True):
    """Return the filesystem needed to access ``path`` and the path inside it.

    Args:
        path: A URL or filesystem path. Values starting with ``./``, ``/``,
            ``../`` or ``~/`` are handled as local paths; everything else is
            handled as S3.
        creds: Optional credentials dict. When ``session_creds`` is true it
            must contain ``access_key``, ``secret_key``, ``session_token``,
            ``endpoint`` and ``region`` (a missing key raises ``KeyError``).
            Otherwise only ``access_key`` and ``secret_key`` are read, and a
            missing one is passed on as ``None``.
        session_creds: When true and ``creds`` is None for a path that does
            not start with ``s3://``, ``_connect(path)`` is called to obtain
            the (possibly rewritten) path and the credentials.

    Returns:
        A ``(filesystem, path)`` tuple. Local paths have ``~`` expanded; S3
        paths are returned without their ``s3://`` prefix. When no
        credentials are available the S3 filesystem is built with default
        arguments.
    """
    s3_prefix = "s3://"

    if path.startswith(("./", "/", "../", "~/")):
        # NOTE(review): ``fs://`` can never be a prefix here given the check
        # above, so this replace only strips embedded occurrences -- confirm
        # whether it is still needed.
        local_path = os.path.expanduser(path.replace("fs://", ""))
        return fsspec.filesystem("file"), local_path

    if session_creds and creds is None and not path.startswith(s3_prefix):
        # ``_connect`` returns a (path, creds) pair; the path may be rewritten.
        path, creds = _connect(path)

    if path.startswith(s3_prefix):
        path = path[len(s3_prefix):]

    if creds is None:
        return fsspec.filesystem("s3"), path

    if session_creds:
        # Full credential set including a session token, a custom endpoint
        # and a region; every key is required.
        fs = fsspec.filesystem(
            "s3",
            key=creds["access_key"],
            secret=creds["secret_key"],
            token=creds["session_token"],
            client_kwargs={
                "endpoint_url": creds["endpoint"],
                "region_name": creds["region"],
            },
        )
    else:
        # Key/secret only; ``.get`` lets either be absent (passed as None).
        fs = fsspec.filesystem(
            "s3", key=creds.get("access_key"), secret=creds.get("secret_key"),
        )
    return fs, path
def _determine_worker(self):
    """Decide whether this object runs on a Dask worker and set up state.

    Sets ``self.worker``.
    - On a worker, ``self.fs`` is built with ``filesystem(self.protocol,
      **self.remote_options)`` (``remote_options`` may be None).
    - Otherwise ``self.client`` comes from ``_get_global_client()`` and
      ``self.rfs`` wraps ``self`` in ``dask.delayed``.
    """
    try:
        get_worker()
    except ValueError:
        # The original code treats a ValueError from get_worker() as
        # "not running on a worker".
        self.worker = False
        self.client = _get_global_client()
        self.rfs = dask.delayed(self)
    else:
        # Kept outside the ``try`` on purpose: a ValueError raised while
        # building the filesystem (e.g. an unknown protocol) must propagate
        # instead of being misread as "not on a worker".
        self.worker = True
        self.fs = filesystem(self.protocol, **(self.remote_options or {}))
# NOTE(review): orphaned fragment. These lines repeat the body of
# _load_fs_and_path (defined earlier in this file) without a ``def`` line,
# and the final ``return (`` is cut off mid-expression, so this span cannot
# parse or run as-is. It looks like a copy/paste artifact -- confirm and remove.
# Local paths: answered with the "file" filesystem and ``~`` expanded.
if (
path.startswith("./")
or path.startswith("/")
or path.startswith("../")
or path.startswith("~/")
):
return fsspec.filesystem("file"), os.path.expanduser(path.replace("fs://", ""))
# No creds given for a non-s3:// path: _connect supplies (path, creds).
if session_creds and creds is None and not path.startswith("s3://"):
path, creds = _connect(path)
# Strip the 5-character "s3://" scheme prefix.
if path.startswith("s3://"):
path = path[5:]
# Full credential set (token, endpoint, region); every key is required.
if creds is not None and session_creds:
return (
fsspec.filesystem(
"s3",
key=creds["access_key"],
secret=creds["secret_key"],
token=creds["session_token"],
client_kwargs={
"endpoint_url": creds["endpoint"],
"region_name": creds["region"],
},
),
path,
)
# Key/secret only, read with .get so missing entries become None.
elif creds is not None:
return (
fsspec.filesystem(
"s3", key=creds.get("access_key"), secret=creds.get("secret_key"),
),
"""
Validate filesystem argument and return an fsspec file system object
Args:
path: Path as a string
filesystem: Optional fsspec filesystem object to use to open the file. If not
provided, filesystem type is inferred from path
Returns:
fsspec file system
"""
if filesystem is None:
return fsspec.open(path).fs
else:
if isinstance(filesystem, (str, pathlib.Path)):
return fsspec.filesystem(str(filesystem))
elif isinstance(filesystem, fsspec.AbstractFileSystem):
return filesystem
else:
raise ValueError(
"Received invalid filesystem value with type: {typ}".format(
typ=type(filesystem)
)
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
protocol, path = get_protocol_and_path(filepath, version)
self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)
super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)
# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)