Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_session(self):
    """Load the pickled session object referenced by the context state.

    The storage key is read from ``self.context.state`` under
    ``self.STATE_SESSION``; the stored value is fetched through ``conn``,
    base64-decoded and unpickled.

    Returns:
        The unpickled session object, or ``None`` when the state holds no
        session key or nothing is stored under that key.
    """
    # A single lookup covers both "key absent" and "key present but None";
    # there is nothing to fetch in either case.
    key = self.context.state.get(self.STATE_SESSION)
    if key is None:
        return None

    value = conn.get(key)
    if value is None:
        return None

    # The store may hand back ``str`` or ``bytes`` depending on how the
    # client is configured; the base64 codec needs bytes either way.
    if isinstance(value, str):
        value = value.encode('utf-8')
    session = codecs.decode(value, 'base64')

    # NOTE(security): pickle.loads can execute arbitrary code. This is only
    # safe as long as everything able to write to the backing store is
    # trusted.
    return pickle.loads(session)
def last_run(cls, crawler):
    """Return the crawler's "last_run" value decoded by ``unpack_datetime``.

    The raw value is read through ``conn`` from the key built by
    ``make_key(crawler, "last_run")``.
    """
    key = make_key(crawler, "last_run")
    return unpack_datetime(conn.get(key))
def runs(cls, crawler):
    """List a summary dict for every run id of ``crawler``.

    For each id yielded by ``cls.run_ids(crawler)`` the "start", "end" and
    "total_ops" values are read through ``conn`` and decoded.

    Returns:
        A list of dicts with the keys ``run_id``, ``total_ops``, ``start``
        and ``end``.
    """
    history = []
    for run_id in cls.run_ids(crawler):
        # Fetch order (start, end, total_ops) matches the dict's insertion
        # order, so the store is queried in a fixed sequence.
        raw = {
            field: conn.get(make_key("run", run_id, field))
            for field in ("start", "end", "total_ops")
        }
        entry = {
            'run_id': run_id,
            'total_ops': unpack_int(raw["total_ops"]),
            # The current UTC time is handed to unpack_datetime as its second
            # argument -- presumably the fallback for a missing start value.
            'start': unpack_datetime(raw["start"], datetime.utcnow()),
            'end': unpack_datetime(raw["end"]),
        }
        history.append(entry)
    return history
def runs(cls, crawler):
    """List a summary dict for every run id of ``crawler``.

    For each id yielded by ``cls.run_ids(crawler)`` the "start", "end" and
    "total_ops" values are read through ``conn`` and decoded.

    Returns:
        A list of dicts with the keys ``run_id``, ``total_ops``, ``start``
        and ``end``.
    """
    history = []
    for run_id in cls.run_ids(crawler):
        # Fetch order (start, end, total_ops) matches the dict's insertion
        # order, so the store is queried in a fixed sequence.
        raw = {
            field: conn.get(make_key("run", run_id, field))
            for field in ("start", "end", "total_ops")
        }
        entry = {
            'run_id': run_id,
            'total_ops': unpack_int(raw["total_ops"]),
            # The current UTC time is handed to unpack_datetime as its second
            # argument -- presumably the fallback for a missing start value.
            'start': unpack_datetime(raw["start"], datetime.utcnow()),
            'end': unpack_datetime(raw["end"]),
        }
        history.append(entry)
    return history
def op_count(cls, crawler, stage=None):
    """Total operations performed for this crawler.

    When ``stage`` is truthy the counter stored under
    ``make_key(crawler, stage)`` is read; otherwise the crawler-wide
    "total_ops" counter is used. The raw value is decoded by ``unpack_int``.
    """
    counter_name = stage if stage else "total_ops"
    raw_count = conn.get(make_key(crawler, counter_name))
    return unpack_int(raw_count)
def get_tag(self, key):
    """Return the JSON-decoded tag stored for ``key``, or ``None`` if unset.

    The value is read through ``conn`` from the key built by
    ``make_key(self.crawler, "tag", key)`` and decoded with ``load_json``.
    """
    raw = conn.get(make_key(self.crawler, "tag", key))
    if raw is None:
        return None
    return load_json(raw)
def runs(cls, crawler):
    """List a summary dict for every run id of ``crawler``.

    For each id yielded by ``cls.run_ids(crawler)`` the "start", "end" and
    "total_ops" values are read through ``conn`` and decoded.

    Returns:
        A list of dicts with the keys ``run_id``, ``total_ops``, ``start``
        and ``end``.
    """
    history = []
    for run_id in cls.run_ids(crawler):
        # Fetch order (start, end, total_ops) matches the dict's insertion
        # order, so the store is queried in a fixed sequence.
        raw = {
            field: conn.get(make_key("run", run_id, field))
            for field in ("start", "end", "total_ops")
        }
        entry = {
            'run_id': run_id,
            'total_ops': unpack_int(raw["total_ops"]),
            # The current UTC time is handed to unpack_datetime as its second
            # argument -- presumably the fallback for a missing start value.
            'start': unpack_datetime(raw["start"], datetime.utcnow()),
            'end': unpack_datetime(raw["end"]),
        }
        history.append(entry)
    return history
def latest_runid(cls, crawler):
    """Return the raw value stored under the crawler's "current_run" key.

    The value is returned exactly as ``conn.get`` provides it, with no
    decoding applied here.
    """
    current_run_key = make_key(crawler, "current_run")
    return conn.get(current_run_key)