How to use the etlhelper.copy_rows function in etlhelper

To help you get started, we’ve selected a few etlhelper examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github BritishGeologicalSurvey / etlhelper / test / integration / etl / View on Github external
def test_copy_rows_happy_path(pgtestdb_conn, pgtestdb_test_tables,
                              pgtestdb_insert_sql, test_table_data):
    """Copy every row from ``src`` to ``dest`` and check ``dest`` afterwards.

    The same connection is used as both the source and the destination
    of the copy.
    """
    # Arrange and act: retarget the insert statement from src to dest
    dest_insert = pgtestdb_insert_sql.replace('src', 'dest')
    copy_rows("SELECT * FROM src", pgtestdb_conn, dest_insert, pgtestdb_conn)

    # Assert: dest must now hold exactly the reference rows
    copied = list(iter_rows("SELECT * FROM dest", pgtestdb_conn))
    assert copied == test_table_data
github BritishGeologicalSurvey / etlhelper / test / integration / db / View on Github external
def test_copy_rows_happy_path(test_tables, testdb_conn, test_table_data):
    """Copy every row from ``src`` to ``dest`` and compare with the test data.

    Each reference row is compared, by position, with the row read back
    from ``dest``.
    """
    def _promote_date(value):
        # Plain dates become midnight datetimes; anything else is untouched
        if isinstance(value, datetime) or not isinstance(value, date):
            return value
        return datetime.combine(value, datetime.min.time())

    # Arrange and act
    source_query = "SELECT * FROM src"
    dest_insert = INSERT_SQL.format(tablename='dest')
    copy_rows(source_query, testdb_conn, dest_insert, testdb_conn)

    # Assert
    copied = get_rows("SELECT * FROM dest", testdb_conn)

    # Oracle returns date object as datetime, convert test data to compare
    for index, source_row in enumerate(test_table_data):
        expected_row = tuple(_promote_date(value) for value in source_row)
        assert copied[index] == expected_row
github BritishGeologicalSurvey / etlhelper / test / integration / etl / View on Github external
def test_logging_copy_rows(caplog, level, expected,
                           pgtestdb_conn, pgtestdb_test_tables,
                           pgtestdb_insert_sql, test_table_data):
    """Check the messages captured while ``copy_rows`` copies src to dest.

    Every captured log message, after masking the parts that differ
    between runs, must equal the entry at the same position in
    ``expected``.
    """
    # Arrange
    select_sql = "SELECT * FROM src"
    insert_sql = pgtestdb_insert_sql.replace('src', 'dest')

    # CHUNKSIZE is a module-level setting: remember the current value and
    # put it back afterwards so the override cannot leak into other tests.
    # (Presumably 1 makes each row its own chunk -- confirm in etlhelper.etl.)
    original_chunksize = etlhelper.etl.CHUNKSIZE
    etlhelper.etl.CHUNKSIZE = 1

    try:
        # Act
        copy_rows(select_sql, pgtestdb_conn, insert_sql, pgtestdb_conn)
    finally:
        etlhelper.etl.CHUNKSIZE = original_chunksize

    # ID for connection object and hostname vary between tests
    # and test environments
    messages = [re.sub(r'object at .*;', 'object at ???;', m)
                for m in caplog.messages]
    messages = [re.sub(r'host=.*? ', 'host=??? ', m)
                for m in messages]

    # Assert
    for i, message in enumerate(messages):
        assert message == expected[i]
github BritishGeologicalSurvey / etlhelper / test / integration / etl / View on Github external
def test_copy_rows_transform(pgtestdb_conn, pgtestdb_test_tables, my_transform):
    """Copy rows from ``src`` to ``dest`` through the ``my_transform`` fixture.

    Only the ``id`` column is inserted, so every other column of the rows
    read back from ``dest`` is expected to be ``None``.
    """
    # Arrange
    select_sql = "SELECT * FROM src"
    insert_sql = "INSERT INTO dest (id) VALUES (%s)"

    expected = [(2, None, None, None, None, None),
                (3, None, None, None, None, None)]

    # Act
    # The call was left unterminated and never used the my_transform
    # fixture; pass it through copy_rows' transform argument.
    copy_rows(select_sql, pgtestdb_conn, insert_sql, pgtestdb_conn,
              transform=my_transform)

    # Assert
    sql = "SELECT * FROM dest"
    result = iter_rows(sql, pgtestdb_conn)
    assert list(result) == expected
github BritishGeologicalSurvey / etlhelper / test / integration / db / View on Github external
def test_copy_rows_happy_path(test_tables, testdb_conn, test_table_data):
    """Copy every row from ``src`` to ``dest`` and compare with the test data.

    The date and datetime values read back from ``dest`` are parsed from
    strings into native objects before the comparison.
    """
    # Arrange and act
    select_sql = "SELECT * FROM src"
    insert_sql = INSERT_SQL.format(tablename='dest')
    copy_rows(select_sql, testdb_conn, insert_sql, testdb_conn)

    # Assert
    sql = "SELECT * FROM dest"
    result = get_rows(sql, testdb_conn)

    # Fix result date and datetime strings to native classes
    fixed = []
    for row in result:
        # NOTE(review): the original loop body was truncated. The date and
        # datetime values are taken by position as the last two columns,
        # matching the order in which they closed the rebuilt tuple --
        # confirm against the table schema.
        *leading, day_text, date_time_text = row
        fixed.append((
            *leading,
            dt.datetime.strptime(day_text, '%Y-%m-%d').date(),
            dt.datetime.strptime(date_time_text, '%Y-%m-%d %H:%M:%S'),
        ))

    assert fixed == test_table_data