Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Monad.__init__(monad, translator, func)
monad.func = func
registered_functions = SQLTranslator.registered_functions = {}
class FuncMonadMeta(MonadMeta):
def __new__(meta, cls_name, bases, cls_dict):
func = cls_dict.get('func')
monad_cls = super(FuncMonadMeta, meta).__new__(meta, cls_name, bases, cls_dict)
if func:
if type(func) is tuple: functions = func
else: functions = (func,)
for func in functions: registered_functions[func] = monad_cls
return monad_cls
class FuncMonad(with_metaclass(FuncMonadMeta, Monad)):
def __call__(monad, *args, **kwargs):
translator = monad.translator
for arg in args:
assert isinstance(arg, translator.Monad)
for value in kwargs.values():
assert isinstance(value, translator.Monad)
try: return monad.call(*args, **kwargs)
except TypeError as exc:
reraise_improved_typeerror(exc, 'call', monad.type.__name__)
class FuncBufferMonad(FuncMonad):
func = buffer
def call(monad, source, encoding=None, errors=None):
translator = monad.translator
if not isinstance(source, translator.StringConstMonad): throw(TypeError)
source = source.value
return monad, path
def cast_from_json(monad, type):
translator = monad.translator
if issubclass(type, Json):
if not translator.json_values_are_comparable: throw(TranslationError,
'%s does not support comparison of json structures: {EXPR}' % translator.dialect)
return monad
base_monad, path = monad.get_path()
sql = [ 'JSON_VALUE', base_monad.getsql()[0], path, type ]
return translator.ExprMonad.new(translator, Json if type is NoneType else type, sql)
def getsql(monad):
base_monad, path = monad.get_path()
base_sql = base_monad.getsql()[0]
return [ [ 'JSON_QUERY', base_sql, path ] ]
class ConstMonad(Monad):
@staticmethod
def new(translator, value):
value_type = get_normalized_type_of(value)
if value_type in numeric_types: cls = translator.NumericConstMonad
elif value_type is unicode: cls = translator.StringConstMonad
elif value_type is date: cls = translator.DateConstMonad
elif value_type is time: cls = translator.TimeConstMonad
elif value_type is timedelta: cls = translator.TimedeltaConstMonad
elif value_type is datetime: cls = translator.DatetimeConstMonad
elif value_type is NoneType: cls = translator.NoneMonad
elif value_type is buffer: cls = translator.BufferConstMonad
elif value_type is Json: cls = translator.JsonConstMonad
elif issubclass(value_type, type(Ellipsis)): cls = translator.EllipsisMonad
else: throw(NotImplementedError, value_type) # pragma: no cover
result = cls(translator, value)
result.aggregated = False
return translator.CmpMonad('is not', monad, translator.NoneMonad(translator))
def getattr(monad, name):
translator = monad.translator
entity = monad.type
attr = entity._adict_.get(name) or entity._subclass_adict_.get(name)
if attr is None: throw(AttributeError,
'Entity %s does not have attribute %s: {EXPR}' % (entity.__name__, name))
if hasattr(monad, 'tableref'): monad.tableref.used_attrs.add(attr)
if not attr.is_collection:
return translator.AttrMonad.new(monad, attr)
else:
return translator.AttrSetMonad(monad, attr)
def requires_distinct(monad, joined=False):
return monad.attr.reverse.is_collection or monad.parent.requires_distinct(joined) # parent ???
class ObjectIterMonad(ObjectMixin, Monad):
def __init__(monad, translator, tableref, entity):
Monad.__init__(monad, translator, entity)
monad.tableref = tableref
def getsql(monad, subquery=None):
entity = monad.type
alias, pk_columns = monad.tableref.make_join(pk_only=True)
return [ [ 'COLUMN', alias, column ] for column in pk_columns ]
def requires_distinct(monad, joined=False):
return monad.tableref.name_path != monad.translator.tree.quals[-1].assign.name
class AttrMonad(Monad):
@staticmethod
def new(parent, attr, *args, **kwargs):
translator = parent.translator
type = normalize_type(attr.py_type)
if type in numeric_types: cls = translator.NumericAttrMonad
binop = 'AND'
class OrMonad(LogicalBinOpMonad):
binop = 'OR'
class NotMonad(BoolMonad):
def __init__(monad, operand):
if operand.type is not bool: operand = operand.nonzero()
BoolMonad.__init__(monad, operand.translator)
monad.operand = operand
def negate(monad):
return monad.operand
def getsql(monad, subquery=None):
return [ [ 'NOT', monad.operand.getsql()[0] ] ]
class ErrorSpecialFuncMonad(Monad):
def __init__(monad, translator, func):
Monad.__init__(monad, translator, func)
monad.func = func
registered_functions = SQLTranslator.registered_functions = {}
class FuncMonadMeta(MonadMeta):
def __new__(meta, cls_name, bases, cls_dict):
func = cls_dict.get('func')
monad_cls = super(FuncMonadMeta, meta).__new__(meta, cls_name, bases, cls_dict)
if func:
if type(func) is tuple: functions = func
else: functions = (func,)
for func in functions: registered_functions[func] = monad_cls
return monad_cls
entity = monad.type
assert len(monad.params) == len(entity._pk_converters_)
return [ [ 'PARAM', param, converter ] for param, converter in izip(monad.params, entity._pk_converters_) ]
def requires_distinct(monad, joined=False):
assert False # pragma: no cover
class StringParamMonad(StringMixin, ParamMonad): pass
class NumericParamMonad(NumericMixin, ParamMonad): pass
class DateParamMonad(DateMixin, ParamMonad): pass
class TimeParamMonad(TimeMixin, ParamMonad): pass
class TimedeltaParamMonad(TimeMixin, ParamMonad): pass
class DatetimeParamMonad(DatetimeMixin, ParamMonad): pass
class BufferParamMonad(BufferMixin, ParamMonad): pass
class UuidParamMonad(UuidMixin, ParamMonad): pass
class ExprMonad(Monad):
@staticmethod
def new(translator, type, sql):
if type in numeric_types: cls = translator.NumericExprMonad
elif type is unicode: cls = translator.StringExprMonad
elif type is date: cls = translator.DateExprMonad
elif type is time: cls = translator.TimeExprMonad
elif type is timedelta: cls = translator.TimedeltaExprMonad
elif type is datetime: cls = translator.DatetimeExprMonad
else: throw(NotImplementedError, type) # pragma: no cover
return cls(translator, type, sql)
def __new__(cls, *args):
if cls is ExprMonad: assert False, 'Abstract class' # pragma: no cover
return Monad.__new__(cls)
def __init__(monad, translator, type, sql):
Monad.__init__(monad, translator, type)
monad.sql = sql
def __new__(cls, *args):
if cls is ParamMonad: assert False, 'Abstract class' # pragma: no cover
return Monad.__new__(cls)
def __init__(monad, translator, type, paramkey):
return Monad.__new__(cls)
def __init__(monad, translator, type, sql):
Monad.__init__(monad, translator, type)
monad.sql = sql
def getsql(monad, subquery=None):
return [ monad.sql ]
class StringExprMonad(StringMixin, ExprMonad): pass
class NumericExprMonad(NumericMixin, ExprMonad): pass
class DateExprMonad(DateMixin, ExprMonad): pass
class TimeExprMonad(TimeMixin, ExprMonad): pass
class TimedeltaExprMonad(TimedeltaMixin, ExprMonad): pass
class DatetimeExprMonad(DatetimeMixin, ExprMonad): pass
class JsonExprMonad(JsonMixin, ExprMonad): pass
class JsonItemMonad(JsonMixin, Monad):
def __init__(monad, parent, key):
assert isinstance(parent, JsonMixin), parent
translator = parent.translator
Monad.__init__(monad, translator, Json)
monad.parent = parent
if isinstance(key, slice):
if key != slice(None, None, None): throw(NotImplementedError)
monad.key_ast = [ 'VALUE', key ]
elif isinstance(key, (ParamMonad, StringConstMonad, NumericConstMonad, EllipsisMonad)):
monad.key_ast = key.getsql()[0]
else: throw(TypeError, 'Invalid JSON path item: %s' % ast2src(key.node))
if isinstance(key, (slice, EllipsisMonad)) and not translator.json_path_wildcard_syntax:
throw(TranslationError, '%s does not support wildcards in JSON path: {EXPR}' % translator.dialect)
def get_path(monad):
path = []
while isinstance(monad, JsonItemMonad):
if not start:
plural = 's' if end > 1 else ''
new_msg = '%s() takes at most %d argument%s (%d given)' % (orig_func_name, end, plural, given)
else:
new_msg = '%s() takes from %d to %d arguments (%d given)' % (orig_func_name, start, end, given)
exc.args = (new_msg,)
throw(exc)
exc.args = (orig_func_name + msg,)
throw(exc)
def raise_forgot_parentheses(monad):
assert monad.type == 'METHOD'
throw(TranslationError, 'You seems to forgot parentheses after %s' % ast2src(monad.node))
class MethodMonad(Monad):
def __init__(monad, translator, parent, attrname):
Monad.__init__(monad, translator, 'METHOD')
monad.parent = parent
monad.attrname = attrname
def getattr(monad, attrname):
raise_forgot_parentheses(monad)
def __call__(monad, *args, **kwargs):
method = getattr(monad.parent, 'call_' + monad.attrname)
try: return method(*args, **kwargs)
except TypeError as exc: reraise_improved_typeerror(exc, method.__name__, monad.attrname)
def contains(monad, item, not_in=False): raise_forgot_parentheses(monad)
def nonzero(monad): raise_forgot_parentheses(monad)
def negate(monad): raise_forgot_parentheses(monad)
def aggregate(monad, func_name): raise_forgot_parentheses(monad)
def __getitem__(monad, key): raise_forgot_parentheses(monad)
binop = 'AND'
class OrMonad(LogicalBinOpMonad):
binop = 'OR'
class NotMonad(BoolMonad):
def __init__(monad, operand):
if operand.type is not bool: operand = operand.nonzero()
BoolMonad.__init__(monad, operand.translator)
monad.operand = operand
def negate(monad):
return monad.operand
def getsql(monad, subquery=None):
return [ [ 'NOT', monad.operand.getsql()[0] ] ]
class ErrorSpecialFuncMonad(Monad):
def __init__(monad, translator, func):
Monad.__init__(monad, translator, func)
monad.func = func
registered_functions = SQLTranslator.registered_functions = {}
class FuncMonadMeta(MonadMeta):
def __new__(meta, cls_name, bases, cls_dict):
func = cls_dict.get('func')
monad_cls = super(FuncMonadMeta, meta).__new__(meta, cls_name, bases, cls_dict)
if func:
if type(func) is tuple: functions = func
else: functions = (func,)
for func in functions: registered_functions[func] = monad_cls
return monad_cls
def __pow__(monad, monad2): raise_forgot_parentheses(monad)
def __neg__(monad): raise_forgot_parentheses(monad)
def abs(monad): raise_forgot_parentheses(monad)
class EntityMonad(Monad):
def __init__(monad, translator, entity):
Monad.__init__(monad, translator, SetType(entity))
if translator.database is None:
translator.database = entity._database_
elif translator.database is not entity._database_:
throw(TranslationError, 'All entities in a query must belong to the same database')
def __getitem__(monad, *args):
throw(NotImplementedError)
class ListMonad(Monad):
def __init__(monad, translator, items):
Monad.__init__(monad, translator, tuple(item.type for item in items))
monad.items = items
def contains(monad, x, not_in=False):
translator = monad.translator
for item in monad.items: check_comparable(item, x)
left_sql = x.getsql()
if len(left_sql) == 1:
if not_in: sql = [ 'NOT_IN', left_sql[0], [ item.getsql()[0] for item in monad.items ] ]
else: sql = [ 'IN', left_sql[0], [ item.getsql()[0] for item in monad.items ] ]
elif not_in:
sql = sqland([ sqlor([ [ 'NE', a, b ] for a, b in izip(left_sql, item.getsql()) ]) for item in monad.items ])
else:
sql = sqlor([ sqland([ [ 'EQ', a, b ] for a, b in izip(left_sql, item.getsql()) ]) for item in monad.items ])
return translator.BoolExprMonad(translator, sql)
def getsql(monad, subquery=None):