Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def associate_floating_ip(self, loadbalancer):
"""Associates a Floating IP with the LoadBalancer"""
valid_ip = valid_ipv4(self.floatingip) or valid_ipv6(self.floatingip)
if not valid_ip:
LOGGER.error("'%s' is not a valid IP address", self.floatingip)
sys.exit(1)
if self._existing_floating_ip == self.floatingip:
return self._existing_floating_ip
fip = self.conn.network.find_ip(self.floatingip)
# Assign IP to LB
try:
fip = self.conn.network.update_ip(fip, port_id=loadbalancer.vip_port_id)
except OSNotFound:
LOGGER.error("The resource {} cannot be associated".format(self.floatingip))
self.delete()
sys.exit(1)
LOGGER.success("Loadbalancer external IP: %s",
def netlist(nets):
v4nets = []
v6nets = []
for net in nets:
ipNetwork = netaddr.IPNetwork(net)
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.append(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.append(ipNetwork)
return v4nets, v6nets
pull_object = select.poll()
pull_object.register(process.stdout, select.POLLIN)
# while True, read tshark output
try:
while True:
if pull_object.poll(0):
line = process.stdout.readline()
# check if new IP and Port printed
if len(line) > 0:
# split the IP and Port
try:
ip, port = str(line.rsplit()[0].decode()), int(line.rsplit()[1])
except Exception as _:
ip, port = None, None
# check if event shows an IP
if (netaddr.valid_ipv4(ip) or netaddr.valid_ipv6(ip)) \
and ip not in ignore_ip_addresses \
and port not in ignore_ports: # ignored ip addresses and ports in python - fix later
# check if the port is in selected module
if port in honeypot_ports:
insert_selected_modules_network_event(
ip,
port,
selected_module,
machine_name
)
else:
insert_other_network_event(
ip,
port,
machine_name
)
def _format_address_for_dnsmasq(address):
# (dzyu) Check if it is legal ipv6 address, if so, need wrap
# it with '[]' to let dnsmasq to distinguish MAC address from
# IPv6 address.
if netaddr.valid_ipv6(address):
return '[%s]' % address
return address
def isIPv6(ip):
try:
return netaddr.valid_ipv6(ip)
except:
return False
def is_valid_ip(ip):
"""Return True if the IP is either v4 or v6
Return False if invalid.
"""
return netaddr.valid_ipv4(ip) or netaddr.valid_ipv6(ip)
def validIP6(self, address):
return netaddr.valid_ipv6(address)
def _is_pingable(self, mgmt_ip='', retry=5, timeout=5, port=80, **kwargs):
"""Checks whether the server is reachable by using urllib.
Waits for connectivity for `timeout` seconds,
and if connection refused, it will retry `retry`
times.
:param mgmt_ip: IP to check
:param retry: times to reconnect if connection refused
:param timeout: seconds to wait for connection
:param port: port number to check connectivity
:return: bool - True or False depending on pingability.
"""
url = 'http://' + mgmt_ip + ':' + str(port)
if netaddr.valid_ipv6(mgmt_ip):
url = 'http://[' + mgmt_ip + ']:' + str(port)
for retry_index in range(int(retry)):
try:
urlreq.urlopen(url, timeout=timeout)
return True
except urlerr.URLError:
LOG.warning('Unable to reach to the url %s', url)
return 'failure'
def format_addr(addr):
"""
Format IP address when used for URLs
"""
if url:
if netaddr.valid_ipv6(addr) is True:
return "[{}]".format(addr)
return addr