Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_execute_ng_no_multiple_files(self):
    """
    CsvHeaderConvert.execute must raise InvalidCount when the source
    pattern matches more than one file.

    Two empty CSV files matching ``src_pattern`` are created, and the
    raised error message is expected to contain "only one".
    """
    os.makedirs(self._data_dir)
    try:
        # create test files (the context manager actually closes the handles)
        for file_name in ("test1.csv", "test2.csv"):
            with open(os.path.join(self._data_dir, file_name), "w"):
                pass

        with pytest.raises(InvalidCount) as execinfo:
            # set the essential attributes
            instance = CsvHeaderConvert()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            Helper.set_property(instance, "src_dir", self._data_dir)
            # raw string: same pattern text, without the invalid "\." escape
            Helper.set_property(instance, "src_pattern", r"test(.*)\.csv")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test_new.csv")
            Helper.set_property(
                instance, "headers", [{"key": "new_key"}, {"data": "new_data"}]
            )
            instance.execute()
    finally:
        # Always remove the fixture directory, even when the expectation
        # above fails, so later tests can call os.makedirs again.
        shutil.rmtree(self._data_dir)
    assert "only one" in str(execinfo.value)
# Test fragment: writes a Japanese text file, runs FileConvert with
# encoding_from="utf-8" / encoding_to="utf-16", and checks that the text read
# back differs from the original.
# NOTE(review): the body is cut off in this view - the `try:` has no visible
# except/finally and the second FileConvert setup below is unfinished.
def test_execute(self):
STR_UTF8 = "いろはにほへと"
try:
# create test file
os.makedirs(self._data_dir)
test_file = os.path.join(self._data_dir, "test.txt")
# NOTE(review): no encoding is passed to open(), so the platform default is
# used; presumably it is expected to be UTF-8 to match encoding_from - confirm.
with open(test_file, "w") as t:
t.write(STR_UTF8)
# set the essential attributes
instance = FileConvert()
Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
Helper.set_property(instance, "src_dir", self._data_dir)
Helper.set_property(instance, "src_pattern", r"test\.txt")
Helper.set_property(instance, "encoding_from", "utf-8")
Helper.set_property(instance, "encoding_to", "utf-16")
instance.execute()
# Re-read the same path with the default encoding, dropping undecodable
# bytes; presumably FileConvert rewrote the file in place - confirm.
with open(test_file, errors="ignore") as t:
str_utf16 = t.read()
# The decoded text must no longer equal the original string.
assert str_utf16 != STR_UTF8
# set the essential attributes
# Second FileConvert instance; only logger, src_dir and src_pattern are set
# in the visible lines.
instance = FileConvert()
Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
Helper.set_property(instance, "src_dir", self._data_dir)
Helper.set_property(instance, "src_pattern", r"test\.txt")
def __init__(self, cmd_args):
    """
    Configure logging and record the project's scenario directory and
    the command line arguments.

    Args:
        cmd_args: command line arguments; ``project_name`` is read from it
            to build the scenario directory path
    """
    # Build the path with os.path.join, consistent with the scenario
    # directory below, instead of concatenating a "/"-separated string.
    logging_conf = os.path.join(env.BASE_DIR, "conf", "logging.conf")
    # disable_existing_loggers=False keeps loggers that were created before
    # this call enabled.
    logging.config.fileConfig(logging_conf, disable_existing_loggers=False)
    self._logger = LisboaLog.get_logger(__name__)
    self._pj_scenario_dir = os.path.join(
        env.PROJECT_DIR, cmd_args.project_name, env.SCENARIO_DIR_NAME
    )
    self._cmd_args = cmd_args
# Fragment of a method whose header is outside this view: fills a step queue
# from yaml_scenario_list (defined outside the visible lines).
q = StepQueueFactory.create("step")
for s_dict in yaml_scenario_list:
# A "multi_process_count" entry only sets q.multi_proc_cnt; no instance is
# created for it.
if "multi_process_count" in s_dict.keys():
q.multi_proc_cnt = s_dict.get("multi_process_count")
continue
instances = []
# A "parallel" entry holds several rows; each row becomes its own instance,
# and all of them are collected in the same list.
if "parallel" in s_dict.keys():
for row in s_dict.get("parallel"):
instance = self.__create_instance(row, yaml_scenario_list)
# Give the instance a logger obtained for its class name.
Helper.set_property(
instance,
"logger",
LisboaLog.get_logger(instance.__class__.__name__),
)
instances.append(instance)
# Store the instance in StepArgument under the row's "step" value.
StepArgument._put(row["step"], instance)
else:
# Otherwise the entry itself is turned into a single instance.
instance = self.__create_instance(s_dict, yaml_scenario_list)
Helper.set_property(
instance,
"logger",
LisboaLog.get_logger(instance.__class__.__name__),
)
instances.append(instance)
StepArgument._put(s_dict["step"], instance)
# Put instance to queue
# NOTE(review): indentation is lost in this view; presumably this push runs
# once per scenario entry (instances is reset each iteration) - confirm.
q.push(instances)
def __init__(self, cmd_args):
    """
    Prepare the logger, the scenario queue reference, the command line
    arguments and an empty listener list.

    Args:
        cmd_args: command line arguments
    """
    self._logger = LisboaLog.get_logger(__name__)
    # No listeners are registered at construction time.
    self._listeners = list()
    self._cmd_args = cmd_args
    # The ScenarioQueue name itself is stored; it is not called here.
    self._scenario_queue = ScenarioQueue
def __init__(self):
    """Obtain a logger from LisboaLog for this module's name and keep it."""
    module_logger = LisboaLog.get_logger(__name__)
    self._logger = module_logger
@staticmethod
def auth(credentials):
if not credentials:
return None
if isinstance(credentials, dict):
return service_account.Credentials.from_service_account_info(credentials)
else:
return service_account.Credentials.from_service_account_file(credentials)
class BigQuery(object):
"""
bigquery api wrapper
"""
_logger = LisboaLog.get_logger(__name__)
@staticmethod
def get_bigquery_client(credentials):
    """
    Create a bigquery client object.

    Args:
        credentials: gcp service account json, forwarded unchanged to
            ``ServiceAccount.auth``
    Returns:
        ``bigquery.Client`` constructed with the result of
        ``ServiceAccount.auth`` as its credentials
    """
    resolved_credentials = ServiceAccount.auth(credentials)
    client = bigquery.Client(credentials=resolved_credentials)
    return client
@staticmethod
def get_extract_job_config(print_header=True):
    """
    Create a ``bigquery.ExtractJobConfig``.

    Args:
        print_header: forwarded as the ``print_header`` keyword argument
            (defaults to True)
    """
    job_options = {"print_header": print_header}
    return bigquery.ExtractJobConfig(**job_options)
@staticmethod
def get_query_job_config():