import pyarrow as pa


def test_ingest():
    """
    Test ingesting data with a given schema
    """
    schema = pa.schema([
        pa.field("foo", pa.int64()),
        pa.field("bar", pa.int64())
    ])
    data = [{"foo": 1, "bar": 2}, {"foo": 10, "bar": 20}]
    converted_data = client.ingest_data(data, schema)  # `client` comes from the surrounding test suite (not shown)
    assert converted_data.to_pydict() == {'foo': [1, 10], 'bar': [2, 20]}
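The client fixture and its ingest_data method are not included in the snippet. A minimal stand-in, assuming ingest_data simply materializes the list of row dicts as an Arrow table with the supplied schema (Table.from_pylist requires PyArrow 7 or newer):

import pyarrow as pa


def ingest_data(data, schema):
    # Hypothetical stand-in for client.ingest_data: build an Arrow table from a
    # list of row dicts using the caller-supplied schema.
    return pa.Table.from_pylist(data, schema=schema)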
from collections import OrderedDict


def infer_pyarrow_schema(data):  # name reconstructed; the def line was truncated in the original snippet
    """
    Infer PyArrow schema for tabular data.

    Args:
        data (List[dict]): A list of dictionaries representing rows in a table.

    Returns:
        A PyArrow schema object.
    """
    column_data = OrderedDict()
    for row in data:
        for key, value in row.items():
            column_data.setdefault(key, []).append(value)
    column_types = OrderedDict([
        (key, pa.array(value).type) for key, value in column_data.items()
    ])
    return pa.schema(list(column_types.items()))
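With the two sample rows from the test above, the helper infers an int64 field for each column (infer_pyarrow_schema is the reconstructed name used here):

rows = [{"foo": 1, "bar": 2}, {"foo": 10, "bar": 20}]
print(infer_pyarrow_schema(rows))
# foo: int64
# bar: int64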
def _sanitize_schema(schema):
    # NOTE: the enclosing function and its outer conditional were truncated in the
    # original snippet; the wrapper name and the guard below are reconstructions.
    if schema is not None:
        sanitized_fields = []
        schema_changed = False
        for field in schema:
            name = field.name
            sanitized_name = _sanitized_spark_field_name(name)
            if sanitized_name != name:
                schema_changed = True
                sanitized_field = pa.field(sanitized_name, field.type,
                                           field.nullable, field.metadata)
                sanitized_fields.append(sanitized_field)
            else:
                sanitized_fields.append(field)
        new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
        return new_schema, schema_changed
    else:
        return schema, False
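The _sanitized_spark_field_name helper itself is not part of the snippet. A minimal sketch, assuming it replaces the characters Spark rejects in Parquet column names (space, comma, semicolon, braces, parentheses, newline, tab and equals sign) with underscores:

import re


def _sanitized_spark_field_name(name):
    # Hypothetical sketch; the real helper is not shown in the snippet above.
    return re.sub(r"[ ,;{}()\n\t=]", "_", name)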
    # NOTE: the head of this routine (its signature and the loop that populates
    # cols_partition, cols_payload and cols_misc) was truncated in the original snippet.
    cols_payload.append((name, cmd, field))

    ordered = []
    for k in partition_keys:
        if k in cols_partition:
            ordered.append(cols_partition[k])
    ordered += [(cmd, f) for _name, cmd, f in sorted(cols_payload, key=lambda x: x[0])]
    ordered += cols_misc

    pandas_metadata["columns"] = [cmd for cmd, _ in ordered]
    fields = [f for _, f in ordered if f is not None]

    metadata = schema.metadata
    metadata[b"pandas"] = _dict_to_binary(pandas_metadata)
    schema = pa.schema(fields, metadata)
    return SchemaWrapper(schema, origin)
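The closing lines follow a common PyArrow pattern: schema metadata cannot be modified in place, so a new schema is constructed from the desired fields together with an updated metadata dict. A standalone illustration:

import pyarrow as pa

schema = pa.schema([pa.field("foo", pa.int64())], metadata={b"origin": b"example"})
meta = dict(schema.metadata)            # .metadata is exposed as a plain dict copy
meta[b"note"] = b"updated"
schema = pa.schema(list(schema), meta)  # rebuild with the same fields and new metadata
print(schema.metadata)                  # {b'origin': b'example', b'note': b'updated'}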
def _build_pyarrow_schema(self, columns):
    """Return a pyarrow schema based on the SQLite data types, but for now ... everything is a string :)"""
    return pa.schema(
        [(col[0], pa.string()) for col in columns]
    )
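Assuming columns here is the sqlite3 cursor.description sequence (tuples whose first element is the column name), the method maps every column to a string field:

import sqlite3

import pyarrow as pa

cur = sqlite3.connect(":memory:").execute("SELECT 1 AS id, 'a' AS name")
columns = cur.description
print(pa.schema([(col[0], pa.string()) for col in columns]))
# id: string
# name: string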
    # NOTE: the outer guard below is a reconstruction; the original snippet was
    # truncated just before this block, and the dangling `else` suggests the pandas
    # normalization only runs when pandas metadata is to be taken into account.
    if not ignore_pandas:
        metadata = schema.metadata
        if metadata is None or b"pandas" not in metadata:
            raise ValueError(
                "Pandas and non-Pandas schemas are not comparable. "
                "Use ignore_pandas=True if you only want to compare "
                "on Arrow level."
            )
        pandas_metadata = load_json(metadata[b"pandas"].decode("utf8"))

        # we don't care about the pandas version, since we assume it's safe
        # to read datasets that were written by older or newer versions.
        pandas_metadata["pandas_version"] = "{}".format(pd.__version__)

        metadata_clean = deepcopy(metadata)
        metadata_clean[b"pandas"] = _dict_to_binary(pandas_metadata)
        current = SchemaWrapper(pa.schema(schema, metadata_clean), schema.origin)
    else:
        current = schema

    # If a field is null we cannot compare it and must therefore reject it.
    null_columns = {field.name for field in current if field.type == pa.null()}

    # Determine a valid reference schema. A valid reference schema is the schema
    # among all input schemas with the fewest empty columns. The reference schema
    # ought to be a schema whose empty columns are a true subset of every other
    # schema's empty columns; this ensures the reference schema carries the most
    # information possible. A schema which doesn't fulfil this requirement would
    # weaken the comparison and allow for false positives.

    # Trivial case
    if reference is None:
        reference = current
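The null-column check above hinges on fields typed as Arrow's null type; a quick standalone illustration:

import pyarrow as pa

schema = pa.schema([("a", pa.int64()), ("b", pa.null())])
null_columns = {field.name for field in schema if field.type == pa.null()}
print(null_columns)  # {'b'}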
    # NOTE: the loop head below is a reconstruction; the original snippet was
    # truncated just before this block, which walks over the column entries of
    # the schema's pandas metadata.
    for cmd in pandas_metadata["columns"]:
        name = cmd.get("name")
        if name is None:
            continue
        field_name = cmd["field_name"]
        field_idx = schema.get_field_index(field_name)
        field = schema[field_idx]
        (
            fields[field_name],
            cmd["pandas_type"],
            cmd["numpy_type"],
            cmd["metadata"],
        ) = normalize_type(
            field.type, cmd["pandas_type"], cmd["numpy_type"], cmd["metadata"]
        )

    metadata = schema.metadata
    metadata[b"pandas"] = _dict_to_binary(pandas_metadata)
    schema = pa.schema([pa.field(n, t) for n, t in fields.items()], metadata)
    return normalize_column_order(SchemaWrapper(schema, origin), partition_keys)
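The field lookup in this fragment uses Schema.get_field_index followed by positional indexing; a standalone example of that pair of calls:

import pyarrow as pa

schema = pa.schema([("foo", pa.int64()), ("bar", pa.string())])
idx = schema.get_field_index("bar")
print(idx, schema[idx].name, schema[idx].type)  # 1 bar string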