Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
"""
Run loop which continuously attempts to receive triggered data from Makai.
"""
self.logger.info("MakaiDataSubscriber thread started")
while True:
# Receive data and extract parameters
data = self.zmq_socket.recv_multipart()
identity = data[0].decode()
topic, event_token, box_id = identity.split("_")
response = pb_util.deserialize_makai_response(data[1])
if pb_util.is_makai_data_response(response):
box_id = str(response.box_id)
cycles = list(map(pb_util.deserialize_makai_cycle, data[2:]))
start_ts, end_ts = extract_timestamps(cycles)
samples = cycles_to_data(cycles)
event_id = self.trigger_records.event_id(event_token)
self.logger.debug("Recv data with topic %s, event_token %s, event_id %d, and box_id %s",
topic,
event_token,
event_id,
box_id)
# Update event
mongo.update_event(event_id, box_id, self.mongo_client)
self.trigger_records.remove_record(event_token)
self.logger.debug("Event with event_id %d updated", event_id)
def on_message(self, topic: str, mauka_message: pb_util.mauka_pb2.MaukaMessage):
"""Subscribed messages occur async
Messages are printed to stdout
:param topic: The topic that this message is associated with
:param mauka_message: The message
"""
if pb_util.is_trigger_request(mauka_message):
self.debug("Recv trigger request %s" % str(mauka_message))
trigger_boxes(self.zmq_trigger_socket,
self.zmq_event_id_socket,
self.trigger_records,
mauka_message.trigger_request.start_timestamp_ms,
mauka_message.trigger_request.end_timestamp_ms,
mauka_message.trigger_request.box_ids[:],
self.logger,
self.mongo_client)
else:
self.logger.error("Received incorrect type of MaukaMessage :%s", str(mauka_message))
def rerun(mongo_client: mongo.OpqMongoClient, mauka_message):
"""
Rerun this plugin over the provided mauka message.
:param mongo_client: Mongo client to perform DB queries.
:param mauka_message: Mauka message to rerun this plugin over.
"""
if protobuf.pb_util.is_payload(mauka_message, protobuf.mauka_pb2.VOLTAGE_RMS_WINDOWED):
client = mongo.get_default_client(mongo_client)
semi_violation(client, mauka_message)
def request_next_available_incident_id(self) -> typing.Optional[int]:
"""
Requests the next available incident id from the incident id provider service.
:return: The next available incident id or None.
"""
req_id = int(time.time())
req = protobuf.pb_util.build_incident_id_req(self.name, req_id)
self.zmq_incident_id_req_socket.send(protobuf.pb_util.serialize_message(req))
resp: bytes = self.zmq_incident_id_req_socket.recv()
mauka_message: protobuf.mauka_pb2.MaukaMessage = protobuf.pb_util.deserialize_mauka_message(resp)
if protobuf.pb_util.is_incident_id_resp(mauka_message):
if mauka_message.incident_id_resp.resp_id == req_id:
return mauka_message.incident_id_resp.incident_id
else:
self.logger.error("Incident req id %d != incident resp id %d",
req_id,
mauka_message.incident_id_resp.resp_id)
return None
else:
self.logger.error("Received an incorrect message for an incident id response")
return None
def request_next_available_incident_id(self) -> typing.Optional[int]:
"""
Requests the next available incident id from the incident id provider service.
:return: The next available incident id or None.
"""
req_id = int(time.time())
req = protobuf.pb_util.build_incident_id_req(self.name, req_id)
self.zmq_incident_id_req_socket.send(protobuf.pb_util.serialize_message(req))
resp: bytes = self.zmq_incident_id_req_socket.recv()
mauka_message: protobuf.mauka_pb2.MaukaMessage = protobuf.pb_util.deserialize_mauka_message(resp)
if protobuf.pb_util.is_incident_id_resp(mauka_message):
if mauka_message.incident_id_resp.resp_id == req_id:
return mauka_message.incident_id_resp.incident_id
else:
self.logger.error("Incident req id %d != incident resp id %d",
req_id,
mauka_message.incident_id_resp.resp_id)
return None
else:
self.logger.error("Received an incorrect message for an incident id response")
return None
def produce_makai_event_id(pub_socket: zmq.Socket, event_id: int):
"""
Produces a makai event_id to the MakaiEventPlugin.
:param pub_socket: Publish socket.
:param event_id: The event_id to publish.
"""
makai_event = pb_util.build_makai_event("TriggerPlugin", event_id)
serialized_makai_event = pb_util.serialize_message(makai_event)
pub_socket.send_multipart((Routes.makai_event.encode(), serialized_makai_event))