How to use the foremast.datapipeline.datapipeline.AWSDataPipeline function in foremast

To help you get started, we’ve selected a few foremast examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github foremast / foremast / tests / datapipeline / test_datapipeline.py View on Github external
'pipelineIdList': [{
            "name": "Test Pipeline",
            "id": "1234"
        }, {
            "name": "Other",
            "id": "5678"
        }],
        "hasMoreResults": False
    }]
    generated = {"project": "test"}
    properties = copy.deepcopy(TEST_PROPERTIES)
    mock_get_details.return_value.data = generated
    mock_get_properties.return_value = properties
    mock_boto3.return_value.get_paginator.return_value.paginate.return_value = test_pipelines

    dp = AWSDataPipeline(app='test_app', env='test_env', region='us-east-1', prop_path='other')
    dp.get_pipeline_id()
    assert dp.pipeline_id == '1234'
github foremast / foremast / tests / datapipeline / test_datapipeline.py View on Github external
def test_good_set_pipeline_definition(mock_get_properties, mock_get_details, mock_boto3):
    """A valid pipeline definition should be accepted.

    Stubs the details and properties lookups, builds an ``AWSDataPipeline``
    with a preset pipeline id, and expects ``set_pipeline_definition()`` to
    return a truthy value. ``mock_boto3`` is accepted but not configured in
    this test body.
    """
    # Hand the properties stub a private copy so the shared TEST_PROPERTIES
    # object is not the one returned to the code under test.
    mock_get_properties.return_value = copy.deepcopy(TEST_PROPERTIES)
    mock_get_details.return_value.data = {"project": "test"}

    pipeline = AWSDataPipeline(
        app='test_app',
        env='test_env',
        region='us-east-1',
        prop_path='other',
    )
    # The id is assigned directly; no lookup or creation call is made here.
    pipeline.pipeline_id = '1'

    outcome = pipeline.set_pipeline_definition()
    assert outcome
github foremast / foremast / tests / datapipeline / test_datapipeline.py View on Github external
def test_bad_set_pipeline_definition(mock_get_properties, mock_get_details, mock_boto3):
    """A bad pipeline definition should raise ``DataPipelineDefinitionError``.

    Replaces the ``json_definition`` entry of a copied property set with
    ``BAD_DEF`` and expects ``set_pipeline_definition()`` to raise.
    ``mock_boto3`` is accepted but not configured in this test body.
    """
    # Work on a deep copy so swapping in BAD_DEF does not touch
    # TEST_PROPERTIES itself.
    broken_props = copy.deepcopy(TEST_PROPERTIES)
    broken_props['datapipeline']['json_definition'] = BAD_DEF

    mock_get_properties.return_value = broken_props
    mock_get_details.return_value.data = {"project": "test"}

    pipeline = AWSDataPipeline(
        app='test_app',
        env='test_env',
        region='us-east-1',
        prop_path='other',
    )
    # The id is assigned directly; no lookup or creation call is made here.
    pipeline.pipeline_id = '1'

    with pytest.raises(DataPipelineDefinitionError):
        pipeline.set_pipeline_definition()
github foremast / foremast / tests / datapipeline / test_datapipeline.py View on Github external
def test_create_datapipeline(mock_get_properties, mock_get_details, mock_boto3):
    """Creating a pipeline should record the id returned by the client.

    The mocked client's ``create_pipeline`` returns ``{'pipelineId': '1234'}``;
    after ``create_datapipeline()`` the instance's ``pipeline_id`` must be
    ``'1234'``.
    """
    # Stub the client call first, then the details/properties lookups; the
    # three assignments are independent of one another.
    client = mock_boto3.return_value
    client.create_pipeline.return_value = {'pipelineId': '1234'}
    mock_get_properties.return_value = copy.deepcopy(TEST_PROPERTIES)
    mock_get_details.return_value.data = {"project": "test"}

    pipeline = AWSDataPipeline(
        app='test_app',
        env='test_env',
        region='us-east-1',
        prop_path='other',
    )
    pipeline.create_datapipeline()

    assert pipeline.pipeline_id == '1234'