How to use the pyads.structs.SAdsNotificationHeader structure in pyads

To help you get started, we’ve selected a few pyads examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github stlehmann / pyads / tests / test_connection_class.py View on Github external
def test_notification_decorator(self):
        # type: () -> None
        """Verify that the notification decorator decodes the raw header.

        A header carrying a known handle, a FILETIME timestamp and a one-byte
        payload is passed by pointer to the decorated callback, which checks
        each of the four arguments it receives.
        """
        # Raw header that is handed to the decorated callback further down.
        header = structs.SAdsNotificationHeader()
        header.hNotification = 1234
        # The callback expects this value to arrive as datetime(2020, 1, 1).
        header.nTimeStamp = 132223104000000000
        header.cbSampleSize = 1
        header.data = 5

        @self.plc.notification()
        def callback(handle, name, timestamp, value):
            received = (handle, name, timestamp, value)
            wanted = (
                1234,
                "TestName",
                datetime.datetime(2020, 1, 1),
                bytearray((5,)),
            )
            # Compare argument by argument, in the order they were passed.
            for got, expected in zip(received, wanted):
                self.assertEqual(got, expected)

        callback(pointer(header), "TestName")
github stlehmann / pyads / tests / test_connection_class.py View on Github external
def test_notification_decorator_filetime(self):
        # type: () -> None
        """Verify that the raw FILETIME value reaches the callback unchanged.

        With ``timestamp_as_filetime=True`` the decorated callback is expected
        to receive the same integer that was stored in ``nTimeStamp``.
        """
        # Raw header that is handed to the decorated callback further down.
        header = structs.SAdsNotificationHeader()
        header.nTimeStamp = 132223104000000000
        header.cbSampleSize = 1
        header.data = 5

        decorate = self.plc.notification(timestamp_as_filetime=True)

        def callback(handle, name, timestamp, value):
            # Only the timestamp is under test here.
            self.assertEqual(timestamp, 132223104000000000)

        callback = decorate(callback)
        callback(pointer(header), "TestName")
github stlehmann / pyads / tests / test_connection_class.py View on Github external
def create_notification_struct(self, payload):
        # type: (bytes) -> structs.SAdsNotificationHeader
        buf = b"\x00" * 12  # hNotification, nTimeStamp
        buf += struct.pack("
github stlehmann / pyads / pyads / ads.py View on Github external
>>>     print(value)
        >>>
        >>> with plc:
        >>>     # Add notification with default settings
        >>>     attr = pyads.NotificationAttrib(size_of(pyads.PLCTYPE_INT))
        >>>
        >>>     handles = plc.add_device_notification("GVL.myvalue", attr, mycallback)
        >>>
        >>>     # Remove notification
        >>>     plc.del_device_notification(handles)
        """
        contents = notification.contents
        data_size = contents.cbSampleSize
        # Get dynamically sized data array
        data = (c_ubyte * data_size).from_address(
            addressof(contents) + SAdsNotificationHeader.data.offset
        )

        if plc_datatype == PLCTYPE_STRING:
            # read only until null-termination character
            value = bytearray(data).split(b"\0", 1)[0].decode("utf-8")

        elif plc_datatype is not None and issubclass(plc_datatype, Structure):
            value = plc_datatype()
            fit_size = min(data_size, sizeof(value))
            memmove(addressof(value), addressof(data), fit_size)

        elif plc_datatype is not None and issubclass(plc_datatype, Array):
            if data_size == sizeof(plc_datatype):
                value = list(plc_datatype.from_buffer_copy(bytes(data)))
            else:
                # invalid size
github stlehmann / pyads / pyads / pyads_ex.py View on Github external
# variable any more when resolving DLL paths. The following works with the default
        # installation of the Beckhoff TwinCAT ADS DLL.
        dll_path = os.environ["TWINCAT3DIR"] + "\\..\\AdsApi\\TcAdsDll"
        if platform.architecture()[0] == "64bit":
            dll_path += "\\x64"
        dlldir_handle = os.add_dll_directory(dll_path)
    try:
        _adsDLL = ctypes.WinDLL("TcAdsDll.dll")  # type: ignore
    finally:
        if dlldir_handle:
            # Do not clobber the load path for other modules
            dlldir_handle.close()
    NOTEFUNC = ctypes.WINFUNCTYPE(  # type: ignore
        ctypes.c_void_p,
        ctypes.POINTER(SAmsAddr),
        ctypes.POINTER(SAdsNotificationHeader),
        ctypes.c_ulong,
    )

elif platform_is_linux():
    # try to load local adslib.so in favor to global one
    local_adslib = os.path.join(os.path.dirname(__file__), "adslib.so")
    if os.path.isfile(local_adslib):
        adslib = local_adslib
    else:
        adslib = "adslib.so"

    _adsDLL = ctypes.CDLL(adslib)

    NOTEFUNC = ctypes.CFUNCTYPE(
        None,
        ctypes.POINTER(SAmsAddr),
github stlehmann / pyads / pyads / pyads.py View on Github external
# Get the handle of the PLC-variable
    hnl = adsSyncReadWriteReq(
        adr, ADSIGRP_SYM_HNDBYNAME, 0x0, PLCTYPE_UDINT, dataName, PLCTYPE_STRING
    )

    # Write the value of a PLC-variable, via handle
    adsSyncWriteReq(adr, ADSIGRP_SYM_VALBYHND, hnl, value, plcDataType)

    # Release the handle of the PLC-variable
    adsSyncWriteReq(adr, ADSIGRP_SYM_RELEASEHND, 0, hnl, PLCTYPE_UDINT)


# Prototype for notification callbacks. It is only built when running on
# Windows (ctypes provides WINFUNCTYPE there); otherwise it stays None.
NOTEFUNC = (
    ctypes.WINFUNCTYPE(
        c_void_p,
        POINTER(SAmsAddr),
        POINTER(SAdsNotificationHeader),
        c_ulong,
    )
    if platform_is_windows()
    else None
)

# Module-level mapping from an int key to an optional stored object.
# NOTE(review): presumably keeps registered callbacks referenced - confirm.
callback_store = {}  # type: Dict[int, Optional[Any]]


@win32_only
def adsSyncAddDeviceNotificationReq(
    adr, data_name, pNoteAttrib, callback, user_handle=None
):
    # type: (AmsAddr, str, NotificationAttrib, Callable[[AmsAddr, Any, int], None], int) -> Tuple[int, int]  # noqa: E501
    """Add an ADS device notification.

    :param AmsAddr adr: local or remote AmsAddr
    :param str data_name: name of the plc variable
    :param NotificationAttrib pNoteAttrib: notification settings
    :param Callable callback: callback function