Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# refuse to run on anything other than Python 2.5
if sys.version_info[:2] != (2, 5):
    sys.stderr.write("Python 2.5 only\n")
    sys.exit(1)

# read the command line; the parsed result carries the `ini` settings used below
parser = ConfigArgumentParser(description=__doc__)
args = parser.parse_args()

if _debug:
    _log.debug("initialization")
    _log.debug(" - args: %r", args)

# the local device is described by the INI settings
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
    _log.debug(" - this_device: %r", this_device)

# the application ties the device to the address from the INI settings
this_application = BIPSimpleApplication(this_device, args.ini.address)

# interactive console
this_console = ReadPropertyMultipleConsoleCmd()
if _debug:
    _log.debug(" - this_console: %r", this_console)

# enable sleeping will help with threads
enable_sleeping()

# these two messages are logged whether or not _debug is set
_log.debug("running")
run()
_log.debug("fini")
if _debug:
    _log.debug("initialization")
    _log.debug(" - args: %r", args)

# collect the device properties from the INI settings; the numeric ones are
# converted with int() before being handed to the device object
device_properties = {
    "objectName": args.ini.objectname,
    "objectIdentifier": ("device", int(args.ini.objectidentifier)),
    "maxApduLengthAccepted": int(args.ini.maxapdulengthaccepted),
    "segmentationSupported": args.ini.segmentationsupported,
    "vendorIdentifier": int(args.ini.vendoridentifier),
}
this_device = LocalDeviceObject(**device_properties)

# the application ties the device to the address from the INI settings
this_application = BIPSimpleApplication(this_device, args.ini.address)

# one binary input object per (button, instance number) pair in button_list,
# each registered with the application as soon as it is built
for button_id, bio_id in button_list:
    button_identifier = ("binaryInput", bio_id)
    button_name = "Button-%d" % (button_id,)
    button_object = RPiBinaryInput(
        button_id,
        objectIdentifier=button_identifier,
        objectName=button_name,
    )
    _log.debug(" - bio: %r", button_object)
    this_application.add_object(button_object)
# make the LEDs
for led_id, boo_id in led_list:
boo = RPiBinaryOutput(
led_id,
objectIdentifier=("binaryOutput", boo_id),
from bacpypes.local.device import LocalDeviceObject
# some debugging
# _debug gates the `if _debug:` trace calls in this module; 0 leaves them off.
_debug = 0
# module-level logger, constructed from this module's globals()
_log = ModuleLogger(globals())
# counters
# Both are defaultdict(int), so a key that has not been seen yet reads as 0.
# NOTE(review): who_is_counter is presumably keyed by the (source, low limit,
# high limit) tuple built in do_WhoIsRequest -- the code that updates it is
# not visible here, confirm against the full file.
who_is_counter = defaultdict(int)
i_am_counter = defaultdict(int)
#
# WhoIsIAmApplication
#
@bacpypes_debugging
class WhoIsIAmApplication(BIPSimpleApplication):
def __init__(self, device, address):
if _debug: WhoIsIAmApplication._debug("__init__ %r %r", device, address)
BIPSimpleApplication.__init__(self, device, address)
def do_WhoIsRequest(self, apdu):
"""Respond to a Who-Is request."""
if _debug: WhoIsIAmApplication._debug("do_WhoIsRequest %r", apdu)
# build a key from the source and parameters
key = (str(apdu.pduSource),
apdu.deviceInstanceRangeLowLimit,
apdu.deviceInstanceRangeHighLimit,
)
# count the times this has been received
def main():
    """Build the device and application, attach a console, and run.

    The command line is parsed first; the resulting ``ini`` settings supply
    both the local device description and the address the application uses.
    The application is stored in the module-level ``this_application``.
    """
    global this_application

    # read the command line
    parser = ConfigArgumentParser(description=__doc__)
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device is described entirely by the INI settings
    device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", device)

    # only the application is published as a module-level name
    this_application = BIPSimpleApplication(device, args.ini.address)

    # interactive console; the local keeps it referenced until main() returns
    console = ReadPropertyConsoleCmd()
    if _debug:
        _log.debug(" - this_console: %r", console)

    # enable sleeping will help with threads
    enable_sleeping()

    # these two messages are logged whether or not _debug is set
    _log.debug("running")
    run()
    _log.debug("fini")
def main():
    """Build the device and application, then queue up the reader threads.

    One ``ReadPointListThread`` is created for every ``(address, points)``
    entry of the module-level ``point_list``.  The threads are handed to a
    ``ThreadSupervisor`` whose ``start`` is passed to ``deferred()`` instead
    of being called here.  The application is stored in the module-level
    ``this_application``.
    """
    global this_application

    # read the command line
    parser = ConfigArgumentParser(description=__doc__)
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device is described entirely by the INI settings
    device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", device)

    # only the application is published as a module-level name
    this_application = BIPSimpleApplication(device, args.ini.address)

    # build (but do not start) one reader per address and its list of points
    readers = []
    for addr, points in point_list:
        reader = ReadPointListThread(addr, points)
        if _debug:
            _log.debug(" - read_thread: %r", reader)
        readers.append(reader)

    # a single supervisor owns the whole set of readers
    supervisor = ThreadSupervisor(readers)

    # start it running when the core is running
    deferred(supervisor.start)
def main():
    """Build the device and application, attach the read/write console, and run.

    The command line is parsed first; the resulting ``ini`` settings supply
    both the local device description and the address the application uses.
    The application is stored in the module-level ``this_application``.
    """
    global this_application

    # read the command line
    parser = ConfigArgumentParser(description=__doc__)
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device is described entirely by the INI settings
    device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", device)

    # only the application is published as a module-level name
    this_application = BIPSimpleApplication(device, args.ini.address)

    # interactive console; the local keeps it referenced until main() returns
    console = ReadWritePropertyConsoleCmd()
    if _debug:
        _log.debug(" - this_console: %r", console)

    # enable sleeping will help with threads
    enable_sleeping()

    # these two messages are logged whether or not _debug is set
    _log.debug("running")
    run()
    _log.debug("fini")
# build the argument parser first, then read the command line with it
parser = ConfigArgumentParser(description=__doc__)
args = parser.parse_args()

if _debug:
    _log.debug("initialization")
    _log.debug(" - args: %r", args)

# the local device is described by the INI settings
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
    _log.debug(" - this_device: %r", this_device)

# the application ties the device to the address from the INI settings
this_application = BIPSimpleApplication(this_device, args.ini.address)
#
# Simple daily schedule (actually a weekly schedule with every day
# being identical).
#
so = LocalScheduleObject(
objectIdentifier=('schedule', 1),
objectName='Schedule 1',
presentValue=Integer(8),
effectivePeriod=DateRange(
startDate=(0, 1, 1, 1),
endDate=(254, 12, 31, 2),
),
weeklySchedule=ArrayOf(DailySchedule)([
DailySchedule(
daySchedule=[
def __init__(self, *args):
    """Initialise both base classes and the request bookkeeping.

    All positional arguments are passed unchanged to
    ``BIPSimpleApplication.__init__``.
    """
    # both bases are initialised explicitly; 250 is the first argument of
    # RecurringTask.__init__ (presumably the interval in milliseconds --
    # confirm against the library)
    BIPSimpleApplication.__init__(self, *args)
    RecurringTask.__init__(self, 250)

    # assigning invoke identifiers, starting from 1
    self.nextInvokeID = 1

    # keep track of requests to line up responses
    self.iocb = {}

    # queue of requests (presumably drained by the recurring task -- the
    # consumer is not visible here)
    self.request_queue = Queue()

    # install the task last, once the state above exists
    self.install_task()
from bacpypes.core import run, enable_sleeping
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
# some debugging
# _debug gates the `if _debug:` trace calls in this module; 0 leaves them off.
_debug = 0
# module-level logger, constructed from this module's globals()
_log = ModuleLogger(globals())
#
# SampleApplication
#
@bacpypes_debugging
class SampleApplication(BIPSimpleApplication):
    """A ``BIPSimpleApplication`` that traces construction and APDU traffic.

    Every override logs its arguments through ``SampleApplication._debug``
    when the module-level ``_debug`` flag is set, then hands off to the
    matching ``BIPSimpleApplication`` method.  None of the overrides return
    the base method's result.
    """

    def __init__(self, device, address):
        """Pass *device* and *address* through to the base initialiser."""
        if _debug:
            SampleApplication._debug("__init__ %r %r", device, address)
        BIPSimpleApplication.__init__(self, device, address)

    def request(self, apdu):
        """Trace *apdu*, then forward it to the base ``request``."""
        if _debug:
            SampleApplication._debug("request %r", apdu)
        BIPSimpleApplication.request(self, apdu)

    def indication(self, apdu):
        """Trace *apdu*, then forward it to the base ``indication``."""
        if _debug:
            SampleApplication._debug("indication %r", apdu)
        BIPSimpleApplication.indication(self, apdu)

    def response(self, apdu):
        """Trace *apdu*, then forward it to the base ``response``."""
        if _debug:
            SampleApplication._debug("response %r", apdu)
        BIPSimpleApplication.response(self, apdu)