Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_logger.info("Starting makai event bridge...")
zmq_context = zmq.Context()
zmq_sub_event_socket = zmq_context.socket(zmq.SUB)
zmq_sub_event_socket.setsockopt(zmq.SUBSCRIBE, b"")
zmq_pub_socket = zmq_context.socket(zmq.PUB)
zmq_sub_event_socket.connect(conf.get("zmq.event.interface"))
zmq_pub_socket.connect(conf.get("zmq.mauka.plugin.pub.interface"))
while True:
event_msg = zmq_sub_event_socket.recv_multipart()
if mauka_config.get("debug", False):
_logger.debug("recv event msg: %s", str(event_msg))
event_id = int(event_msg[1])
makai_event = protobuf.pb_util.build_makai_event("makai_event_bridge", event_id)
mauka_message_bytes = protobuf.pb_util.serialize_message(makai_event)
zmq_pub_socket.send_multipart((Routes.makai_event.encode(), mauka_message_bytes))
def produce_makai_event_id(pub_socket: zmq.Socket, event_id: int):
"""
Produces a makai event_id to the MakaiEventPlugin.
:param pub_socket: Publish socket.
:param event_id: The event_id to publish.
"""
makai_event = pb_util.build_makai_event("TriggerPlugin", event_id)
serialized_makai_event = pb_util.serialize_message(makai_event)
pub_socket.send_multipart((Routes.makai_event.encode(), serialized_makai_event))
def reanalyze_event(event_id: int,
zmq_push_ep: str):
try:
zmq_context: zmq.Context = zmq.Context()
zmq_pub_socket: zmq.Socket = zmq_context.socket(zmq.PUB)
zmq_pub_socket.connect(zmq_push_ep)
makai_event = protobuf.pb_util.build_makai_event("makai_event_bridge", event_id)
mauka_message_bytes = protobuf.pb_util.serialize_message(makai_event)
print(makai_event, len(mauka_message_bytes))
zmq_pub_socket.send_multipart((Routes.makai_event.encode(), mauka_message_bytes))
print("event_id sent")
except Exception as e:
print("Encountered an error %s" % str(e))
:param box_ids: A list of box ids to modify.
:param measurement_window_cycles: Number of cycles in a measurement.
:param box_optimization_records: Thread safe req/resp of records.
:param logger: The logger.
"""
if measurement_window_cycles <= 0:
logger.error("measurement_window_cycles must be strictly larger than 0")
return
logger.debug("Modifying measurement_window_cycles=%d for %s" % (measurement_window_cycles, str(box_ids)))
box_commands = pb_util.build_makai_rate_change_commands(box_ids, measurement_window_cycles)
for (box_command, identity) in box_commands:
serialized_box_command = pb_util.serialize_message(box_command)
makai_send_socket.send(serialized_box_command)
box_optimization_records.add_record(identity)
logger.debug("Sent optimization request command with identity=%s" % identity)
def send_box_info_cmds(makai_send_socket: zmq.Socket,
box_ids: typing.List[str],
box_optimization_records: BoxOptimizationRecords,
logger: BoxOptimizationPluginLogger):
"""
Send info requests to Makai.
:param makai_send_socket: The makai ZMQ socket.
:param box_ids: List of box ids to request the info for.
:param box_optimization_records: Records.
:param logger: Logger.
"""
logger.debug("Sending info commands to Makai for the following boxes: %s" % str(box_ids))
for box_id in box_ids:
cmd, identity = pb_util.build_makai_get_info_command(box_id)
box_optimization_records.add_record(identity)
serialized_cmd = pb_util.serialize_message(cmd)
makai_send_socket.send(serialized_cmd)
logger.debug("Sent box info command %s" % str(cmd))
logger.debug("Trigger record inserted for event_token %s and event_id %d", event_token, event_id)
# Create a new event
mongo.store_event(event_id,
"Mauka %s" % event_token,
box_ids,
start_timestamp_ms,
end_timestamp_ms,
mongo_client)
logger.debug("MongoDB event created with event_id %d", event_id)
# Trigger the boxes
for trigger_command in trigger_commands:
try:
zmq_trigger_socket.send(pb_util.serialize_message(trigger_command))
except Exception as exception: # pylint: disable=W0703
logger.error(str(exception))
logger.debug("%d boxes triggered", len(box_ids))
return event_token
def produce(self, topic: str, mauka_message: protobuf.mauka_pb2.MaukaMessage):
"""Produces a message with a given topic to the system
:param topic: The topic to produce this message to
:param mauka_message: The message to produce
"""
serialized_mauka_message = protobuf.pb_util.serialize_message(mauka_message)
with self.producer_lock:
self.zmq_producer.send_multipart((topic.encode(), serialized_mauka_message))
self.update_published(len(serialized_mauka_message))
def request_next_available_incident_id(self) -> typing.Optional[int]:
"""
Requests the next available incident id from the incident id provider service.
:return: The next available incident id or None.
"""
req_id = int(time.time())
req = protobuf.pb_util.build_incident_id_req(self.name, req_id)
self.zmq_incident_id_req_socket.send(protobuf.pb_util.serialize_message(req))
resp: bytes = self.zmq_incident_id_req_socket.recv()
mauka_message: protobuf.mauka_pb2.MaukaMessage = protobuf.pb_util.deserialize_mauka_message(resp)
if protobuf.pb_util.is_incident_id_resp(mauka_message):
if mauka_message.incident_id_resp.resp_id == req_id:
return mauka_message.incident_id_resp.incident_id
else:
self.logger.error("Incident req id %d != incident resp id %d",
req_id,
mauka_message.incident_id_resp.resp_id)
return None
else:
self.logger.error("Received an incorrect message for an incident id response")
return None
conf.get("mongo.db"))
next_available_incident_id = mongo.next_available_incident_id(mongo_client)
incident_id_service = services.incident_id_provider.IncidentIdProvider(next_available_incident_id)
zmq_context: zmq.Context = zmq.Context()
zmq_req_socket: zmq.Socket = zmq_context.socket(zmq.REP)
zmq_req_socket.bind(conf.get("zmq.incident_id_provider.rep.interface"))
while True:
req: bytes = zmq_req_socket.recv()
mauka_message: protobuf.pb_util.mauka_pb2.MaukaMessage = protobuf.pb_util.deserialize_mauka_message(req)
if protobuf.pb_util.is_incident_id_req(mauka_message):
resp = protobuf.pb_util.build_incident_id_resp("incident_id_service",
mauka_message.incident_id_req.req_id,
incident_id_service.get_and_inc())
zmq_req_socket.send(protobuf.pb_util.serialize_message(resp))
else:
_logger.error("Did not receive valid IncidentIdReq")