Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def post_init(self):
# recreate the underlying socket
with self.sys_lock:
if self._sock is not None:
self._sock.close()
self._sock = config.SocketBase(AF_NETLINK,
SOCK_DGRAM,
self.family,
self._fileno)
self.sendto_gate = self._gate
# monkey patch recv_into on Python 2.6
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
# --> monkey patch the socket
log.warning('patching socket.recv_into()')
def patch(data, bsize):
data[0:] = self._sock.recv(bsize)
self._sock.recv_into = patch
self.setsockopt(SOL_SOCKET, SO_SNDBUF, self._sndbuf)
self.setsockopt(SOL_SOCKET, SO_RCVBUF, self._rcvbuf)
if self.all_ns:
def load_netlink(self, table, target, event, ctable=None):
#
if self.rtnl_log:
self.log_netlink(table, target, event, ctable)
#
# Update metrics
#
if 'stats' in event['header']:
self.stats[target] = event['header']['stats']
#
# Periodic jobs
#
if time.time() - self.gctime > config.gc_timeout:
self.gctime = time.time()
# clean dead snapshots after GC timeout
for name, wref in self.snapshots.items():
if wref() is None:
del self.snapshots[name]
if name.startswith('ifinfo_'):
self.execute('DROP VIEW %s' % name[7:])
self.execute('DROP TABLE %s' % name)
# clean marked routes
self.execute('DELETE FROM routes WHERE '
'(f_gc_mark + 5) < %s' % self.plch,
(int(time.time()), ))
#
# The event type
rt_msg,
if_announcemsg,
ifma_msg,
ifa_msg)
from pyroute2.netlink.rtnl.ifaddrmsg import ifaddrmsg
from pyroute2.netlink.rtnl.ifinfmsg import ifinfmsg
from pyroute2.netlink.rtnl.rtmsg import rtmsg
from pyroute2.netlink.rtnl import (RTM_NEWLINK as RTNL_NEWLINK,
RTM_DELLINK as RTNL_DELLINK,
RTM_NEWADDR as RTNL_NEWADDR,
RTM_DELADDR as RTNL_DELADDR,
RTM_NEWROUTE as RTNL_NEWROUTE,
RTM_DELROUTE as RTNL_DELROUTE)
if config.uname[0] == 'OpenBSD':
from pyroute2.bsd.rtmsocket.openbsd import (RTMSocketBase,
RTM_ADD,
RTM_NEWADDR)
else:
from pyroute2.bsd.rtmsocket.freebsd import (RTMSocketBase,
RTM_ADD,
RTM_NEWADDR)
def convert_rt_msg(msg):
ret = rtmsg()
ret['header']['type'] = RTNL_NEWROUTE if \
msg['header']['type'] == RTM_ADD else \
RTNL_DELROUTE
ret['family'] = msg['DST']['header']['family']
ret['attrs'] = []
# sysfs locations consulted for bonding / bridge membership; the '%s'
# placeholder is filled in by the caller (presumably with an interface
# name -- confirm against the call sites).
_BONDING_MASTERS = '/sys/class/net/bonding_masters'
_BONDING_SLAVES = '/sys/class/net/%s/bonding/slaves'
_BRIDGE_MASTER = '/sys/class/net/%s/brport/bridge/ifindex'
_BONDING_MASTER = '/sys/class/net/%s/master/ifindex'

IFNAMSIZ = 16
TUNDEV = '/dev/net/tun'

# Machines that share the first (0x4004....) set of TUNSET* codes.
PLATFORMS = ('i386', 'i686', 'x86_64', 'armv6l', 'armv7l', 's390x', 'aarch64')

# The TUNSET* codes depend on config.machine: the two known families
# differ only in the leading byte (0x40 vs. 0x80).
# NOTE(review): presumably ioctl request numbers for TUNDEV -- confirm.
if config.machine in PLATFORMS:
    (TUNSETIFF,
     TUNSETPERSIST,
     TUNSETOWNER,
     TUNSETGROUP) = (0x400454ca, 0x400454cb, 0x400454cc, 0x400454ce)
elif config.machine in ('ppc64', 'mips'):
    (TUNSETIFF,
     TUNSETPERSIST,
     TUNSETOWNER,
     TUNSETGROUP) = (0x800454ca, 0x800454cb, 0x800454cc, 0x800454ce)
else:
    # Unknown machine: only TUNSETIFF is defined (as None); the other
    # three names are deliberately left unbound, as before.
    TUNSETIFF = None
def sync(f):
'''
A decorator to wrap up external utility calls.
A decorated function receives a netlink message
as a parameter, and then:
1. Starts a monitoring thread
def __init__(self, *argv, **kwarg):
# Initialise the socket mixin: force the NETLINK_ROUTE family on the
# parent constructor, install the RTNL marshal and set up the
# send/receive proxy hooks.
#
# NOTE(review): leading indentation is missing from this copy of the
# code, so the nesting of the statements below cannot be read off the
# text -- confirm the structure before relying on these notes.
#
# A caller-supplied `family` keyword is discarded ...
if 'family' in kwarg:
kwarg.pop('family')
# ... because NETLINK_ROUTE is always passed as the first argument.
# Only argv[1:] is forwarded, i.e. the first positional argument is
# dropped as well.
super(IPRSocketMixin, self).__init__(NETLINK_ROUTE, *argv[1:], **kwarg)
self.marshal = MarshalRtnl()
self._s_channel = None
# Linux: route both gates through the Linux-specific implementation.
if sys.platform.startswith('linux'):
self._gate = self._gate_linux
self.sendto_gate = self._gate_linux
# Namespace handed to the send proxy: its own address pool
# (0x10000..0x1ffff) and monitoring switched off.
send_ns = Namespace(self, {'addr_pool': AddrPool(0x10000,
0x1ffff),
'monitor': False})
# The send proxy maps RTM_NEWLINK / RTM_SETLINK to their handlers.
self._sproxy = NetlinkProxy(policy='return', nl=send_ns)
self._sproxy.pmap = {rtnl.RTM_NEWLINK: proxy_newlink,
rtnl.RTM_SETLINK: proxy_setlink}
# Kernels older than 3.3.0 get a separate receive namespace (pool
# 0x20000..0x2ffff) and an additional RTM_DELLINK proxy handler.
if config.kernel < [3, 3, 0]:
self._recv_ns = Namespace(self,
{'addr_pool': AddrPool(0x20000,
0x2ffff),
'monitor': False})
self._sproxy.pmap[rtnl.RTM_DELLINK] = proxy_dellink
# inject proxy hooks into recv() and...
# (the original _recv is kept in __recv before being replaced)
self.__recv = self._recv
self._recv = self._p_recv
# ... recv_into()
# (likewise, the original recv_ft is kept in _recv_ft)
self._recv_ft = self.recv_ft
self.recv_ft = self._p_recv_ft
except InvalidateHandlerException:
try:
handlers.remove(handler)
except:
log.error('could not invalidate '
'event handler:\n%s'
% traceback.format_exc())
except ShutdownException:
stop = True
break
except DBMExitException:
return
except:
log.error('could not load event:\n%s\n%s'
% (event, traceback.format_exc()))
if time.time() - self.gctime > config.gc_timeout:
self.gctime = time.time()
except Exception as e:
self.log.error('exception <%s> in source %s' % (e, target))
# restart the target
try:
self.sources[target].restart(reason=e)
except KeyError:
pass
# release all the sources
for target in tuple(self.sources.cache):
source = self.sources.remove(target, sync=False)
if source is not None and source.th is not None:
source.shutdown.set()
source.th.join()
self.log.debug('flush DB for the target %s' % target)
# raise partial commit exceptions
if transaction.partial and transaction.errors:
error = PartialCommitException('partial commit error')
# if it is not a rollback turn
if drop and commit_phase == 1:
# drop last transaction in any case
self.drop(transaction.uid)
# raise exception for failed transaction
if error is not None:
error.debug = debug
raise error
time.sleep(config.commit_barrier)
# drop all collected errors, if any
self.errors = []
return self
fail and raise an exception::
import os
nsp = NSPopen('nsname', ['command'], flags=os.O_CREAT | os.O_EXCL)
'''
# create a child
self.nsname = nsname
if 'flags' in kwarg:
self.flags = kwarg.pop('flags')
else:
self.flags = 0
self.channel_out = config.MpQueue()
self.channel_in = config.MpQueue()
self.lock = threading.Lock()
self.released = False
self.server = config.MpProcess(target=NSPopenServer,
args=(self.nsname,
self.flags,
self.channel_out,
self.channel_in,
argv, kwarg))
# start the child and check the status
self.server.start()
response = self.channel_in.get()
if isinstance(response, Exception):
self.server.join()
raise response
else:
atexit.register(self.release)
file = file
except NameError:
file = io.IOBase
# FIXME: arch reference
#
# Two-level table: first four characters of the machine name, then the
# word-size label, give a number.
# NOTE(review): judging by the name `__NR_setns` below, these are
# presumably per-architecture setns syscall numbers -- confirm.
__NR = {
    'x86_': {'64bit': 308},
    'i386': {'32bit': 346},
    'i686': {'32bit': 346},
    'mips': {'32bit': 4344, '64bit': 5303},  # FIXME: NABI32?
    'armv': {'32bit': 375},
    'aarc': {'32bit': 375, '64bit': 268},  # FIXME: EABI vs. OABI?
    'ppc6': {'64bit': 350},
    's390': {'64bit': 339},
}

# Pick the entry for the running machine; any miss (unknown machine
# prefix or unknown word size) falls back to 308.
try:
    __NR_setns = __NR[config.machine[:4]][config.arch]
except KeyError:
    __NR_setns = 308

# Flag values, written as bit shifts (same integers as before).
CLONE_NEWNET = 1 << 30
MNT_DETACH = 1 << 1
MS_BIND = 1 << 12
MS_REC = 1 << 14
MS_SHARED = 1 << 20

NETNS_RUN_DIR = '/var/run/netns'
__saved_ns = []
def _get_netnspath(name):
netnspath = name
dirname = os.path.dirname(name)
if not dirname:
netnspath = '%s/%s' % (NETNS_RUN_DIR, name)
This call always bypasses open transactions, loading
changes directly into the interface data.
'''
global supported_kinds
with self._direct_state:
if self['ipdb_scope'] == 'locked':
# do not touch locked interfaces
return
if self['ipdb_scope'] in ('shadow', 'create'):
# ignore non-broadcast messages
if dev['header']['sequence_number'] != 0:
return
# ignore ghost RTM_NEWLINK messages
if (config.kernel[0] < 3) and \
(not dev.get_attr('IFLA_AF_SPEC')):
return
for (name, value) in dev.items():
self[name] = value
for cell in dev['attrs']:
#
# Parse on demand
#
# At that moment, being not referenced, the
# NLA is not decoded (yet). Calling
# `__getitem__()` on nla_slot triggers the
# NLA decoding, if the nla is referenced:
#
norm = ifinfmsg.nla2name(cell[0])
if norm not in self.cleanup: