Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# indentation, for multi-line comments, ensures that subsequent lines
# are correctly aligned with the first line of the comment.
indentation = 0
if exclude:
# len('1.1.1.1/32 except;') == 21
indentation = 21 + self._DEFAULT_INDENT
else:
# len('1.1.1.1/32;') == 14
indentation = 14 + self._DEFAULT_INDENT
# length_eol is the width of the line; b/c of the addition of the space
# and the /* characters, it needs to be a little less than the actual width
# to keep from wrapping
length_eol = 77 - indentation
if isinstance(addr, (nacaddr.IPv4, nacaddr.IPv6, summarizer.DSMNet)):
if addr.text:
if line_length == 0:
# line_length of 0 means that we don't want to truncate the comment.
line_length = len(addr.text)
# There should never be a /* or */, but be safe and ignore those
# comments
if addr.text.find('/*') >= 0 or addr.text.find('*/') >= 0:
logging.debug('Malformed comment [%s] ignoring', addr.text)
else:
text = addr.text[:line_length]
comment = ' /*'
while text:
def ValueStr(self):
    """Return this field's stored value exactly as held in self.value."""
    return self.value
class IntegerField(Field):
    """A field whose value must be accepted by int()."""

    def __init__(self, value):
        """Initialise the base field, then check that value parses as an int.

        Args:
          value: the raw field value; handed to Field.__init__ unchanged and
            then test-converted with int().

        Raises:
          ValueError: if int(value) raises ValueError. The message embeds
            str(self).
        """
        super(IntegerField, self).__init__(value)
        try:
            # Only the success of the conversion matters; the result is
            # deliberately discarded.
            int(value)
        except ValueError:
            raise ValueError('Invalid integer field: "%s"' % str(self))
class NamingField(Field):
"""A naming field is one that refers to names in used in naming.py."""
def __init__(self, value):
    """Initialise the base field, then store the parsed names as the value.

    Args:
      value: string of names; it is passed to the base initialiser and then
        to ParseString, whose result replaces self.value.
    """
    super(NamingField, self).__init__(value)
    # Overwrite the raw value with whatever ParseString returns for it.
    self.value = self.ParseString(value)
def ParseString(self, value):
    """Split and validate a string value into individual names.

    Args:
      value: string containing names separated by whitespace.

    Returns:
      A set of the distinct whitespace-separated names found in value.
      Each name is passed to self.ValidatePart before the set is returned.
    """
    parts = set(value.split())
    for p in parts:
        self.ValidatePart(p)
    return parts
def ValidatePart(self, part):
"""Validate that a string smells like a naming.py name."""
for c in part:
def __str__(self):
    """Render every policy in self.ciscoasa_policies as access-list text.

    For each (header, filter_name, terms) entry the output contains, in
    order: a 'clear configure access-list' line, the lines returned by
    aclgenerator.AddRepositoryTags, one remark line per line of each header
    comment, and str() of each term.

    Returns:
      All generated lines joined with newlines; an empty string when there
      are no policies.
    """
    target = []

    for (header, filter_name, terms) in self.ciscoasa_policies:
        target.append('clear configure access-list %s' % filter_name)

        # add the p4 tags
        target.extend(aclgenerator.AddRepositoryTags(
            'access-list %s remark ' % filter_name))

        # add a header comment if one exists
        for comment in header.comment:
            for line in comment.split('\n'):
                target.append('access-list %s remark %s' % (filter_name, line))

        # now add the terms
        for term in terms:
            target.append(str(term))
    # end for header, filter_name, terms

    return '\n'.join(target)
for filter_type in filter_list:
target.extend(self._AppendTargetByFilterType(filter_name, filter_type))
if filter_type == 'object-group':
obj_target.AddName(filter_name)
# Add the Perforce Id/Date tags, these must come after
# remove/re-create of the filter, otherwise config mode doesn't
# know where to place these remarks in the configuration.
if self.verbose:
if filter_type == 'standard' and filter_name.isdigit():
target.extend(
aclgenerator.AddRepositoryTags(
'access-list %s remark ' % filter_name,
date=False, revision=False))
else:
target.extend(aclgenerator.AddRepositoryTags(
' remark ', date=False, revision=False))
# add a header comment if one exists
for comment in header.comment:
for line in comment.split('\n'):
if (self._PLATFORM == 'cisco' and filter_type == 'standard' and
filter_name.isdigit()):
target.append('access-list %s remark %s' % (filter_name, line))
else:
target.append(' remark %s' % line)
# now add the terms
for term in terms:
term_str = str(term)
if term_str:
target.append(term_str)
def __str__(self):
config = Config()
for (header, filter_name, filter_type, interface_specific, terms
) in self.juniper_policies:
# add the header information
config.Append('firewall {')
config.Append('family %s {' % filter_type)
config.Append('replace:')
config.Append('/*')
# we want the acl to contain id and date tags, but p4 will expand
# the tags here when we submit the generator, so we have to trick
# p4 into not knowing these words. like taking c-a-n-d-y from a
# baby.
for line in aclgenerator.AddRepositoryTags('** '):
config.Append(line)
config.Append('**')
for comment in header.comment:
for line in comment.split('\n'):
config.Append('** ' + line)
config.Append('*/')
config.Append('filter %s {' % filter_name)
if interface_specific:
config.Append('interface-specific;')
for term in terms:
term_str = str(term)
if term_str:
config.Append(term_str, verbatim=True)
def GetIpParents(self, query):
"""Return network tokens that contain IP in query.
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
for token in self.networks:
for item in self.networks[token].items:
def GetIpParents(self, query):
"""Return network tokens that contain IP in query.
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if item[:1].isalpha() and item == query:
base_parents.append(token)
# look for nested tokens
for bp in base_parents:
done = False
for token in self.networks:
self.destination_tag.append(x.value)
elif x.var_type is VarType.FLEXIBLE_MATCH_RANGE:
self.flexible_match_range.append(x.value)
else:
raise TermObjectTypeError(
'%s isn\'t a type I know how to deal with (contains \'%s\')' % (
type(x), x.value))
else:
# Python has no switch statement, so dispatch on var_type with if/elif
if obj.var_type is VarType.COMMENT:
self.comment.append(str(obj))
elif obj.var_type is VarType.OWNER:
self.owner = obj.value
elif obj.var_type is VarType.EXPIRATION:
self.expiration = obj.value
elif obj.var_type is VarType.LOSS_PRIORITY:
self.loss_priority = obj.value
elif obj.var_type is VarType.ROUTING_INSTANCE:
self.routing_instance = obj.value
elif obj.var_type is VarType.PRECEDENCE:
self.precedence = obj.value
elif obj.var_type is VarType.FORWARDING_CLASS:
self.forwarding_class.append(obj.value)
elif obj.var_type is VarType.FORWARDING_CLASS_EXCEPT:
self.forwarding_class_except.append(obj.value)
elif obj.var_type is VarType.PAN_APPLICATION:
self.pan_application.append(obj.value)
elif obj.var_type is VarType.NEXT_IP:
self.next_ip = DEFINITIONS.GetNetAddr(obj.value)
elif obj.var_type is VarType.VERBATIM:
self.verbatim.append(obj.value)
elif obj.var_type is VarType.ACTION: