Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_essential_parameters_ng(self):
"""
EssentialParameters invalid case
"""
with pytest.raises(CliboaException) as excinfo:
valid = EssentialParameters("DummyClass", [""])
valid()
assert "is not specified" in str(excinfo.value)
def execute(self, *args):
super().execute()
valid = EssentialParameters(self.__class__.__name__, [self._src_pattern])
valid()
client = Gcs.get_gcs_client(self._credentials)
bucket = client.get_bucket(self._bucket)
dl_files = []
for blob in bucket.list_blobs(prefix=self._prefix, delimiter=self._delimiter):
r = re.compile(self._src_pattern)
if not r.fullmatch(blob.name):
continue
dl_files.append(blob.name)
blob.download_to_filename(
os.path.join(self._dest_dir, os.path.basename(blob.name))
)
ObjectStore.put(self._step, dl_files)
def execute(self, *args):
super().execute()
param_valid = EssentialParameters(self.__class__.__name__, [self._tblname])
param_valid()
tbl_valid = SqliteTableExistence(self._dbname, self._tblname)
tbl_valid()
output_valid = IOOutput(self._io)
output_valid()
# get table column definition
self._sqlite_adptr.connect(self._dbname)
column_def = self.__get_column_def()
if self._refresh is True:
self.__refresh_table(column_def)
# database transaction
def execute(self, *args):
if self.__form_auth is True:
valid = EssentialParameters(
self.__class__.__name__,
[self.__form_url, self.__form_id, self.__form_password],
)
valid()
def execute(self, *args):
# essential parameters check
valid = EssentialParameters(
self.__class__.__name__, [self._src_dir, self._src_pattern, self._tblname]
)
valid()
files = super().get_target_files(self._src_dir, self._src_pattern)
self._logger.info("Files found %s" % files)
if len(files) > 1:
raise Exception("Input file must be only one.")
if len(files) == 0:
raise FileNotFound("No csv file was found.")
def func():
if self._refresh is True:
# Drop table in advance, If refresh is True
def execute(self, *args):
# essential parameters check
valid = EssentialParameters(
self.__class__.__name__,
[self._host, self._user, self._src_dir, self._src_pattern],
)
valid()
os.makedirs(self._dest_dir, exist_ok=True)
# fetch src
sftp = Sftp(
self._host,
self._user,
self._password,
self._key,
self._timeout,
self._retry_count,
self._port,
def execute(self, *args):
# essential parameters check
param_valid = EssentialParameters(self.__class__.__name__, [self._dbname])
param_valid()
def execute(self, *args):
valid = EssentialParameters(self.__class__.__name__, [self._bucket])
valid()
def execute(self, *args):
super().execute()
valid = EssentialParameters(self.__class__.__name__, [self._src_pattern])
valid()
client = self._s3_client()
p = client.get_paginator("list_objects")
for page in p.paginate(
Bucket=self._bucket, Delimiter=self._delimiter, Prefix=self._prefix
):
for c in page.get("Contents", []):
filename = c.get("Key")
rec = re.compile(self._src_pattern)
if not rec.fullmatch(filename):
continue
dest_path = os.path.join(self._dest_dir, os.path.basename(filename))
client.download_file(self._bucket, filename, dest_path)
def execute(self, *args):
super().execute()
input_valid = IOInput(self._io)
input_valid()
param_valid = EssentialParameters(self.__class__.__name__, [self._tblname])
param_valid()
tbl_valid = SqliteTableExistence(self._dbname, self._tblname)
tbl_valid()
def dict_factory(cursor, row):
d = {}
for i, col in enumerate(cursor.description):
d[col[0]] = row[i]
return d
self._sqlite_adptr.connect(self._dbname)
cur = self._sqlite_adptr.fetch(sql=self.__get_query(), row_factory=dict_factory)
for r in cur:
self._s.save(r)