Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_adapt_subtype(self):
class Sub(str):
pass
s1 = "hel'lo"
s2 = Sub(s1)
self.assertEqual(adapt(s1).getquoted(), adapt(s2).getquoted())
# Restrict to tags when necessary
if self.filter_tags:
q &= Q(tags__contains=self.filter_tags)
# Restrict to attributes when necessary
for s in self.managedobjectselectorbyattribute_set.all():
q &= SQL(
"""
("sa_managedobject"."id" IN (
SELECT managed_object_id
FROM sa_managedobjectattribute
WHERE
key ~ %s
AND value ~ %s
))
"""
% (adapt(s.key_re).getquoted(), adapt(s.value_re).getquoted())
)
# Restrict to sources
if self.sources.count():
if self.source_combine_method == "A":
# AND
for s in self.sources.all():
q &= s.Q
else:
# OR
ql = list(self.sources.all())
q = ql.pop(0).Q
for qo in ql:
q |= qo.Q
return q
def quote_value(self, value):
if value ==None:
return SQL_NULL
if is_list(value):
json = value2json(value)
return self.quote_value(json)
if is_text(value) and len(value) > 256:
value = value[:256]
return SQL(adapt(value))
def low_level_search(self, cr, uid, ids, key_list, context={}, **kwargs):
if not kwargs.get('matching_model_name'):
for template in self.browse(cr, uid, ids, context=context):
model = template.target_model_id.model
else:
model = kwargs.get('matching_model_name')
target_model_pool = self.pool.get(model)
where_string = "WHERE id IS NOT NULL\n"
for key_sub_list in key_list:
if isinstance(key_sub_list[2], basestring):
second_parametr = adapt(key_sub_list[2]).getquoted()
else:
second_parametr = key_sub_list[2]
where_string += "AND {0} {1} {2} \n".format(key_sub_list[0], key_sub_list[1], second_parametr)
cr.execute("""
SELECT *
FROM %s
%s""" % (target_model_pool._table, where_string))
result = cr.fetchall()
result = [id_numders[0] for id_numders in result]
return result
register_adapter(Order, ObjectMapper)
# Describe what is needed to save on each object
# This is actually just configuration, you can use xml with a parser if you
# like to have plenty of wasted CPU cycles ;P.
persistent_fields = {'Album': ['album_id', 'creation_time', 'binary_data'],
'Order': ['order_id', 'items', 'price']
}
print adapt(Album()).generateInsert()
print adapt(Album()).generateInsert()
print adapt(Album()).generateInsert()
print adapt(Order()).generateInsert()
print adapt(Order()).generateInsert()
print adapt(Order()).generateInsert()
"""
- Discussion
Psycopg 2 has a great new feature: adaptation. The big thing about
adaptation is that it enables the programmer to glue most of the
code out there without many difficulties.
This recipe tries to focus attention on a way to generate SQL queries to
insert completely new objects inside a database. As you can see objects do
not know anything about the code that is handling them. We specify all the
fields that we need for each object through the persistent_fields dict.
The most important lines of this recipe are:
register_adapter(Album, ObjectMapper)
register_adapter(Order, ObjectMapper)
def adaptLabel(label):
return AsIs("(%s, %s)::label" % (adapt(label.type_), adapt(label.labelValue())))
def sqlesc(value):
adapted = adapt(value.encode('utf-8'))
if hasattr(adapted, 'getquoted'):
adapted = adapted.getquoted()
return adapted
if(interval==None):
interval = ""
else:
interval = "AND sp_timestamp > " + interval
sql = """SELECT ( SELECT sprocs.sproc_name
FROM monitor_data.sprocs
WHERE sprocs.sproc_id = t.sp_sproc_id) AS name, date_trunc('hour'::text, t.sp_timestamp) + floor(date_part('minute'::text, t.sp_timestamp) / 15::double precision) * '00:15:00'::interval AS xaxis, sum(t.delta_calls) AS d_calls, sum(t.delta_self_time) AS d_self_time, sum(t.delta_total_time) AS d_total_time
FROM ( SELECT sproc_performance_data.sp_timestamp,
sproc_performance_data.sp_sproc_id,
COALESCE(sproc_performance_data.sp_calls - lag(sproc_performance_data.sp_calls) OVER w, 0::bigint) AS delta_calls,
COALESCE(sproc_performance_data.sp_self_time - lag(sproc_performance_data.sp_self_time) OVER w, 0::bigint) AS delta_self_time,
COALESCE(sproc_performance_data.sp_total_time - lag(sproc_performance_data.sp_total_time) OVER w, 0::bigint) AS delta_total_time
FROM monitor_data.sproc_performance_data
WHERE (sproc_performance_data.sp_sproc_id IN ( SELECT sprocs.sproc_id
FROM monitor_data.sprocs WHERE sproc_host_id = """ + str(adapt(hostId)) + """ ))
""" + interval + """
WINDOW w AS ( PARTITION BY sproc_performance_data.sp_sproc_id ORDER BY sproc_performance_data.sp_timestamp )
ORDER BY sproc_performance_data.sp_timestamp) t
GROUP BY t.sp_sproc_id, date_trunc('hour'::text, t.sp_timestamp) + floor(date_part('minute'::text, t.sp_timestamp) / 15::double precision) * '00:15:00'::interval
ORDER BY date_trunc('hour'::text, t.sp_timestamp) + floor(date_part('minute'::text, t.sp_timestamp) / 15::double precision) * '00:15:00'::interval"""
return sql;
def getSingleSprocSQL(hostId, name, interval=None):
if name[-1:]!=")":
name = name + "("
if(interval==None):
interval = "AND sp_timestamp > ('now'::timestamp-'23 days'::interval)"
else:
if 'interval' in interval:
interval = "AND sp_timestamp > " + interval
else:
interval = "AND sp_timestamp BETWEEN %s::timestamp AND %s::timestamp" % ( adapt(interval['from']), adapt(interval['to']), )
nameSql = str(adapt(name+'%'))
sql = """SELECT ( SELECT sprocs.sproc_name
FROM monitor_data.sprocs
WHERE sprocs.sproc_id = t.sp_sproc_id) AS name,
date_trunc('hour'::text, t.sp_timestamp) + floor(date_part('minute'::text, t.sp_timestamp) / 15::double precision) * '00:15:00'::interval AS xaxis,
sum(t.delta_calls) AS d_calls,
sum(t.delta_self_time) AS d_self_time,
sum(t.delta_total_time) AS d_total_time,
CASE WHEN sum(t.delta_calls) > 0 THEN round(sum(t.delta_total_time) / sum(t.delta_calls), 2) ELSE 0 END AS d_avg_time,
CASE WHEN sum(t.delta_calls) > 0 THEN round(sum(t.delta_self_time) / sum(t.delta_calls), 2) ELSE 0 END AS d_avg_self_time
FROM ( SELECT sproc_performance_data.sp_timestamp,
sproc_performance_data.sp_sproc_id,
COALESCE(sproc_performance_data.sp_calls - lag(sproc_performance_data.sp_calls) OVER (PARTITION BY sproc_performance_data.sp_sproc_id ORDER BY sproc_performance_data.sp_timestamp), 0::bigint) AS delta_calls,
COALESCE(sproc_performance_data.sp_self_time - lag(sproc_performance_data.sp_self_time) OVER (PARTITION BY sproc_performance_data.sp_sproc_id ORDER BY sproc_performance_data.sp_timestamp), 0::bigint) AS delta_self_time,