def _load(self):
    # initial: find cat files
    # if flattening, need to get all entries from each.
    # NOTE: this method relies on module-level names from the enclosing
    # file, e.g. os, open_files (from fsspec) and make_path_posix
    # (from fsspec.utils).
self._entries.clear()
options = self.storage_options or {}
if isinstance(self.path, (list, tuple)):
files = sum([open_files(p, mode='rb', **options)
for p in self.path], [])
self.name = self.name or "%i files" % len(files)
self.description = self.description or f'Catalog generated from {len(files)} files'
self.path = [make_path_posix(p) for p in self.path]
else:
if isinstance(self.path, str) and '*' not in self.path:
self.path = self.path + '/*'
files = open_files(self.path, mode='rb', **options)
self.path = make_path_posix(self.path)
self.name = self.name or self.path
self.description = self.description or f'Catalog generated from all files found in {self.path}'
if not set(f.path for f in files) == set(
f.path for f in self._cat_files):
# glob changed, reload all
self._cat_files = files
self._cats.clear()
for f in files:
name = os.path.split(f.path)[-1].replace(
'.yaml', '').replace('.yml', '')
kwargs = self.kwargs.copy()
kwargs['path'] = f.path
d = make_path_posix(os.path.dirname(f.path))
        if f.path not in self._cats:
            entry = LocalCatalogEntry(name, "YAML file: %s" % name,
                                      'yaml_file_cat', True,
                                      kwargs, [], {}, self.metadata, d)
            # cache the entry so the next _load can reuse it; the driver
            # name and trailing arguments are assumed from intake's
            # LocalCatalogEntry signature
            self._cats[f.path] = entry
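For orientation, here is a minimal, self-contained sketch of the open_files call that the catalog loaders above and below depend on; the glob and directory are hypothetical:

import fsspec

# Expand a glob into a list of lazy OpenFile objects; nothing is
# actually opened until each one is entered as a context manager.
files = fsspec.open_files("catalogs/*.yaml", mode="rb")
for of in files:
    with of as f:          # the file handle opens here
        head = f.read(64)  # peek at the start of each catalog file
    print(of.path, len(head))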
def _get_schema(self):
from fsspec import open_files
import dask.array as da
if self._arr is None:
path = self._get_cache(self.path)[0]
files = open_files(path, 'rb', compression=None,
**self.storage)
if self.shape is None:
arr = NumpyAccess(files[0])
self.shape = arr.shape
self.dtype = arr.dtype
arrs = [arr] + [NumpyAccess(f, self.shape, self.dtype)
for f in files[1:]]
else:
arrs = [NumpyAccess(f, self.shape, self.dtype)
for f in files]
self.chunks = (self._chunks, ) + (-1, ) * (len(self.shape) - 1)
self._arrs = [da.from_array(arr, self.chunks) for arr in arrs]
if len(self._arrs) > 1:
self._arr = da.stack(self._arrs)
        else:
            self._arr = self._arrs[0]  # single file: nothing to stack
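The chunk tuple built above uses -1 to mean "do not split this axis", so only the leading axis is chunked. A hedged, standalone sketch of the same from_array/stack pattern, with plain NumPy arrays standing in for NumpyAccess:

import dask.array as da
import numpy as np

arrs = [np.arange(12).reshape(3, 4) for _ in range(2)]  # one per "file"
# chunk the first axis in blocks of 1, keep every other axis whole
chunks = (1,) + (-1,) * (arrs[0].ndim - 1)
darrs = [da.from_array(a, chunks=chunks) for a in arrs]
arr = da.stack(darrs) if len(darrs) > 1 else darrs[0]
print(arr.shape)  # (2, 3, 4): stacking adds one leading axis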
def _get_schema(self):
from fsspec import open_files
if self._files is None:
urlpath = self._get_cache(self._urlpath)[0]
self._files = open_files(
urlpath, mode=self.mode, encoding=self.encoding,
compression=self.compression,
**self._storage_options)
self.npartitions = len(self._files)
return base.Schema(datashape=None,
dtype=None,
shape=(None, ),
npartitions=self.npartitions,
extra_metadata=self.metadata)
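For reference, open_files passes text mode, encoding, and compression straight through, so the schema above covers compressed text transparently. A small sketch, with hypothetical file names and compression inferred from the suffix:

from fsspec import open_files

# "infer" selects the codec from the extension (.gz -> gzip)
files = open_files("logs/*.txt.gz", mode="rt", encoding="utf8",
                   compression="infer")
for of in files:
    with of as f:
        print(of.path, f.readline().rstrip())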
def _data_to_source(b, path, encoder=None, **kwargs):
import dask.bag as db
import posixpath
from fsspec import open_files
import dask
from intake.source.textfiles import TextFilesSource
if not hasattr(b, 'to_textfiles'):
try:
b = db.from_sequence(b, npartitions=1)
except TypeError:
raise NotImplementedError
files = open_files(posixpath.join(path, 'part.*'), mode='wt',
num=b.npartitions)
    # write_file is assumed to be a module-level helper (as in
    # intake.source.textfiles) that writes one partition via its OpenFile
    dwrite = dask.delayed(write_file)
out = [dwrite(part, f, encoder)
for part, f in zip(b.to_delayed(), files)]
dask.compute(out)
s = TextFilesSource(posixpath.join(path, 'part.*'))
return s
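The num= argument above expands the single "part.*" pattern into that many concrete paths (part.0, part.1, ...). The same write pattern without dask, as a hedged sketch with a hypothetical output directory:

from fsspec import open_files

parts = [["a", "b"], ["c"]]  # two partitions of text records
files = open_files("out/part.*", mode="wt", num=len(parts))
for records, of in zip(parts, files):
    with of as f:  # each OpenFile materializes one numbered part file
        f.write("\n".join(records) + "\n")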
def _data_to_source(cat, path, **kwargs):
from intake.catalog.local import YAMLFileCatalog
from fsspec import open_files
    import yaml
    import posixpath  # needed for the path join below
if not isinstance(cat, Catalog):
raise NotImplementedError
out = {}
for name in cat:
entry = cat[name]
out[name] = entry.__getstate__()
out[name]['parameters'] = [up._captured_init_kwargs for up
in entry._user_parameters]
out[name]['kwargs'].pop('parameters')
fn = posixpath.join(path, 'cat.yaml')
with open_files([fn], 'wt')[0] as f:
yaml.dump({'sources': out}, f)
return YAMLFileCatalog(fn)
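A hedged round-trip sketch of the same persistence path, writing a hand-built sources mapping (the entry itself is hypothetical):

import posixpath
import yaml
from fsspec import open_files

sources = {"example": {"driver": "csv",
                       "args": {"urlpath": "data/table.csv"}}}
fn = posixpath.join("out", "cat.yaml")
with open_files([fn], "wt")[0] as f:  # a list urlpath maps 1:1 to OpenFiles
    yaml.dump({"sources": sources}, f)

Reopening fn with YAMLFileCatalog would complete the round trip, mirroring the return value above.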
def _load_metadata(self):
import dask.dataframe as dd
import dask.delayed
from fsspec import open_files
self.files = open_files(self.url, **self.storage_options)
def read_a_file(open_file, reader, kwargs):
with open_file as of:
df = reader(of, **kwargs)
df['path'] = open_file.path
return df
if self.dataframe is None:
self.parts = [
dask.delayed(read_a_file)(open_file, self.reader, self.kwargs)
for open_file in self.files
]
self.dataframe = dd.from_delayed(self.parts)
self.npartitions = self.dataframe.npartitions
self.shape = (None, len(self.dataframe.columns))
self.dtype = self.dataframe.dtypes.to_dict()
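The same delayed-reader pattern works standalone with pandas; a hedged sketch with hypothetical CSV paths, tagging each row with its source file just like read_a_file above:

import dask
import dask.dataframe as dd
import pandas as pd
from fsspec import open_files

files = open_files("data/*.csv", mode="rb")

@dask.delayed
def read_one(of):
    with of as f:
        df = pd.read_csv(f)
    df["path"] = of.path  # record which file each row came from
    return df

ddf = dd.from_delayed([read_one(of) for of in files])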