Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wait(self, poller):
if self.rpc is not None:
self.rpc.wait(poller)
elif self.stream is not None:
self.stream.run_wait(poller)
self.stream.connect_wait(poller)
if self.pstream is not None:
self.pstream.wait(poller)
self.reconnect.wait(poller, ovs.timeval.msec())
vlog.warn("failed to disable core dumps")
# Throttle restarts to no more than once every 10 seconds.
if (last_restart is not None and
ovs.timeval.msec() < last_restart + 10000):
vlog.warn("%s, waiting until 10 seconds since last "
"restart" % status_msg)
while True:
now = ovs.timeval.msec()
wakeup = last_restart + 10000
if now > wakeup:
break
sys.stdout.write("sleep %f\n" % (
(wakeup - now) / 1000.0))
time.sleep((wakeup - now) / 1000.0)
last_restart = ovs.timeval.msec()
vlog.err("%s, restarting" % status_msg)
daemon_pid = _fork_and_wait_for_startup()
if not daemon_pid:
break
else:
vlog.info("%s, exiting" % status_msg)
sys.exit(0)
self.reconnect.listen_error(ovs.timeval.msec(), error)
self.pstream.close()
self.pstream = None
if self.rpc:
backlog = self.rpc.get_backlog()
self.rpc.run()
if self.rpc.get_backlog() < backlog:
# Data previously caught in a queue was successfully sent (or
# there's an error, which we'll catch below).
#
# We don't count data that is successfully sent immediately as
# activity, because there's a lot of queuing downstream from
# us, which means that we can push a lot of data into a
# connection that has stalled and won't ever recover.
self.reconnect.activity(ovs.timeval.msec())
error = self.rpc.get_status()
if error != 0:
self.reconnect.disconnected(ovs.timeval.msec(), error)
self.__disconnect()
elif self.stream is not None:
self.stream.run()
error = self.stream.connect()
if error == 0:
self.reconnect.connected(ovs.timeval.msec())
self.rpc = Connection(self.stream)
self.stream = None
elif error != errno.EAGAIN:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
self.pick_remote()
self.stream.close()
self.reconnect.disconnected(ovs.timeval.msec(), error)
self.__disconnect()
elif self.stream is not None:
self.stream.run()
error = self.stream.connect()
if error == 0:
self.reconnect.connected(ovs.timeval.msec())
self.rpc = Connection(self.stream)
self.stream = None
elif error != errno.EAGAIN:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
self.pick_remote()
self.stream.close()
self.stream = None
action = self.reconnect.run(ovs.timeval.msec())
if action == ovs.reconnect.CONNECT:
self.__connect()
elif action == ovs.reconnect.DISCONNECT:
self.reconnect.disconnected(ovs.timeval.msec(), 0)
self.__disconnect()
elif action == ovs.reconnect.PROBE:
if self.rpc:
request = Message.create_request("echo", [])
request.id = "echo"
self.rpc.send(request)
else:
assert action is None
'name', which should be a string acceptable to ovs.stream.Stream or
ovs.stream.PassiveStream's initializer.
If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
session connects and reconnects, with back-off, to 'name'.
If 'name' is a passive connection method, e.g. "ptcp:", the new session
listens for connections to 'name'. It maintains at most one connection
at any given time. Any new connection causes the previous one (if any)
to be dropped."""
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
reconnect.set_name(name)
reconnect.enable(ovs.timeval.msec())
if ovs.stream.PassiveStream.is_valid_name(name):
reconnect.set_passive(True, ovs.timeval.msec())
if ovs.stream.stream_or_pstream_needs_probes(name):
reconnect.set_probe_interval(0)
return Session(reconnect, None)
import uuid
from ovs import jsonrpc
from ovs import poller
from ovs import reconnect
from ovs import stream
from ovs import timeval
from ovs.db import idl
from ryu.base import app_manager
from ryu.lib import hub
from ryu.services.protocols.ovsdb import event
from ryu.services.protocols.ovsdb import model
now = timeval.msec
def _uuid_to_row(atom, base):
if base.ref_table:
value = base.ref_table.rows.get(atom)
else:
value = atom
if isinstance(value, idl.Row):
value = str(value.uuid)
return value
def dictify(row):
if row is None:
def recv(self):
if self.rpc is not None:
received_bytes = self.rpc.get_received_bytes()
error, msg = self.rpc.recv()
if received_bytes != self.rpc.get_received_bytes():
# Data was successfully received.
#
# Previously we only counted receiving a full message as
# activity, but with large messages or a slow connection that
# policy could time out the session mid-message.
self.reconnect.activity(ovs.timeval.msec())
if not error:
if msg.type == Message.T_REQUEST and msg.method == "echo":
# Echo request. Send reply.
self.send(Message.create_reply(msg.params, msg.id))
elif msg.type == Message.T_REPLY and msg.id == "echo":
# It's a reply to our echo request. Suppress it.
pass
else:
return msg
return None
def open_multiple(remotes, probe_interval=None):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
session = Session(reconnect, None, remotes)
session.pick_remote()
reconnect.enable(ovs.timeval.msec())
reconnect.set_backoff_free_tries(len(remotes))
if ovs.stream.PassiveStream.is_valid_name(reconnect.get_name()):
reconnect.set_passive(True, ovs.timeval.msec())
if not ovs.stream.stream_or_pstream_needs_probes(reconnect.get_name()):
reconnect.set_probe_interval(0)
elif probe_interval is not None:
reconnect.set_probe_interval(probe_interval)
return session
def run(self):
if self.pstream is not None:
error, stream = self.pstream.accept()
if error == 0:
if self.rpc or self.stream:
# XXX rate-limit
vlog.info("%s: new connection replacing active "
"connection" % self.reconnect.get_name())
self.__disconnect()
self.reconnect.connected(ovs.timeval.msec())
self.rpc = Connection(stream)
elif error != errno.EAGAIN:
self.reconnect.listen_error(ovs.timeval.msec(), error)
self.pstream.close()
self.pstream = None
if self.rpc:
backlog = self.rpc.get_backlog()
self.rpc.run()
if self.rpc.get_backlog() < backlog:
# Data previously caught in a queue was successfully sent (or
# there's an error, which we'll catch below).
#
# We don't count data that is successfully sent immediately as
# activity, because there's a lot of queuing downstream from
# us, which means that we can push a lot of data into a
# connection that has stalled and won't ever recover.
self.reconnect.activity(ovs.timeval.msec())