def test_arrow_table_exceeds_expected_columns(dsn, configuration):
    with open_cursor(configuration) as cursor:
        with query_fixture(cursor, configuration, 'INSERT INTEGER') as table_name:
            arr = pa.array([1, 2])
            rb = pa.RecordBatch.from_arrays([arr], ['a'])
            table = pa.Table.from_batches([rb, rb])
            with pytest.raises(NotImplementedError):
                cursor.executemanycolumns("INSERT INTO {} VALUES (?)".format(table_name), table)

def execute(self, event):
    schema = self._parse_schema(event['schema']['schema'])
    records = {k: [] for k in schema.names}
    sqlite_dbname = event['tableName']['schemaName']
    sqlite_tablename = event['tableName']['tableName']
    s3db = SQLiteDB(S3_BUCKET, S3_PREFIX, sqlite_dbname)
    # TODO: How to select field names?
    for row in s3db.execute("SELECT {} FROM {}".format(','.join(schema.names), sqlite_tablename)):
        for i, name in enumerate(schema.names):
            records[name].append(str(row[i]))
    pa_records = pa.RecordBatch.from_arrays([pa.array(records[name]) for name in schema.names], schema=schema)
    return {
        "@type": "ReadRecordsResponse",
        "catalogName": event['catalogName'],
        "records": {
            "aId": str(uuid4()),
            "schema": base64.b64encode(schema.serialize().slice(4)).decode("utf-8"),
            "records": base64.b64encode(pa_records.serialize().slice(4)).decode("utf-8")
        },
        "requestType": "READ_RECORDS"
    }
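The same pattern in isolation: column values gathered into Python lists, an explicit schema, and pa.RecordBatch.from_arrays called with the schema keyword. A minimal sketch; the names and values below are illustrative only, not taken from the connector code.

import pyarrow as pa

records = {'id': ['1', '2'], 'name': ['alice', 'bob']}             # hypothetical columnar data
schema = pa.schema([('id', pa.string()), ('name', pa.string())])
batch = pa.RecordBatch.from_arrays(
    [pa.array(records[name]) for name in schema.names],
    schema=schema)                                                  # from_arrays accepts names or a full schema
print(batch.num_rows)   # 2
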
            columns[idx].append(value)
        if rownum % 10000 == 0:
            add_arrays(columns)
            columns = [[] for x in range(len(column_names))]
        if rownum == max_rows:
            break
    if columns and any(columns):
        add_arrays(columns)
    data = [
        pa.array([item.as_py() for sublist in arr for item in sublist], type=types[idx][0]) if keep[idx] else None
        for idx, arr in enumerate(arrs)]
    data = [x for x in data if x is not None]
    batch = pa.RecordBatch.from_arrays(data, [column_names[x] for x in range(len(arrs)) if keep[x]])
    table = pa.Table.from_batches([batch])
    pq.write_table(table,
                   output_file,
                   version='1.0',
                   compression=codec,
                   use_dictionary=True,
                   row_group_size=row_group_size)
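To sanity-check the Parquet output produced by pq.write_table above, the file can be read back with pyarrow.parquet; a minimal sketch using a hypothetical path in place of output_file.

import pyarrow.parquet as pq

table = pq.read_table('output.parquet')   # hypothetical path; use the output_file written above
print(table.num_rows, table.num_columns)
print(table.schema)
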
import pyarrow as pa
# conda install pyarrow

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]
batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])
print(batch)
print(batch.num_rows)
print(batch.num_columns)

sink = pa.BufferOutputStream()
writer = pa.RecordBatchFileWriter(sink, batch.schema)
for i in range(5):
    writer.write_batch(batch)
writer.close()
buf = sink.get_result()   # getvalue() in newer pyarrow releases
with open('threecols.dat', 'wb') as f:
    f.write(buf)
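Assuming the buffer above was written out to threecols.dat, the batches can be read back with the Arrow file (random-access) reader; a minimal sketch:

import pyarrow as pa

with pa.OSFile('threecols.dat', 'rb') as source:
    reader = pa.ipc.open_file(source)
    print(reader.num_record_batches)   # 5 batches were written above
    first = reader.get_batch(0)        # random access by batch index
    table = reader.read_all()          # or materialize everything as a single Table
    print(table.num_rows)
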
def write_data(data_dict, file_name):
    keys = sorted(data_dict.keys())
    data = [pa.array(data_dict[k]) for k in keys]
    batch = pa.RecordBatch.from_arrays(data, keys)
    writer = pa.RecordBatchStreamWriter(file_name, batch.schema)
    writer.write(batch)
    writer.close()
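A matching reader for the stream produced by write_data; a minimal sketch that iterates over the batches, using a hypothetical path in place of file_name.

import pyarrow as pa

with pa.OSFile('stream.arrow', 'rb') as source:   # hypothetical path; whatever file_name was passed to write_data
    reader = pa.ipc.open_stream(source)
    for batch in reader:
        print(batch.num_rows, batch.schema.names)
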
def _convert_data_with_column_names_dict(data, schema):
    column_data = {}
    array_data = []
    schema_names = []
    for row in data:
        for column in schema:
            _col = column_data.get(column, [])
            _col.append(row.get(column))
            column_data[column] = _col
    for column in schema.keys():
        _col = column_data.get(column)
        array_data.append(pa.array(_col))
        # Use custom column names given by user
        schema_names.append(schema[column])
    return pa.RecordBatch.from_arrays(array_data, schema_names)
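For the common case where no custom column renaming is needed, newer pyarrow versions also provide a row-oriented constructor; a minimal sketch, assuming pa.RecordBatch.from_pylist is available in the installed release.

import pyarrow as pa

rows = [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]   # illustrative rows, not from the snippet above
batch = pa.RecordBatch.from_pylist(rows)          # column names inferred from the dict keys
print(batch.schema)
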
def _flush_buffer(self):
    arrays = [[] for _ in range(len(self._schema.names))]
    for x, y in enumerate(self._buffer):
        arrays[x] = pa.array(y, type=self._schema.types[x])
        self._buffer[x] = []
    rb = pa.RecordBatch.from_arrays(arrays, self._schema.names)
    self._record_batches.append(rb)
    size = 0
    for x in arrays:
        for b in x.buffers():
            if b is not None:  # the validity bitmap buffer is None for arrays without nulls
                size = size + b.size
    self._record_batches_byte_size = self._record_batches_byte_size + size
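The manual buffer walk above can be cross-checked against pyarrow's own accounting; a minimal sketch (with throwaway data) that sums Array.nbytes over a batch's columns:

import pyarrow as pa

rb = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(['a', 'b', None])],
    ['x', 'y'])
print(sum(column.nbytes for column in rb.columns))   # total bytes referenced by the column buffers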