Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): excerpt from the body of a constructor; the enclosing
# definition and the original indentation are not visible here.
# an application service access point, bound next to this object below
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers in order: this object, ASAP, SMAP, NSAP
bind(self, self.asap, self.smap, self.nsap)
# create a node for this object's address on the given vlan
self.node = Node(self.address, vlan)
# bind the network service to the node, no network number
self.nsap.bind(self.node)
# placeholder for the result block; starts out empty
self.confirmed_private_result = None
# NOTE(review): excerpt from the body of a constructor; the enclosing
# definition and the original indentation are not visible here, and
# self.asap is presumably created before this point -- confirm.
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(local_device)
# the segmentation state machines need access to some device
# information cache, usually shared with the application; here a
# new DeviceInfoCache is created for the state machine access point
self.smap.deviceInfoCache = DeviceInfoCache()
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = _NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers in order: this object, ASAP, SMAP, NSAP
bind(self, self.asap, self.smap, self.nsap)
# create a node for this object's address on the given vlan
self.node = Node(self.address, vlan)
if _debug: ApplicationLayerStateMachine._debug(" - node: %r", self.node)
# bind the network service access point to the node
self.nsap.bind(self.node)
# NOTE(review): excerpt from the body of a constructor; the enclosing
# definition and the original indentation are not visible here, and
# self.asap is presumably created before this point -- confirm.
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = _NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers in order: this object, ASAP, SMAP, NSAP
bind(self, self.asap, self.smap, self.nsap)
# create a node for this object's address on the given vlan
self.node = Node(self.address, vlan)
# bind the network service to the node, no network number
self.nsap.bind(self.node)
# NOTE(review): excerpt from a script's main section; `args`, `_debug`,
# `_log`, `bind` and `run` are defined outside this excerpt.

# the address shared by the BBMD and its UDP multiplexer
local_address = Address(args.localaddr)
if _debug: _log.debug(" - local_address: %r", local_address)

# create a null client that will accept, but do nothing with upstream
# packets from the BBMD
null_client = NullClient()
if _debug: _log.debug(" - null_client: %r", null_client)

# create a BBMD, bound to the Annex J server on a UDP multiplexer
bbmd = BIPBBMD(local_address)
annexj = AnnexJCodec()
multiplexer = UDPMultiplexer(local_address)

# bind the layers together
bind(null_client, bbmd, annexj, multiplexer.annexJ)

# loop through the rest of the addresses, adding each one as a peer
for bdtentry in args.bdtentry:
    if _debug: _log.debug(" - bdtentry: %r", bdtentry)
    bdt_address = Address(bdtentry)
    bbmd.add_peer(bdt_address)

# report the BBMD once, after every peer has been added
if _debug: _log.debug(" - bbmd: %r", bbmd)

_log.debug("running")
run()
_log.debug("fini")
# NOTE(review): excerpt from a script's main section; `args`, `_debug`,
# `_log`, `bind` and `run` are defined outside this excerpt.

# the keyword "any" on the command line is replaced by an empty host
# string -- presumably so the server listens on all interfaces
host = args.host
if host == "any":
    host = ''
server_address = (host, args.port)
if _debug: _log.debug(" - server_address: %r", server_address)

# create a director listening to the address
this_director = TCPServerDirector(server_address)
if _debug: _log.debug(" - this_director: %r", this_director)

# create an echo
echo_master = EchoMaster()
if _debug: _log.debug(" - echo_master: %r", echo_master)

# bind everything together; a MiddleManASE instance is bound to the
# same director as well
bind(echo_master, this_director)
bind(MiddleManASE(), this_director)

_log.debug("running")
run()
_log.debug("fini")
def __init__(self, addr1, net1, addr2, net2):
# NOTE(review): the body's indentation was lost and the excerpt stops
# at the "Second stack" marker, so only the use of addr1/net1 is
# visible here; addr2/net2 are only logged in the visible part.
if _debug: IP2IPRouter._debug("__init__ %r %r %r %r", addr1, net1, addr2, net2)
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
#== First stack
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.s1_bip = BIPSimple()
self.s1_annexj = AnnexJCodec()
self.s1_mux = UDPMultiplexer(addr1)
# bind the bottom layers
bind(self.s1_bip, self.s1_annexj, self.s1_mux.annexJ)
# bind the BIP stack to the local network, as network net1 with
# address addr1
self.nsap.bind(self.s1_bip, net1, addr1)
#== Second stack
# NOTE(review): excerpt from the body of a constructor taking addr1,
# net1, addr2 and net2; the enclosing definition and the original
# indentation are not visible here.
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
#== First stack
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.s1_bip = BIPSimple()
self.s1_annexj = AnnexJCodec()
self.s1_mux = UDPMultiplexer(addr1)
# bind the bottom layers
bind(self.s1_bip, self.s1_annexj, self.s1_mux.annexJ)
# bind the BIP stack to the local network, as network net1 with
# address addr1
self.nsap.bind(self.s1_bip, net1, addr1)
#== Second stack
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.s2_bip = BIPSimple()
self.s2_annexj = AnnexJCodec()
self.s2_mux = UDPMultiplexer(addr2)
# bind the bottom layers
bind(self.s2_bip, self.s2_annexj, self.s2_mux.annexJ)
# bind the BIP stack to the local network
# NOTE(review): the excerpt ends before the statement this comment
# describes; presumably it mirrors the first stack's nsap.bind call
# using s2_bip, net2 and addr2 -- confirm against the full source.
# NOTE(review): tail of an if/else whose `if` header (and the definition
# of `port`) lies outside this excerpt; indentation was lost.
# this branch pairs the given port with the all-ones broadcast address
local_broadcast_tuple = ('255.255.255.255', port)
else:
# otherwise derive both tuples from the address given on the command line
address = Address(args.address)
if _debug: _log.debug(" - local_address: %r", address)
local_unicast_tuple = address.addrTuple
local_broadcast_tuple = address.addrBroadcastTuple
# report the tuples that were chosen
if _debug: _log.debug(" - local_unicast_tuple: %r", local_unicast_tuple)
if _debug: _log.debug(" - local_broadcast_tuple: %r", local_broadcast_tuple)
# NOTE(review): excerpt from a script's main section; `args`, `_log`,
# `bind` and the two address tuples are defined outside this excerpt.

# build the unicast stack: console, middle man, UDP director
console = ConsoleClient()
middle_man = MiddleMan()
unicast_director = UDPDirector(local_unicast_tuple)
bind(console, middle_man, unicast_director)

# a separate broadcast receiver is only created when broadcasts are not
# disabled, the broadcast tuple differs from the unicast tuple, and the
# broadcast address is not 255.255.255.255
if args.noBroadcast:
    _log.debug(" - skipping broadcast")
elif local_unicast_tuple == local_broadcast_tuple:
    _log.debug(" - identical unicast and broadcast tuples")
elif local_broadcast_tuple[0] == '255.255.255.255':
    _log.debug(" - special broadcast address only for sending")
else:
    broadcast_receiver = BroadcastReceiver()
    # the broadcast director is created with reuse=True
    broadcast_director = UDPDirector(local_broadcast_tuple, reuse=True)
    bind(broadcast_receiver, broadcast_director)

_log.debug("running")
# NOTE(review): excerpt from a script's main section; `args`, `_debug`,
# `_log`, `bind` and `run` are defined outside this excerpt.

# the keyword "any" on the command line is replaced by an empty host
# string -- presumably so the server listens on all interfaces
host = args.host
if host == "any":
    host = ''
server_address = (host, args.port)
if _debug: _log.debug(" - server_address: %r", server_address)

# create a director listening to the address, passing along the idle
# timeout given on the command line
this_director = TCPServerDirector(server_address, idle_timeout=args.idle_timeout)
if _debug: _log.debug(" - this_director: %r", this_director)

# create a client
write_property_client = WritePropertyClient()
if _debug: _log.debug(" - write_property_client: %r", write_property_client)

# bind everything together; a WritePropertyASE instance is bound to the
# same director as well
bind(write_property_client, this_director)
bind(WritePropertyASE(), this_director)

_log.debug("running")
run()
_log.debug("fini")
# NOTE(review): excerpt from a script's main section; `args`, `_debug`,
# `_log`, `bind` and `run` are defined outside this excerpt.
# the server to connect to, taken from the command line
host = args.host
port = args.port
server_address = (host, port)
if _debug: _log.debug(" - server_address: %r", server_address)
# build the stack: console, middle man, TCP client director
this_console = ConsoleClient()
if _debug: _log.debug(" - this_console: %r", this_console)
this_middle_man = MiddleMan()
if _debug: _log.debug(" - this_middle_man: %r", this_middle_man)
this_director = TCPClientDirector()
if _debug: _log.debug(" - this_director: %r", this_director)
bind(this_console, this_middle_man, this_director)
# a MiddleManASE instance is bound to the same director as well
bind(MiddleManASE(), this_director)
# create a task manager for scheduled functions
# NOTE(review): task_manager is not referenced again in this excerpt;
# presumably constructing it is enough to make it available -- confirm.
task_manager = TaskManager()
if _debug: _log.debug(" - task_manager: %r", task_manager)
# don't wait to connect
this_director.connect(server_address)
if _debug: _log.debug("running")
run()