@solid(input_defs=[InputDefinition('word', String)], config={'factor': Int})
def multiply_the_word(context, word):
    return word * context.solid_config['factor']
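# A minimal usage sketch, assuming the legacy (pre-1.0) Dagster API in which
# execute_solid accepts input_values and per-solid config under run_config
# (named environment_dict in even older releases):
from dagster import execute_solid

result = execute_solid(
    multiply_the_word,
    input_values={'word': 'ha'},
    run_config={'solids': {'multiply_the_word': {'config': {'factor': 2}}}},
)
assert result.output_value() == 'haha'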
@solid(input_defs=[InputDefinition('file_handle', S3FileHandle)])
def accept_file(context, file_handle):
    # Materialize the S3-backed handle to a local temp file via the
    # file_manager resource, then check its contents.
    local_path = context.file_manager.copy_handle_to_local_temp(file_handle)
    assert isinstance(local_path, str)
    assert open(local_path, 'rb').read() == bar_bytes
@solid(
    input_defs=[InputDefinition('on_complete', Nothing)],
    output_defs=[
        OutputDefinition(Int, 'num_1'),
        OutputDefinition(Int, 'num_2'),
        OutputDefinition(Nothing, 'complete'),
    ],
)
def yield_values(_context):
    yield Output(1, 'num_1')
    yield Output(2, 'num_2')
    yield Output(None, 'complete')
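# A hedged sketch of consuming these outputs, assuming the legacy
# composition-function pipeline API; after_complete is a hypothetical
# downstream solid whose Nothing input is used purely for ordering
# (no value is passed into the compute function):
from dagster import pipeline, solid, InputDefinition, Nothing

@solid(input_defs=[InputDefinition('on_complete', Nothing)])
def after_complete(context):
    context.log.info('upstream fan-out finished')

@pipeline
def fan_out_pipeline():
    num_1, num_2, complete = yield_values()
    after_complete(on_complete=complete)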
@solid(input_defs=[InputDefinition('stuff', List[Any])])
def collect(_context, stuff):
    assert set(stuff) == set([1, None, 'one'])
    return stuff
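# A hedged sketch of fanning in to the List[Any] input using the legacy
# composition API; the emit_* solids are hypothetical stand-ins producing
# the values the assertion above expects:
from dagster import lambda_solid, pipeline

@lambda_solid
def emit_one():
    return 1

@lambda_solid
def emit_none():
    return None

@lambda_solid
def emit_str():
    return 'one'

@pipeline
def fan_in_pipeline():
    collect([emit_one(), emit_none(), emit_str()])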
with pytest.raises(DagsterInvalidDefinitionError, match="no input named 'bar'"):
    CompositeSolidDefinition(
        name='bad',
        solid_defs=[echo],
        input_mappings=[InputDefinition('mismatch').mapping_to('echo', 'bar')],
    )
with pytest.raises(
    DagsterInvalidDefinitionError,
    match="InputMapping source and destination must have the same type",
):
    CompositeSolidDefinition(
        name='bad',
        solid_defs=[echo],
        input_mappings=[InputDefinition('mismatch', str).mapping_to('echo', 'foo')],
    )
with pytest.raises(
    DagsterInvalidDefinitionError,
    match="mappings with same definition name but different definitions",
):
    CompositeSolidDefinition(
        name='bad',
        solid_defs=[echo],
        input_mappings=[
            InputDefinition('mismatch').mapping_to('echo', 'foo'),
            InputDefinition('mismatch').mapping_to('echo_2', 'foo'),
        ],
    )
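# For contrast, a valid mapping targets an input that actually exists on the
# inner solid ('foo' on echo in these tests) and leaves the types compatible;
# wrapped_echo is a hypothetical name for this sketch:
wrapped_echo = CompositeSolidDefinition(
    name='wrapped_echo',
    solid_defs=[echo],
    input_mappings=[InputDefinition('outer_foo').mapping_to('echo', 'foo')],
)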
@lambda_solid(input_defs=[InputDefinition('num', Int)], output_def=OutputDefinition(Int))
def add_one(num):
    return num + 1
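# A quick invocation sketch, again assuming the legacy execute_solid helper:
assert execute_solid(add_one, input_values={'num': 1}).output_value() == 2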
def test_sql_populate_tables():
    create_all_tables_solids = _get_project_solid('create_all_tables')
    populate_num_table_solid = _get_project_solid(
        'populate_num_table', inputs=[InputDefinition(create_all_tables_solids.name)]
    )
    context = create_in_mem_context()
    pipeline = create_mem_sql_pipeline_context_tuple(
        context,
        solids=[create_all_tables_solids, populate_num_table_solid],
        dependencies={
            populate_num_table_solid.name: {
                create_all_tables_solids.name: DependencyDefinition(create_all_tables_solids.name)
            }
        },
    )
    # legacy environment dict selecting the default execution context
    pipeline_result = execute_pipeline(pipeline, {'context': {'default': {}}})
    assert pipeline_result.success
def test_basic_pipeline():
    sum_sql_text = '''CREATE TABLE sum_table AS
        SELECT num1, num2, num1 + num2 as sum FROM num_table'''
    sum_sq_sql_text = '''CREATE TABLE sum_sq_table AS
        SELECT num1, num2, sum, sum * sum as sum_sq FROM sum_table'''
    sum_sql_solid = create_sql_statement_solid('sum_sql_solid', sum_sql_text)
    sum_sq_sql_solid = create_sql_statement_solid(
        'sum_sq_sql_solid', sum_sq_sql_text, inputs=[InputDefinition(name=sum_sql_solid.name)]
    )
    context = in_mem_context()
    pipeline = pipeline_test_def(
        solids=[sum_sql_solid, sum_sq_sql_solid],
        context=context,
        dependencies={
            'sum_sq_sql_solid': {sum_sql_solid.name: DependencyDefinition(sum_sql_solid.name)}
        },
    )
    pipeline_result = execute_pipeline(pipeline)
    assert pipeline_result.success
def create_full_pipeline(context):
    create_all_tables_solids = _get_project_solid('create_all_tables')
    populate_num_table_solid = _get_project_solid(
        'populate_num_table', inputs=[InputDefinition('create_all_tables_solids')]
    )
    insert_into_sum_table_solid = _get_project_solid(
        'insert_into_sum_table', inputs=[InputDefinition('populate_num_table_solid')]
    )
    insert_into_sum_sq_table_solid = _get_project_solid(
        'insert_into_sum_sq_table', inputs=[InputDefinition('insert_into_sum_table_solid')]
    )
    return create_mem_sql_pipeline_context_tuple(
        context,
        solids=[
            create_all_tables_solids,
            populate_num_table_solid,
            insert_into_sum_table_solid,
            insert_into_sum_sq_table_solid,
        ],
        dependencies={
            populate_num_table_solid.name: {
                create_all_tables_solids.name: DependencyDefinition(create_all_tables_solids.name)
            },
            insert_into_sum_table_solid.name: {
                populate_num_table_solid.name: DependencyDefinition(populate_num_table_solid.name)
            },
            insert_into_sum_sq_table_solid.name: {
                insert_into_sum_table_solid.name: DependencyDefinition(
                    insert_into_sum_table_solid.name
                )
            },
        },
    )
def create_lakehouse_table_def(
    name,
    lakehouse_fn,
    input_tables=None,
    other_input_defs=None,
    required_resource_keys=None,
    metadata=None,
    description=None,
):
    metadata = check.opt_dict_param(metadata, 'metadata')
    input_tables = check.opt_list_param(
        input_tables, 'input_tables', of_type=LakehouseTableInputDefinition
    )
    other_input_defs = check.opt_list_param(
        other_input_defs, 'other_input_defs', of_type=InputDefinition
    )
    required_resource_keys = check.opt_set_param(
        required_resource_keys, 'required_resource_keys', of_type=str
    )
    table_type = define_python_dagster_type(
        python_type=ITableHandle, name=name, description=description
    )
    table_input_dict = {input_table.name: input_table for input_table in input_tables}
    input_defs = input_tables + other_input_defs
    validate_solid_fn('@solid', name, lakehouse_fn, input_defs, ['context'])

    def _compute(context, inputs):
        '''
        Workhorse function of lakehouse. The inputs are something that inherits from ITableHandle.