Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _is_in_view(self, record):
    """Tell whether ``self.remote_ip`` falls inside the record's view rule.

    The record's ``'rule'`` value is split on whitespace and the pieces are
    handed to ``netaddr.smallest_matching_cidr`` together with
    ``self.remote_ip``; the truthiness of that result is returned.

    An ``AttributeError``, ``netaddr.AddrFormatError`` or ``ValueError``
    raised while evaluating the rule is logged as an error (naming this
    class, the record id and the exception) and ``False`` is returned.
    """
    try:
        # Truthy only when at least one CIDR in the rule contains the IP.
        return bool(netaddr.smallest_matching_cidr(self.remote_ip, record.get('rule').split()))
    except (AttributeError, netaddr.AddrFormatError, ValueError) as e:
        logging.error('{}: record id {} view rule invalid: {}: {}'.format(
            type(self).__name__, record['id'], type(e).__name__, e))
    # Reached only after a rule failure has been logged.
    return False
def _set_value(self, val):
    """Validate ``val`` and store it as this address's ``_value``.

    When the address type is ``AT_UNSPEC``, the first entry of the class's
    ``STRATEGIES`` whose ``valid_str(val)`` is true is assigned to
    ``self.strategy`` (presumably a property backing ``_strategy`` --
    confirm against the class definition).

    String values are converted with the strategy's ``str_to_int``;
    integer values are range-checked with ``valid_int``. Values of any
    other type are stored unchanged.

    Raises ``AddrFormatError`` if no strategy is set once selection is
    done, and ``OverflowError`` if an integer value is rejected by the
    strategy.
    """
    # Pick the first strategy that recognises this value.
    if self._addr_type == AT_UNSPEC:
        for candidate in self.__class__.STRATEGIES:
            if not candidate.valid_str(val):
                continue
            self.strategy = candidate
            break

    # Without a strategy the value cannot be interpreted at all.
    if self._strategy is None:
        raise AddrFormatError(
            '%r is not a recognised address format!' % val)

    # Normalise strings to integers; range-check integers as given.
    if isinstance(val, (str, unicode)):
        val = self._strategy.str_to_int(val)
    elif isinstance(val, (int, long)) and not self._strategy.valid_int(val):
        raise OverflowError(
            'value %r cannot be represented in %d bit(s)!'
            % (val, self._strategy.width))

    self._value = val
def __init__(self, ipglob, fmt=IP):
"""
Constructor.
@param ipglob: a valid IPv4 ipglob address
@param fmt: (optional) callable used on return values.
Default: L{IP} objects. See the L{nrange()} documentation for
more details on the various options.
"""
if not IPGlob.is_valid(ipglob):
raise AddrFormatError('%r is not a recognised ipglob address!' \
% ipglob)
t1 = []
t2 = []
for octet in ipglob.split('.'):
if '-' in octet:
oct_tokens = octet.split('-')
t1 += [oct_tokens[0]]
t2 += [oct_tokens[1]]
elif octet == '*':
t1 += ['0']
t2 += ['255']
else:
t1 += [octet]
t2 += [octet]
if targets is None:
return None
if targets is not None:
try:
target_list = []
for target in targets.split(','):
if '/' in target:
target_list.extend(list(IPNetwork(target)))
elif '-' in target:
start_addr = IPAddress(target.split('-')[0])
try:
end_addr = IPAddress(target.split('-')[1])
ip_range = IPRange(start_addr, end_addr)
except AddrFormatError:
end_addr = list(start_addr.words)
end_addr[-1] = target.split('-')[1]
end_addr = IPAddress('.'.join(map(str, end_addr)))
ip_range = IPRange(start_addr, end_addr)
target_list.extend(list(ip_range))
else:
target_list.append(IPAddress(target))
return target_list
except Exception as e:
try:
target = socket.gethostbyname(self.targets)
target_list.append(target)
if not isinstance(exposes, list):
raise ValueError('exposes must be a list')
for expose in exposes:
if ':' not in expose:
raise ValueError('expose must be formatted as inport:hostport')
inport, hostport = expose.split(':', 1)
if not (inport.isdigit() and hostport.isdigit()):
raise ValueError('port must be integer')
route = content.get('network_route', '')
if route:
try:
IPAddress(route)
except AddrFormatError:
raise ValueError('network_route must be IPv4 address')
# check build
build = appconfig['build']
if not isinstance(build, (basestring, list)):
raise ValueError('build must be string or list')
volumes = appconfig.get('volumes', [])
if not isinstance(volumes, list):
raise ValueError('volumes must be list')
# 'binds' is optional; when present it must be a dictionary.
binds = appconfig.get('binds', {})
if not isinstance(binds, dict):
    # Name the key actually being validated so the error points at
    # 'binds' rather than at the separately-checked 'volumes' list.
    raise ValueError('binds must be dictionary')
if len(volumes) != len(binds):
def get_pool(self, ip):
    """Look up the pool for ``ip``, which may be an IP or a CIDR.

    A value containing ``'/'`` is parsed as an ``IPNetwork`` and its first
    address is used; anything else is parsed directly as an ``IPAddress``.
    Returns ``None`` when parsing raises ``AddrFormatError`` or
    ``ValueError``. Otherwise the pool returned by ``_ipam.get_pool`` is
    wrapped, together with the name stored under
    ``_POOL_NAME_KEY % pool.cidr``, via ``WrappedNetwork.from_calico``.
    """
    is_cidr = '/' in ip
    try:
        if is_cidr:
            # Represent the network by its first address for the lookup.
            address = IPAddress(IPNetwork(ip).first)
        else:
            address = IPAddress(ip)
    except (AddrFormatError, ValueError):
        return

    calico_pool = _ipam.get_pool(address)
    pool_name = rds.get(_POOL_NAME_KEY % calico_pool.cidr)
    return WrappedNetwork.from_calico(calico_pool, pool_name)
)
address = None
for line in stdout:
addressmatch = self._INET_ADDRESS_RE.match(line)
if addressmatch is not None:
address = addressmatch.group('address')
break
self.logger.debug('address: ' + str(address))
if address is None:
raise RuntimeError(
_('Cannot acquire nic/bridge address')
)
try:
ipna = netaddr.IPNetwork(address)
except netaddr.AddrFormatError:
raise RuntimeError(
_('Cannot acquire a valid nic/bridge address')
)
return ipna
return self.netaddr.get_network(range_str)
except netaddr.AddrFormatError:
return None
if range_str.find('*') > -1:
try:
return self.netaddr.get_glob(range_str)
except netaddr.AddrFormatError:
return None
if ip_regex.search(range_str) and self._is_ip(range_str):
# must be a single ip
self.start_ip = range_str
try:
return self.netaddr.get_address(self.start_ip)
except netaddr.AddrFormatError:
return None
# looks like a hostname, but starts with a digit, must be
# a broken ip address. hostnames can't start with a digit
if range_str[0] in string.digits:
return None
# doesn't look like anything else, try treating it as a hostname
try:
self.start_ip = range_str
ips = [self.start_ip]
return ips
except:
return None
return None
for endpoint in endpoints:
remove_veth(endpoint.name)
ips |= {net.ip for net in endpoint.ipv4_nets}
ips |= {net.ip for net in endpoint.ipv6_nets}
client.release_ips(ips)
# Remove the IPAM host data.
client.remove_ipam_host(host_to_remove)
# If the host had an IPIP tunnel address, release it back to the IPAM pool
# so that we don't leak it when we delete the config.
raw_addr = client.get_per_host_config(host_to_remove, "IpInIpTunnelAddr")
try:
ip_addr = IPAddress(raw_addr)
client.release_ips({ip_addr})
except (AddrFormatError, ValueError, TypeError):
pass
client.remove_per_host_config(host_to_remove, "IpInIpTunnelAddr")
client.remove_host(host_to_remove)
print "Node configuration removed"
self.hash_to_fetch[info_hash] = time.time()
try:
del self.share.bad_info_hash[info_hash]
except KeyError:
pass
if info_hash in self.hash_to_fetch:
for ipport in response.get("values", []):
(ip, port) = struct.unpack("!4sH", ipport)
ip = socket.inet_ntoa(ip)
try:
if (ip, port) not in self.hash_to_fetch_tried[info_hash]:
if not netaddr.IPAddress(ip).is_private():
self.hash_to_fetch_totry[info_hash].add((ip, port))
else:
self.hash_to_fetch_tried[info_hash].add((ip, port))
except netaddr.AddrFormatError:
self.hash_to_fetch_tried[info_hash].add((ip, port))