Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
plain_filters = {}
subquery_filters = {}
model = query.model
filters = prepare_filters(query, filters)
for key, val in filters.items():
if is_filter_deep(model, key):
join_field, join_filter_key = key.split(DELIM, 1)
subquery_filters.setdefault(join_field, {}).update({join_filter_key: val})
else:
plain_filters[key] = val
query = filter_query(query, plain_filters)
for key, val in subquery_filters.items():
field = getattr(model, key)
rel_model = field.rel_model
query = query.where(NodeList([
SQL('EXISTS'),
rel_model.select(SQL('1')).filter(**val).where(field == rel_model._meta.primary_key)
]))
return query
def add_not_null(db, migrator, table, column_name, field):
cmds = []
compiler = db.compiler()
if field.default is not None:
# if default is a function, turn it into a value
# this won't work on columns requiring uniquiness, like UUIDs
# as all columns will share the same called value
default = field.default() if hasattr(field.default, '__call__') else field.default
op = pw.Clause(pw.SQL('UPDATE'), pw.Entity(table), pw.SQL('SET'), field.as_entity(), pw.SQL('='), default, pw.SQL('WHERE'), field.as_entity(), pw.SQL('IS NULL'))
cmds.append(compiler.parse_node(op))
if is_postgres(db) or is_sqlite(db):
junk = migrator.add_not_null(table, column_name, generate=True)
cmds += normalize_whatever_junk_peewee_migrations_gives_you(migrator, junk)
return cmds
elif is_mysql(db):
op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table), pw.SQL('MODIFY'), compiler.field_definition(field))
cmds.append(compiler.parse_node(op))
return cmds
raise Exception('how do i add a not null for %s?' % db)
columns.append(field.ddl(ctx))
if isinstance(field, ForeignKeyField):
constraints.append(field.foreign_key_constraint())
meta_options = getattr(self.model._meta, 'options', None) or {}
if meta_options or options:
meta_options.update(options or {})
for key, value in sorted(meta_options.items()):
if isinstance(value, Node):
value = value
elif is_model(value):
value = value._meta.table
else:
value = SQL(value)
extra.append(Clause((SQL(key), extra), glue='='))
ctx.sql(EnclosedNodeList(columns + constraints + extra))
return ctx
def between(self, low, high):
return Expression(self, OP_BETWEEN, Clause(low, R('AND'), high))
def regexp(self, expression):
def rename_column(db, migrator, ntn, ocn, ncn, field):
compiler = db.compiler()
if is_mysql(db):
junk = pw.Clause(
pw.SQL('ALTER TABLE'), pw.Entity(ntn), pw.SQL('CHANGE'), pw.Entity(ocn), compiler.field_definition(field)
)
else:
junk = migrator.rename_column(ntn, ocn, ncn, generate=True)
return normalize_whatever_junk_peewee_migrations_gives_you(migrator, junk)
def clone_base(self):
return SQL(self.value, *self.params)
R = SQL # backwards-compat.
with ctx(parentheses=True):
parts = []
if self.partition_by:
parts.extend((
SQL('PARTITION BY'),
CommaNodeList(self.partition_by)))
if self.order_by:
parts.extend((
SQL('ORDER BY'),
CommaNodeList(self.order_by)))
if self.start is not None and self.end is not None:
parts.extend((
SQL('RANGE BETWEEN'),
self.start,
SQL('AND'),
self.end))
elif self.start is not None:
parts.extend((SQL('RANGE'), self.start))
ctx.sql(NodeList(parts))
return ctx
class OverclockProfileType(Enum):
DEFAULT = 'default'
OFFSET = 'offset'
class OverclockProfile(Model):
id = AutoIncrementField()
type = CharField(
constraints=[Check(f"type='{OverclockProfileType.DEFAULT.value}' "
f"OR type='{OverclockProfileType.OFFSET.value}'")],
default=OverclockProfileType.OFFSET.value)
name = CharField()
gpu = IntegerField(default=0)
memory = IntegerField(default=0)
read_only = BooleanField(default=False)
timestamp = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')])
class Meta:
legacy_table_names = False
database = INJECTOR.get(SqliteDatabase)
@post_save(sender=OverclockProfile)
def on_overclock_profile_added(_: Any, profile: OverclockProfile, created: bool) -> None:
LOG.debug("Overclock added")
OVERCLOCK_PROFILE_CHANGED_SUBJECT.on_next(DbChange(profile, DbChange.INSERT if created else DbChange.UPDATE))
@post_delete(sender=OverclockProfile)
def on_overclock_profile_deleted(_: Any, profile: OverclockProfile) -> None:
LOG.debug("Overclock deleted")
OVERCLOCK_PROFILE_CHANGED_SUBJECT.on_next(DbChange(profile, DbChange.DELETE))
def change_column_type(db, migrator, table_name, column_name, field):
column_type = _field_type(field)
if is_postgres(db):
op = pw.Clause(pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('ALTER'), field.as_entity(), pw.SQL('TYPE'), field.__ddl_column__(column_type))
elif is_mysql(db):
op = pw.Clause(*[pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('MODIFY')] + field.__ddl__(column_type))
else:
raise Exception('how do i change a column type for %s?' % db)
return normalize_whatever_junk_peewee_migrations_gives_you(migrator, op)