def test_parse_schema(self):
    buf = pa.py_buffer(schema_data)
    result = _load_schema(buf)
    expected = pa.schema([
        pa.field("depdelay", pa.int16()),
        pa.field("arrdelay", pa.int16())
    ])
    assert result.equals(expected)
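The `schema_data` fixture and `_load_schema` helper come from the module under test and are not shown. A minimal sketch, assuming the buffer holds an IPC-serialized schema (the names mirror the test; the implementation itself is a guess):

import pyarrow as pa

def _load_schema(buf):
    # pa.ipc.read_schema accepts a buffer containing an IPC schema message
    return pa.ipc.read_schema(buf)

# schema_data would then be produced by serializing a schema:
schema_data = pa.schema([
    pa.field("depdelay", pa.int16()),
    pa.field("arrdelay", pa.int16())
]).serialize()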
def test_ingest_with_datetime():
    """Test ingesting datetime data with a given schema."""
    schema = pa.schema([
        pa.field("foo", pa.int64()),
        pa.field("bar", pa.int64()),
        pa.field("baz", pa.timestamp("ns"))
    ])
    data = [
        {"foo": 1, "bar": 2, "baz": "2018-01-01 01:02:03"},
        {"foo": 10, "bar": 20, "baz": "2018-01-02 01:02:03"}
    ]
    converted_data = client.ingest_data(data, schema)
    timestamp_values = [
        pd.to_datetime("2018-01-01 01:02:03"),
        pd.to_datetime("2018-01-02 01:02:03")
    ]
    assert converted_data.to_pydict() == {
        'foo': [1, 10],
        'bar': [2, 20],
        'baz': timestamp_values
    }
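`client.ingest_data` is not shown either. A minimal sketch, assuming it converts the row dicts column by column and casts strings to the schema's types (ISO-8601 strings cast cleanly to timestamps):

import pyarrow as pa

def ingest_data(rows, schema):
    arrays = []
    for field in schema:
        values = pa.array([row[field.name] for row in rows])
        if values.type != field.type:
            # e.g. string -> timestamp("ns") for the "baz" column
            values = values.cast(field.type)
        arrays.append(values)
    return pa.Table.from_arrays(arrays, schema=schema)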
import random
from io import BytesIO

import pyarrow
from pyarrow import RecordBatchStreamWriter

def generate_data(pyarrow_type, column_meta, source_data_generator,
                  batch_count, batch_row_count):
    stream = BytesIO()
    assert len(pyarrow_type) == len(column_meta)
    column_size = len(pyarrow_type)
    fields = []
    for i in range(column_size):
        fields.append(pyarrow.field("column_{}".format(i), pyarrow_type[i],
                                    True, column_meta[i]))
    schema = pyarrow.schema(fields)
    expected_data = []
    writer = RecordBatchStreamWriter(stream, schema)
    for i in range(batch_count):
        column_arrays = []
        py_arrays = []
        for j in range(column_size):
            column_data = []
            not_none_cnt = 0
            # regenerate until the column holds at least one non-null value
            while not_none_cnt == 0:
                column_data = []
                for _ in range(batch_row_count):
                    data = (None if bool(random.getrandbits(1))
                            else source_data_generator())
                    if data is not None:
                        not_none_cnt += 1
                    column_data.append(data)
            # the original snippet is truncated here; an assumed completion
            # that collects the column and writes each batch follows
            column_arrays.append(column_data)
            py_arrays.append(pyarrow.array(column_data, type=pyarrow_type[j]))
        expected_data.append(column_arrays)
        writer.write_batch(
            pyarrow.RecordBatch.from_arrays(py_arrays, schema=schema))
    writer.close()
    return stream, expected_data
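An illustrative call (the column type, metadata, and generator below are assumptions, not values from the original test):

stream, expected = generate_data(
    pyarrow_type=[pyarrow.int32()],
    column_meta=[{'scale': '0'}],
    source_data_generator=lambda: random.randint(-100, 100),
    batch_count=2,
    batch_row_count=10,
)
stream.seek(0)
table = pyarrow.ipc.open_stream(stream).read_all()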
def _convert_schema(redshift_schema, partition_columns):
    fields = []
    for column in redshift_schema:
        column_name = column[0]
        if column_name in partition_columns:
            # Allow skipping virtual columns used for partitioning
            continue
        column_type = column[1].upper()
        numeric_precision = column[2]
        numeric_scale = column[3]
        datetime_precision = column[4]
        converted_type = _convert_type(column_type, numeric_precision,
                                       numeric_scale, datetime_precision)
        fields.append(pa.field(column_name, converted_type))
    return pa.schema(fields)
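`_convert_type` is defined elsewhere in that module. A plausible sketch of the Redshift-to-Arrow mapping it performs (the exact rules here are assumptions):

def _convert_type(column_type, numeric_precision, numeric_scale,
                  datetime_precision):
    if column_type in ('SMALLINT', 'INT2'):
        return pa.int16()
    if column_type in ('INTEGER', 'INT4'):
        return pa.int32()
    if column_type in ('BIGINT', 'INT8'):
        return pa.int64()
    if column_type in ('NUMERIC', 'DECIMAL'):
        return pa.decimal128(numeric_precision, numeric_scale)
    if column_type.startswith('TIMESTAMP'):
        return pa.timestamp('us')
    return pa.string()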
import logging

import pyarrow as pa

def convert_schema(string_schema):  # enclosing signature assumed; not in the snippet
    pa_schema_list = []
    for field in string_schema:
        field_type = field['type']
        field_name = field['name']
        field_mode = field['mode']
        converted_field_type = type_conversions[field_type]
        if converted_field_type is None:
            error_message = ('Error: json schema included a {0:s} field. '
                             'BYTE, GEOGRAPHY, and RECORD types cannot '
                             'currently be used when outputting to '
                             'parquet.'.format(field_type))
            logging.error(error_message)
            raise ValueError(error_message)
        else:
            nullable = field_mode != 'REQUIRED'
            pa_field = pa.field(
                name=field_name,
                type=converted_field_type,
                nullable=nullable  # REQUIRED fields are non-nullable
            )
            pa_schema_list.append(pa_field)
    return pa.schema(pa_schema_list)
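`type_conversions` is defined earlier in that module. A sketch of the kind of BigQuery-to-Arrow mapping the loop assumes, with the unsupported types mapped to None:

type_conversions = {
    'STRING': pa.string(),
    'INTEGER': pa.int64(),
    'FLOAT': pa.float64(),
    'NUMERIC': pa.decimal128(38, 9),
    'BOOLEAN': pa.bool_(),
    'TIMESTAMP': pa.timestamp('us'),
    'DATE': pa.date32(),
    'BYTE': None,
    'GEOGRAPHY': None,
    'RECORD': None,
}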
# https://issues.apache.org/jira/browse/ARROW-8142
if len(table) == 0:
    # For an empty table, round-trip through pandas, widening each
    # categorical column to its categories' dtype.
    df = table.to_pandas(date_as_object=True)
    new_types = {
        col: df[col].cat.categories.dtype
        for col in df.select_dtypes("category")
    }
    if new_types:
        df = df.astype(new_types)
    table = pa.Table.from_pandas(df)
else:
    # Replace every dictionary field with its value type, then cast.
    schema = table.schema
    for i in range(len(schema)):
        field = schema[i]
        if pa.types.is_dictionary(field.type):
            new_field = pa.field(
                field.name,
                field.type.value_type,
                field.nullable,
                field.metadata,
            )
            schema = schema.remove(i).insert(i, new_field)
    table = table.cast(schema)
else:
    # legacy Column API variant: decode dictionary columns not excluded
    # (this fragment is truncated after the cast)
    for i in range(table.num_columns):
        col = table[i]
        if col.name in exclude:
            continue
        if pa.types.is_dictionary(col.type):
            new_type = col.data.chunk(0).dictionary.type
            new_col = col.cast(new_type)
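Both branches strip dictionary (categorical) encoding. On current pyarrow the same thing can be written per column with Table.set_column; a sketch (the function name and exclude parameter are illustrative, not from the snippets above):

import pyarrow as pa

def decode_dictionary_columns(table, exclude=()):
    for i, field in enumerate(table.schema):
        if field.name in exclude or not pa.types.is_dictionary(field.type):
            continue
        decoded = table.column(i).cast(field.type.value_type)
        table = table.set_column(
            i, pa.field(field.name, field.type.value_type), decoded)
    return table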
# javascript (field list truncated at the start of the snippet)
    pa.field('value', pa.string()),
    pa.field('arguments', pa.string()),
    pa.field('time_stamp', pa.string(), nullable=False)
]
PQ_SCHEMAS['javascript'] = pa.schema(fields)
# javascript_cookies
fields = [
    pa.field('crawl_id', pa.uint32()),
    pa.field('visit_id', pa.int64()),
    pa.field('instance_id', pa.uint32(), nullable=False),
    pa.field('extension_session_uuid', pa.string()),
    pa.field('event_ordinal', pa.int64()),
    pa.field('record_type', pa.string()),
    pa.field('change_cause', pa.string()),
    pa.field('expiry', pa.string()),
    pa.field('is_http_only', pa.bool_()),
    pa.field('is_host_only', pa.bool_()),
    pa.field('is_session', pa.bool_()),
    pa.field('host', pa.string()),
    pa.field('is_secure', pa.bool_()),
    pa.field('name', pa.string()),
    pa.field('path', pa.string()),
    pa.field('value', pa.string()),
    pa.field('same_site', pa.string()),
    pa.field('first_party_domain', pa.string()),
    pa.field('store_id', pa.string()),
    pa.field('time_stamp', pa.string())
]
PQ_SCHEMAS['javascript_cookies'] = pa.schema(fields)
# navigations
fields = [
    pa.field('window_id', pa.int64()),
    pa.field('tab_id', pa.int64()),
    pa.field('tab_opener_tab_id', pa.int64()),
    pa.field('frame_id', pa.int64()),
    pa.field('parent_frame_id', pa.int64()),
    pa.field('window_width', pa.int64()),
    pa.field('window_height', pa.int64()),
    pa.field('window_type', pa.string()),
    pa.field('tab_width', pa.int64()),
    pa.field('tab_height', pa.int64()),
    pa.field('tab_cookie_store_id', pa.string()),
    pa.field('uuid', pa.string()),
    pa.field('url', pa.string()),
    pa.field('transition_qualifiers', pa.string()),
    pa.field('transition_type', pa.string()),
    pa.field('before_navigate_event_ordinal', pa.int64()),
    pa.field('before_navigate_time_stamp', pa.string()),
    pa.field('committed_event_ordinal', pa.int64()),
    pa.field('time_stamp', pa.string())
]
PQ_SCHEMAS['navigations'] = pa.schema(fields)
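A sketch of how these PQ_SCHEMAS entries are typically consumed, writing row dicts out to Parquet (the helper name is illustrative, and Table.from_pylist requires pyarrow >= 7):

import pyarrow as pa
import pyarrow.parquet as pq

def write_records(table_name, rows):
    # missing keys become nulls, so non-nullable fields must be supplied
    table = pa.Table.from_pylist(rows, schema=PQ_SCHEMAS[table_name])
    pq.write_table(table, '{}.parquet'.format(table_name))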