Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _validate_dependency(self, dep):
"""validate a dependency.
For use in `set_flags`.
"""
if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
return True
elif isinstance(dep, (list,set, tuple)):
for d in dep:
if not isinstance(d, string_types + (AsyncResult,)):
return False
elif isinstance(dep, dict):
if set(dep.keys()) != set(Dependency().as_dict().keys()):
return False
if not isinstance(dep['msg_ids'], list):
return False
for d in dep['msg_ids']:
if not isinstance(d, string_types):
return False
else:
return False
def _validate_dependency(self, dep):
"""validate a dependency.
For use in `set_flags`.
"""
if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
return True
elif isinstance(dep, (list,set, tuple)):
for d in dep:
if not isinstance(d, string_types + (AsyncResult,)):
return False
elif isinstance(dep, dict):
if set(dep.keys()) != set(Dependency().as_dict().keys()):
return False
if not isinstance(dep['msg_ids'], list):
return False
for d in dep['msg_ids']:
if not isinstance(d, string_types):
return False
else:
return False
return True
# NOTE(review): excerpt from the body of a larger method -- its `def` line and
# the assignments of `md` and `msg_id` are not visible here, and the original
# indentation has been lost, so nesting below is not recoverable from this text.
# get targets as a set of bytes objects
# from a list of unicode objects
targets = md.get('targets', [])
targets = set(map(cast_bytes, targets))
# record the requested retry count for this message (0 when not given)
retries = md.get('retries', 0)
self.retries[msg_id] = retries
# time dependencies
after = md.get('after', None)
if after:
# wrap the truthy 'after' value in a Dependency
after = Dependency(after)
if after.all:
if after.success:
# rebuild `after` without the ids in self.all_completed, same flags
# (presumably a set-style difference -- confirm against Dependency)
after = Dependency(after.difference(self.all_completed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.failure:
# same rebuild, this time removing the ids in self.all_failed
after = Dependency(after.difference(self.all_failed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.check(self.all_completed, self.all_failed):
# recast as empty set, if `after` already met,
# to prevent unnecessary set comparisons
after = MET
# NOTE(review): presumably this `else` pairs with the outer `if after:`
# (no 'after' value supplied) -- confirm against the indented original
else:
after = MET
def _render_dependency(self, dep):
    """Convert any accepted dependency form into a jsonable value.

    A ``Dependency`` is rendered through ``as_dict()``, an ``AsyncResult``
    through its ``msg_ids``, and ``None`` becomes an empty list.  Any other
    value is handed to the ``Dependency`` constructor and returned as a list.
    """
    if isinstance(dep, Dependency):
        return dep.as_dict()
    if isinstance(dep, AsyncResult):
        return dep.msg_ids
    if dep is None:
        return []
    # anything else: let the Dependency constructor interpret it
    normalized = Dependency(dep)
    return list(normalized)
# NOTE(review): excerpt from the body of a larger method -- the `def` line is
# not visible, indentation has been lost, and the retries / `after` sequence
# appears more than once below. This looks like several overlapping snippets
# joined together; confirm every note here against the original file.
# record the requested retry count for this message (0 when not given)
retries = md.get('retries', 0)
self.retries[msg_id] = retries
# time dependencies
after = md.get('after', None)
if after:
after = Dependency(after)
if after.all:
if after.success:
# rebuild `after` without the ids in self.all_completed, same flags
after = Dependency(after.difference(self.all_completed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.failure:
# rebuild `after` without the ids in self.all_failed, same flags
after = Dependency(after.difference(self.all_failed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.check(self.all_completed, self.all_failed):
# recast as empty set, if `after` already met,
# to prevent unnecessary set comparisons
after = MET
else:
after = MET
# location dependencies
follow = Dependency(md.get('follow', []))
timeout = md.get('timeout', None)
if timeout:
# NOTE(review): `md` and `msg_id` are assigned here although they are already
# read earlier in this excerpt -- presumably the excerpt restarts at this
# point and the body of the `if timeout:` above is missing; confirm.
md = msg['metadata']
msg_id = header['msg_id']
# register this message id in self.all_ids
self.all_ids.add(msg_id)
# get targets as a set of bytes objects
# from a list of unicode objects
targets = md.get('targets', [])
targets = set(map(cast_bytes, targets))
retries = md.get('retries', 0)
self.retries[msg_id] = retries
# time dependencies
after = md.get('after', None)
if after:
after = Dependency(after)
if after.all:
if after.success:
after = Dependency(after.difference(self.all_completed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.failure:
after = Dependency(after.difference(self.all_failed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.check(self.all_completed, self.all_failed):
# recast as empty set, if `after` already met,
# to prevent unnecessary set comparisons
# NOTE(review): the `)` below has no matching opener in the visible lines, and
# the statements after it repeat the `after.failure` handling just above --
# likely a splice artifact of the excerpt; confirm.
)
if after.failure:
after = Dependency(after.difference(self.all_failed),
success=after.success,
failure=after.failure,
all=after.all,
)
if after.check(self.all_completed, self.all_failed):
# recast as empty set, if `after` already met,
# to prevent unnecessary set comparisons
after = MET
else:
after = MET
# location dependencies
follow = Dependency(md.get('follow', []))
timeout = md.get('timeout', None)
if timeout:
# a truthy timeout is converted to float
timeout = float(timeout)
# bundle the values gathered above into a Job
job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
header=header, targets=targets, after=after, follow=follow,
timeout=timeout, metadata=md,
)
# validate and reduce dependencies:
for dep in after,follow:
if not dep: # empty dependency
continue
# check valid:
# the condition is true when the dependency names this message itself or
# when dep.difference(self.all_ids) is non-empty
if msg_id in dep or dep.difference(self.all_ids):
# NOTE(review): presumably the body of the `if` above (indentation lost);
# whatever follows this assignment is not visible in this excerpt
self.queue_map[msg_id] = job