Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def header_to_binary(hdr):
b = []
b.append(struct.pack("<3ss", hdr.MessageType, hdr.ChunkType))
size = hdr.body_size + 8
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
size += 4
b.append(Primitives.UInt32.pack(size))
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
b.append(Primitives.UInt32.pack(hdr.ChannelId))
return b"".join(b)
def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = rdesc.BrowseName
addnode.ParentNodeId = parent_nodeid
addnode.ReferenceTypeId = rdesc.ReferenceTypeId
addnode.TypeDefinition = rdesc.TypeDefinition
addnode.NodeClass = rdesc.NodeClass
node_to_copy = Node(server, rdesc.NodeId)
attrObj = getattr(ua, rdesc.NodeClass.name + "Attributes")
_read_and_copy_attrs(node_to_copy, attrObj(), addnode)
res = server.add_nodes([addnode])[0]
added_nodes = [res.AddedNodeId]
def _receive(self, msg):
self._check_incoming_chunk(msg)
self._incoming_parts.append(msg)
if msg.MessageHeader.ChunkType == ua.ChunkType.Intermediate:
return None
if msg.MessageHeader.ChunkType == ua.ChunkType.Abort:
err = struct_from_binary(ua.ErrorMessage, ua.utils.Buffer(msg.Body))
logger.warning("Message %s aborted: %s", msg, err)
# specs Part 6, 6.7.3 say that aborted message shall be ignored
# and SecureChannel should not be closed
self._incoming_parts = []
return None
elif msg.MessageHeader.ChunkType == ua.ChunkType.Single:
message = ua.Message(self._incoming_parts)
self._incoming_parts = []
return message
else:
raise ua.UaError("Unsupported chunk type: {0}".format(msg))
async def find_servers_on_network(self, params):
self.logger.info("find_servers_on_network")
request = ua.FindServersOnNetworkRequest()
request.Parameters = params
data = await self.protocol.send_request(request)
response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
self.logger.debug(response)
response.ResponseHeader.ServiceResult.check()
return response.Parameters
def _create_request_header(self, timeout=1000):
hdr = ua.RequestHeader()
hdr.AuthenticationToken = self.authentication_token
self._request_handle += 1
hdr.RequestHandle = self._request_handle
hdr.TimeoutHint = timeout
return hdr
# We should therefore also check for len(self._publishcallbacks) == 0, but
# this gets us into trouble if a Publish response arrives before the
# DeleteSubscription response.
#
# We could remove the callback already when sending the DeleteSubscription request,
# but there are some legitimate reasons to keep them around, such as when the server
# responds with "BadTimeout" and we should try again later instead of just removing
# the subscription client-side.
#
# There are a variety of ways to act correctly, but the most practical solution seems
# to be to just ignore any BadNoSubscription responses.
self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
return
# parse publish response
try:
response = struct_from_binary(ua.PublishResponse, data)
self.logger.debug(response)
except Exception:
# INFO: catching the exception here might be obsolete because we already
# catch BadTimeout above. However, it's not really clear what this code
# does so it stays in, doesn't seem to hurt.
self.logger.exception("Error parsing notification from server")
# send publish request ot server so he does stop sending notifications
self.loop.create_task(self.publish([]))
return
# look for callback
try:
callback = self._publish_callbacks[response.Parameters.SubscriptionId]
except KeyError:
self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
return
# do callback
def _create_data_type(self, type_name):
name = _to_camel_case(type_name)
# apply for new node id
data_type_node_id = self._nodeid_generator()
description_node_id = self._nodeid_generator()
bind_obj_node_id = self._nodeid_generator()
# create data type node
dt_node = ua.AddNodesItem()
dt_node.RequestedNewNodeId = data_type_node_id
dt_node.BrowseName = ua.QualifiedName(name, self._idx)
dt_node.NodeClass = ua.NodeClass.DataType
dt_node.ParentNodeId = ua.NodeId(ua.ObjectIds.Structure, 0)
dt_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype, 0)
dt_attributes = ua.DataTypeAttributes()
dt_attributes.DisplayName = ua.LocalizedText(type_name)
dt_node.NodeAttributes = dt_attributes
# create description node
desc_node = ua.AddNodesItem()
desc_node.RequestedNewNodeId = description_node_id
desc_node.BrowseName = ua.QualifiedName(name, self._idx)
desc_node.NodeClass = ua.NodeClass.Variable
desc_node.ParentNodeId = self.dict_id
desc_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
desc_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0)
desc_attributes = ua.VariableAttributes()
desc_attributes.DisplayName = ua.LocalizedText(type_name)
desc_attributes.DataType = ua.NodeId(ua.ObjectIds.String)
desc_attributes.Value = ua.Variant(name, ua.VariantType.String)
desc_attributes.ValueRank = -1
mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
mydevice_var = mydevice.get_child(["{}:controller".format(idx), "{}:state".format(idx)]) # get proxy to our device state variable
# create directly some objects and variables
myobj = server.nodes.objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
# import some nodes from xml
server.import_xml("custom_nodes.xml")
# creating a default event object
# The event object automatically will have members for all events properties
# you probably want to create a custom event type, see other examples
myevgen = server.get_event_generator()
myevgen.event.Severity = 300
# starting!
server.start()
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
vup = VarUpdater(mysin) # just a stupide class update a variable
vup.start()
try:
def check_answer(self, data, context):
data = data.copy()
typeid = nodeid_from_binary(data)
if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
self.logger.warning("ServiceFault from server received %s", context)
hdr = struct_from_binary(ua.ResponseHeader, data)
hdr.ServiceResult.check()
return False
return True