Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_join(self):
left_rows = [(six.text_type(i), i) for i in range(100000)]
right_rows = [(six.text_type(i), i) for i in range(100000)]
shuffle(left_rows)
shuffle(right_rows)
column_names = ['text', 'number']
column_types = [agate.Text(), agate.Number()]
left = agate.Table(left_rows, column_names, column_types)
right = agate.Table(right_rows, column_names, column_types)
def test():
left.join(right, 'text')
results = Timer(test).repeat(10, 1)
min_time = min(results)
self.assertLess(min_time, 0)
def test_to_sql_create_statement_zero_width(self):
rows = ((1, ''), (2, ''))
column_names = ['id', 'name']
column_types = [agate.Number(), agate.Text()]
table = agate.Table(rows, column_names, column_types)
statement = table.to_sql_create_statement('test_table', db_schema='test_schema', dialect='mysql')
self.assertEqual(statement.replace('\t', ' '), '''CREATE TABLE test_schema.test_table (
id DECIMAL(38, 0) NOT NULL,
def test_create_if_not_exists(self):
column_names = ['id', 'name']
column_types = [agate.Number(), agate.Text()]
rows1 = (
(1, 'Jake'),
(2, 'Howard'),
)
rows2 = (
(3, 'Liz'),
(4, 'Tim'),
)
table1 = agate.Table(rows1, column_names, column_types)
table2 = agate.Table(rows2, column_names, column_types)
engine = create_engine(self.connection_string)
connection = engine.connect()
# Write two agate tables into the same SQL table
def test_to_sql_create_statement_wide_width(self):
rows = ((1, 'x' * 21845), (2, ''))
column_names = ['id', 'name']
column_types = [agate.Number(), agate.Text()]
table = agate.Table(rows, column_names, column_types)
statement = table.to_sql_create_statement('test_table', db_schema='test_schema', dialect='mysql')
self.assertEqual(statement.replace('\t', ' '), '''CREATE TABLE test_schema.test_table (
id DECIMAL(38, 0) NOT NULL,
label = u'{label:{label_column_width}}'.format(**{
'label_column_width': label_column_width,
'label': op_data['label']
})
if op_name == 'freq':
for i, row in enumerate(column_stats['freq']):
if i == 0:
self.output_file.write('\t{} '.format(label))
else:
self.output_file.write(u'\t{label:{label_column_width}} '.format(**{
'label_column_width': label_column_width,
'label': ''
}))
if isinstance(column.data_type, agate.Number):
v = row[column_name]
if self.is_finite_decimal(v):
v = format_decimal(v, locale=agate.config.get_option('default_locale'))
else:
v = six.text_type(row[column_name])
self.output_file.write(u'{} ({}x)\n'.format(v, row['Count']))
continue
v = column_stats[op_name]
if op_name == 'nulls' and v:
v = '%s (excluded from calculations)' % v
elif op_name == 'len':
column_names = []
column_types = []
for sql_column in sql_table.columns:
column_names.append(sql_column.name)
if type(sql_column.type) in INTERVAL_MAP.values():
py_type = datetime.timedelta
else:
py_type = sql_column.type.python_type
if py_type in [int, float, decimal.Decimal]:
if py_type is float:
sql_column.type.asdecimal = True
column_types.append(agate.Number())
elif py_type is bool:
column_types.append(agate.Boolean())
elif issubclass(py_type, six.string_types):
column_types.append(agate.Text())
elif py_type is datetime.date:
column_types.append(agate.Date())
elif py_type is datetime.datetime:
column_types.append(agate.DateTime())
elif py_type is datetime.timedelta:
column_types.append(agate.TimeDelta())
else:
raise ValueError('Unsupported sqlalchemy column type: %s' % type(sql_column.type))
s = select([sql_table])
rows = connection.execute(s)
def get_column_types(self):
if getattr(self.args, 'blanks', None):
type_kwargs = {'null_values': ()}
else:
type_kwargs = {}
text_type = agate.Text(**type_kwargs)
if self.args.no_inference:
types = [text_type]
else:
number_type = agate.Number(locale=self.args.locale, **type_kwargs)
# See the order in the `agate.TypeTester` class.
types = [
agate.Boolean(**type_kwargs),
agate.TimeDelta(**type_kwargs),
agate.Date(date_format=self.args.date_format, **type_kwargs),
agate.DateTime(datetime_format=self.args.datetime_format, **type_kwargs),
text_type,
]
# In order to parse dates like "20010101".
if self.args.date_format or self.args.datetime_format:
types.insert(-1, number_type)
else:
types.insert(1, number_type)
def convert_agate_type(cls, agate_table, col_idx):
agate_type = agate_table.column_types[col_idx]
conversions = [
(agate.Text, cls.convert_text_type),
(agate.Number, cls.convert_number_type),
(agate.Boolean, cls.convert_boolean_type),
(agate.DateTime, cls.convert_datetime_type),
(agate.Date, cls.convert_date_type),
(agate.TimeDelta, cls.convert_time_type),
]
for agate_cls, func in conversions:
if isinstance(agate_type, agate_cls):
return func(agate_table, col_idx)
def make_sql_table(table, table_name, dialect=None, db_schema=None, constraints=True, unique_constraint=[],
connection=None, min_col_len=1, col_len_multiplier=1):
"""
Generates a SQL alchemy table from an agate table.
"""
metadata = MetaData(connection)
sql_table = Table(table_name, metadata, schema=db_schema)
SQL_TYPE_MAP[agate.Boolean] = BOOLEAN_MAP.get(dialect, BOOLEAN)
SQL_TYPE_MAP[agate.DateTime] = DATETIME_MAP.get(dialect, TIMESTAMP)
SQL_TYPE_MAP[agate.Number] = NUMBER_MAP.get(dialect, DECIMAL)
SQL_TYPE_MAP[agate.TimeDelta] = INTERVAL_MAP.get(dialect, Interval)
for column_name, column in table.columns.items():
sql_column_type = None
sql_type_kwargs = {}
sql_column_kwargs = {}
if constraints:
if isinstance(column.data_type, agate.Text) and dialect == 'mysql':
length = table.aggregate(agate.MaxLength(column_name)) * decimal.Decimal(col_len_multiplier)
if length > 21844:
# @see https://dev.mysql.com/doc/refman/5.7/en/string-type-overview.html
sql_column_type = TEXT
else:
# If length is zero, SQLAlchemy may raise "VARCHAR requires a length on dialect mysql".
sql_type_kwargs['length'] = length if length >= min_col_len else min_col_len