Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return '\n'.join(ret_str)
# protocol
if not self.term.protocol:
protocol = ['ip']
else:
# fix the protocol
protocol = self.term.protocol
# source address
if self.term.source_address:
source_address = self.term.GetAddressOfVersion('source_address', self.af)
source_address_exclude = self.term.GetAddressOfVersion(
'source_address_exclude', self.af)
if source_address_exclude:
source_address = nacaddr.ExcludeAddrs(
source_address,
source_address_exclude)
else:
# source address not set
source_address = ['any']
# destination address
if self.term.destination_address:
destination_address = self.term.GetAddressOfVersion(
'destination_address', self.af)
destination_address_exclude = self.term.GetAddressOfVersion(
'destination_address_exclude', self.af)
if destination_address_exclude:
destination_address = nacaddr.ExcludeAddrs(
destination_address,
destination_address_exclude)
def _GetIpString(self, addr):
    """Render an address as the text used for it inside an ACL line.

    IPv4 networks holding more than one address are written as
    "<network> <hostmask>", except when self.platform is 'arista', where
    the prefix-length form is used instead. IPv6 networks holding more
    than one address always use the prefix-length form. A network of
    either family that holds a single address is written as
    "host <address>". DSMNet objects are rendered through
    summarizer.ToDottedQuad with negate=True. Any other value is handed
    back unchanged.

    Args:
      addr: str or ipaddr, address

    Returns:
      An address string suitable for the ACL.
    """
    if isinstance(addr, (nacaddr.IPv4, ipaddress.IPv4Network)):
        # Single-address networks collapse to the "host" keyword form.
        if addr.num_addresses <= 1:
            return 'host %s' % (addr.network_address)
        if self.platform == 'arista':
            return addr.with_prefixlen
        return '%s %s' % (addr.network_address, addr.hostmask)

    if isinstance(addr, (nacaddr.IPv6, ipaddress.IPv6Network)):
        if addr.num_addresses <= 1:
            return 'host %s' % (addr.network_address)
        return addr.with_prefixlen

    # DSMO enabled: the summarizer supplies the pair that fills the
    # two-field format below.
    if isinstance(addr, summarizer.DSMNet):
        return '%s %s' % summarizer.ToDottedQuad(addr, negate=True)

    # Anything else (e.g. an already formatted string) passes through.
    return addr
"""Reduce source, dest, and address fields to their post-exclude state.
Populates the self.flattened_addr, self.flattened_saddr,
self.flattened_daddr by removing excludes from includes.
"""
# No excludes, set flattened attributes and move along.
self.flattened = True
if not (self.source_address_exclude or self.destination_address_exclude or
self.address_exclude):
self.flattened_saddr = self.source_address
self.flattened_daddr = self.destination_address
self.flattened_addr = self.address
return
if self.source_address_exclude:
self.flattened_saddr = nacaddr.AddressListExclude(
self.source_address,
self.source_address_exclude,
collapse_addrs=False)
self.source_address = self.flattened_saddr
if self.destination_address_exclude:
self.flattened_daddr = nacaddr.AddressListExclude(
self.destination_address,
self.destination_address_exclude,
collapse_addrs=False)
self.destination_address = self.flattened_daddr
if self.address_exclude:
self.flattened_addr = nacaddr.AddressListExclude(
self.address, self.address_exclude, collapse_addrs=False)
self.address = self.flattened_addr
if filter_options:
network = filter_options[0]
else:
logging.warn('GCE filter does not specify a network.')
term_names = set()
if IsDefaultDeny(terms[-1]):
terms[-1].protocol = ['all']
terms[-1].priority = 65534
if direction == 'EGRESS':
terms[-1].destination_address = [nacaddr.IP('0.0.0.0/0'),
nacaddr.IP('::/0')]
else:
terms[-1].source_address = [
nacaddr.IP('0.0.0.0/0'),
nacaddr.IP('::/0')
]
for term in terms:
if term.stateless_reply:
logging.warn('WARNING: Term %s in policy %s is a stateless reply '
'term and will not be rendered.',
term.name, filter_name)
continue
term.network = network
if not term.comment:
term.comment = header.comment
term.name = self.FixTermLength(term.name)
if term.name in term_names:
raise GceFirewallError('Duplicate term name')
term_names.add(term.name)