Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for m in mods:
for dag in list(m.__dict__.values()):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, str):
croniter(dag._schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = \
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags