Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, request_id=None, name="unknown", artifact=None, channel=searcher_channel()):
super(ThreatServiceLookupEvent, self).__init__(name=name)
self.channels = (channel,)
self.request_id = request_id
self.cts_channel = channel
self.artifact = artifact
self.name = name
# because this is used in 'resutil threatservicetest'
# and we want to return an immediate (not async) response
return response_object
# If we already have a completed query for this key, return it immmediately
request_data = self.cache.get(cache_key)
if request_data and request_data.get("complete"):
response_object["hits"] = request_data.get("hits", [])
return response_object
response.status = 303
response_object["retry_secs"] = self.first_retry_secs
# Add the request to the cache, then notify searchers that there's a new request
self.cache.setdefault(cache_key, {"id": request_id, "artifact": body, "hits": [], "complete": False})
evt = ThreatServiceLookupEvent(request_id=request_id, name=artifact_type, artifact=body, channel=cts_channel)
self.async_helper.fire(evt, HELPER_CHANNEL)
return response_object
def _lookup_complete(self, event, *args, **kwargs):
"""
A lookup event was completed
"""
if not isinstance(event.parent, ThreatServiceLookupEvent):
return
results = event.parent.value.getValue()
artifact = event.parent.artifact
cts_channel = event.parent.cts_channel
request_id = event.parent.request_id
LOG.info("Lookup complete: %s, %s", event.parent, results)
# Depending on how many components handled this lookup event,
# the results can be a single value (dict), or an array, or None,
# or an exception, or a tuple (type, exception, traceback)
hits = []
complete = True
if isinstance(results, list):
for result in results:
if result: