Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
]
# Build one QACheckOperator per QA-check SQL template visible to this DAG.
# NOTE(review): the enclosing function header is outside this excerpt; `dag`
# and `QACheckOperator` are defined elsewhere.
# Template environment obtained from the DAG (presumably a Jinja environment,
# going by the variable name and the list_templates call; confirm).
jinja_env = dag.get_template_env()
# Collect every template whose name contains "qa_checks" and ends in ".sql",
# wrapped in Path so the parent/stem components can be inspected below.
templates = [
Path(tmpl)
for tmpl in jinja_env.list_templates(
filter_func=lambda tmpl: "qa_checks" in tmpl and tmpl.endswith(".sql")
)
]
# Accepted parent-directory names: always "qa_checks", plus the DAG's
# "cdr_type" param value when that param is present.
valid_stems = (
"qa_checks",
*((dag.params["cdr_type"],) if "cdr_type" in dag.params else ()),
)
# Keep only templates whose immediate parent directory is one of the accepted names.
template_paths = [tmpl for tmpl in templates if tmpl.parent.stem in valid_stems]
# One operator per remaining template, created in sorted path order.
# task_id is the template's stem when it sits directly under "qa_checks";
# otherwise it is "<template stem>.<parent directory stem>".
# sql receives the template path as a string.
return [
QACheckOperator(
task_id=tmpl.stem
if tmpl.parent.stem == "qa_checks"
else f"{tmpl.stem}.{tmpl.parent.stem}",
sql=str(tmpl),
dag=dag,
)
for tmpl in sorted(template_paths)
]