Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def nlm_request(self, msg, msg_type,
msg_flags=NLM_F_REQUEST | NLM_F_DUMP,
terminate=None,
callback=None):
msg_seq = self.addr_pool.alloc()
with self.lock[msg_seq]:
retry_count = 0
while True:
try:
self.put(msg, msg_type, msg_flags, msg_seq=msg_seq)
for msg in self.get(msg_seq=msg_seq,
terminate=terminate,
callback=callback):
yield msg
break
except NetlinkError as e:
if e.code != 16:
def _register_mask(self, cmd, mask):
    '''
    Build a TASKSTATS_CMD_GET message carrying a single
    `[cmd, mask]` attribute and send it via `self.put()`.

    Nothing is read back and nothing is returned.
    '''
    request = tcmd()
    request['cmd'] = TASKSTATS_CMD_GET
    request['version'] = 1
    request['attrs'].append([cmd, mask])
    # fire and forget: there is no response to this request
    self.put(request, self.prid, msg_flags=NLM_F_REQUEST)
def discovery(self, proto):
    '''
    Resolve a generic netlink protocol by its name.

    - proto — protocol (family) name, a string

    Sends a CTRL_CMD_GETFAMILY request and returns the first
    message of the reply. If the reply header carries an
    error, that error is raised; an ENOENT error is logged
    before being raised.
    '''
    request = ctrlmsg()
    request['cmd'] = CTRL_CMD_GETFAMILY
    request['version'] = 1
    request['attrs'].append(['CTRL_ATTR_FAMILY_NAME', proto])
    # fill in the netlink header by hand, since the message
    # is encoded and sent directly with sendto()
    for field, value in (('type', GENL_ID_CTRL),
                         ('flags', NLM_F_REQUEST),
                         ('pid', self.pid)):
        request['header'][field] = value
    request.encode()
    self.sendto(request.data, (0, 0))

    reply = self.get()[0]
    err = reply['header'].get('error', None)
    if err is None:
        return reply

    # ENOENT gets an extra hint in the log; every error is raised
    if getattr(err, 'code', None) == errno.ENOENT:
        logging.error('Generic netlink protocol %s not found' % proto)
        logging.error('Please check if the protocol module is loaded')
    raise err
def del_interface(self, dev):
    '''
    Delete a virtual interface

    - dev — device index
    '''
    request = nl80211cmd()
    request['cmd'] = NL80211_NAMES['NL80211_CMD_DEL_INTERFACE']
    request['attrs'] = [['NL80211_ATTR_IFINDEX', dev]]
    # the request is sent with the ACK flag set
    ack_flags = NLM_F_REQUEST | NLM_F_ACK
    self.nlm_request(request, msg_type=self.prid, msg_flags=ack_flags)
#
# if the inode is registered, skip it
#
if item['inode'] in registry:
raise SkipInode()
registry.add(item['inode'])
#
# request NETNSA_NSID
#
# may not work on older kernels ( <4.20 ?)
#
msg['attrs'] = [('NETNSA_FD', nsfd)]
try:
for info in self.nlm_request(msg,
RTM_GETNSID,
NLM_F_REQUEST):
# response to nlm_request() is a list or a generator,
# that's why loop
item['netnsid'] = info.get_attr('NETNSA_NSID')
break
except Exception:
pass
item['attrs'] = [('NSINFO_PATH', path)]
except OSError:
raise SkipInode()
finally:
if nsfd > 0:
os.close(nsfd)
item['header']['type'] = RTM_NEWNETNS
item['event'] = 'RTM_NEWNETNS'
return item
def sendmsg(self, msg):
    '''
    Pass `msg` to `nlm_request()` using `self.prid` as the
    message type and the NLM_F_REQUEST | NLM_F_ACK flags.

    Returns whatever `nlm_request()` returns.
    '''
    ack_flags = NLM_F_REQUEST | NLM_F_ACK
    return self.nlm_request(msg, msg_type=self.prid, msg_flags=ack_flags)
if command[:4] == 'vlan':
log.warning('vlan filters are managed via `vlan_filter()`')
log.warning('this compatibility hack will be removed soon')
return self.vlan_filter(command[5:], **kwarg)
flags_dump = NLM_F_REQUEST | NLM_F_DUMP
flags_req = NLM_F_REQUEST | NLM_F_ACK
flags_create = flags_req | NLM_F_CREATE | NLM_F_EXCL
commands = {'set': (RTM_NEWLINK, flags_req),
'update': (RTM_SETLINK, flags_create),
'add': (RTM_NEWLINK, flags_create),
'del': (RTM_DELLINK, flags_create),
'remove': (RTM_DELLINK, flags_create),
'delete': (RTM_DELLINK, flags_create),
'dump': (RTM_GETLINK, flags_dump),
'get': (RTM_GETLINK, NLM_F_REQUEST)}
msg = ifinfmsg()
# ifinfmsg fields
#
# ifi_family
# ifi_type
# ifi_index
# ifi_flags
# ifi_change
#
msg['family'] = kwarg.pop('family', 0)
lrq = kwarg.pop('kwarg_filter', IPLinkRequest)
(command, msg_flags) = commands.get(command, command)
# index
msg['index'] = kwarg.pop('index', 0)
# flags
#
# ## genlmsg header
# \x01 -- command, NL80211_CMD_GET_WIPHY
# \x00 -- version, ignored
# \x00\x00 -- reserved, ignored
#
# ## NLA chain
# \x04\x00 -- length
# \xae\x00 -- NLA type, NL80211_ATTR_SPLIT_WIPHY_DUMP
#
#
# OBS: iw sends different flags with this request
#
pprint(nl.nlm_request(msg,
msg_type=nl.prid,
msg_flags=NLM_F_REQUEST | NLM_F_DUMP))
def info(self, interface):
    '''
    Issue a WG_CMD_GET_DEVICE dump request for an interface.

    - interface — interface name, sent as WGDEVICE_A_IFNAME

    Returns whatever `nlm_request()` returns.
    '''
    request = wgmsg()
    request['cmd'] = WG_CMD_GET_DEVICE
    request['attrs'].append(['WGDEVICE_A_IFNAME', interface])
    dump_flags = NLM_F_REQUEST | NLM_F_DUMP
    return self.nlm_request(request,
                            msg_type=self.prid,
                            msg_flags=dump_flags)
def delete_routes(ip, table, routes):
    """
    Delete routes from a routing table

    One RTM_DELROUTE request is issued per route, each with
    the NLM_F_REQUEST | NLM_F_ACK flags.

    :param pyroute2.iproute.IPRoute ip: IPRoute object
    :param int table: routing table
    :param Iterable[Route] routes: Routes to delete
    """
    ack_flags = netlink.NLM_F_REQUEST | netlink.NLM_F_ACK
    # lazy generator: each message is built right before it is sent
    requests = (rtmsg_from_route(entry, table) for entry in routes)
    for request in requests:
        ip.nlm_request(request,
                       msg_type=rtnl.RTM_DELROUTE,
                       msg_flags=ack_flags)