Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testDefMode(self):
    """Encode a one-element SequenceOf holding an OctetString and check the octets."""
    seq = univ.SequenceOf()
    seq.setComponentByPosition(0, univ.OctetString('quick brown'))

    actual = encoder.encode(seq)
    expected = ints2octs(
        (48, 13, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110)
    )
    assert actual == expected
def testTrue(self):
    """Encode the native ``True`` against a Boolean spec and check the octets."""
    actual = encoder.encode(True, asn1Spec=univ.Boolean())
    expected = ints2octs((1, 1, 1))
    assert actual == expected
def testIndefMode(self):
    """Encode ``self.o`` with ``defMode=False`` and check the resulting octets."""
    actual = encoder.encode(self.o, defMode=False)
    expected = ints2octs(
        (4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)
    )
    assert actual == expected
def MakeAttid(prefixTable, oid):
    # Look up the prefix of a dotted OID string in prefixTable.
    #
    # prefixTable is iterated as a sequence of entries indexed as
    # item['prefix']['elements']; oid is a dotted string such as "1.2.3".
    #
    # NOTE(review): only the prefix computation and the table search are
    # visible in this excerpt; whatever the function does with fToAdd / pos
    # afterwards is not shown here.

    # Get the last value in the original OID: the value after the last '.'.
    lastValue = int(oid.split('.')[-1])
    # Convert the dotted form of the OID into a BER encoded binary format.
    # The BER encoding of OID is described in section 8.19 of [ITUX690].
    from pyasn1.type import univ
    from pyasn1.codec.ber import encoder
    # The first two bytes of the encoding are dropped -- presumably the tag
    # and a single-byte length; TODO confirm for encodings of 128+ bytes.
    binaryOID = encoder.encode(univ.ObjectIdentifier(oid))[2:]
    # Get the prefix of the OID: strip the trailing byte(s) that hold the
    # last value. One byte is stripped when the value is below 128, two
    # bytes otherwise (assumes the last value never needs more than two
    # bytes -- TODO confirm).
    if lastValue < 128:
        oidPrefix = list(binaryOID[:-1])
    else:
        oidPrefix = list(binaryOID[:-2])
    # Search the prefix in the prefix table; if none is found, an entry for
    # the new prefix is to be added (fToAdd stays True and pos stays at the
    # end of the table).
    fToAdd = True
    pos = len(prefixTable)
    for j, item in enumerate(prefixTable):
        if item['prefix']['elements'] == oidPrefix:
            # Existing prefix: remember its index and stop searching.
            fToAdd = False
            pos = j
            break
def encodeControlValue(self):
    """Return the encoded AttributeDescriptionList built from ``self.attrList``.

    Each entry of ``self.attrList`` is placed at its own index in the list
    before the whole structure is passed to ``encoder.encode``.
    """
    selection = AttributeDescriptionList()
    for position, attribute in enumerate(self.attrList):
        selection.setComponentByPosition(position, attribute)
    return encoder.encode(selection)
return wholeMsg
# Create the transport dispatcher and attach the receive and timer callbacks
# (cbRecvFun / cbTimerFun are defined outside this excerpt).
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
transportDispatcher.registerTimerCbFun(cbTimerFun)

# UDP/IPv4: register a client-mode UDP socket transport.
transportDispatcher.registerTransport(
    udp.DOMAIN_NAME, udp.UdpSocketTransport().openClientMode()
)

# Pass the encoded request message to the dispatcher for delivery.
transportDispatcher.sendMessage(
    encoder.encode(reqMsg), udp.DOMAIN_NAME, ('demo.snmplabs.com', 161)
)
# Mark job #1 as started.
transportDispatcher.jobStarted(1)

## UDP/IPv6 (second copy of the same PDU will be sent)
# NOTE(review): this disabled example refers to udp6.domainName while the
# live code above uses udp.DOMAIN_NAME -- confirm the constant name before
# re-enabling it.
# transportDispatcher.registerTransport(
# udp6.domainName, udp6.Udp6SocketTransport().openClientMode()
# )
# Pass message to dispatcher
# transportDispatcher.sendMessage(
# encoder.encode(reqMsg), udp6.domainName, ('::1', 161)
# )
# transportDispatcher.jobStarted(1)

# Dispatcher will finish as job#1 counter reaches zero
transportDispatcher.runDispatcher()
if NATIVE_ASYNCIO:
return
else:
raise Return()
print('ID:', message_id, dict_req)
ldap_message = LDAPMessage()
ldap_message['messageID'] = MessageID(message_id)
ldap_message['protocolOp'] = ProtocolOp().setComponentByName(response_type, response)
controls = None
message_controls = build_controls_list(controls)
if message_controls is not None:
ldap_message['controls'] = message_controls
print('sending', ldap_message)
encoded_message = encoder.encode(ldap_message)
writer.write(encoded_message)
writer.drain()
def prepareResponseData(msgId, rsp, secret):
    """Build a response message for *msgId* and return its encoded form.

    Args:
        msgId: value stored in the message's ``msg-id`` field.
        rsp: mapping with an ``'snmp-pdu'`` entry (looked up with ``[]``)
            and an optional ``'error-indication'`` entry (looked up with
            ``.get``, defaulting to ``''``).
        secret: when truthy, the encoded response is passed through
            ``crypto.encrypt(secret, ...)`` before being stored as the
            message payload; otherwise the encoded response is stored as is.

    Returns:
        The result of ``encoder.encode`` applied to the populated message.

    Raises:
        KeyError: if *rsp* is a dict without an ``'snmp-pdu'`` key.
    """
    r = Response()
    r['error-indication'] = str(rsp.get('error-indication', ''))
    pdu = rsp['snmp-pdu']
    # A falsy PDU is stored as an empty string instead of being encoded.
    r['snmp-pdu'] = encoder.encode(pdu) if pdu else ''

    # Encode the response exactly once; the payload is either this encoding
    # or its encrypted form.
    payload = encoder.encode(r)
    if secret:
        payload = crypto.encrypt(secret, payload)

    msg = Message()
    msg['version'] = PROTOCOL_VERSION
    msg['msg-id'] = msgId
    msg['content-id'] = 'response'
    msg['payload'] = payload
    return encoder.encode(msg)
startedAt = time()
return wholeMsg
# Create the transport dispatcher and attach the receive and timer callbacks
# (cbRecvFun / cbTimerFun are defined outside this excerpt).
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
transportDispatcher.registerTimerCbFun(cbTimerFun)

# Register a client-mode UDP socket transport.
transportDispatcher.registerTransport(
    udp.DOMAIN_NAME, udp.UdpSocketTransport().openClientMode()
)

# Hand the encoded request message to the dispatcher and mark job #1 started.
transportDispatcher.sendMessage(
    encoder.encode(reqMsg), udp.DOMAIN_NAME, ('demo.snmplabs.com', 161)
)
transportDispatcher.jobStarted(1)

# Dispatcher will finish as job#1 counter reaches zero
transportDispatcher.runDispatcher()

# Close the dispatcher once the run loop has returned.
transportDispatcher.closeDispatcher()
if not isinstance(val, pMod.Null):
break
else:
transportDispatcher.jobFinished(1)
continue
# Generate request for next row
pMod.apiPDU.setVarBinds(
reqPDU, [(x, pMod.null) for x, y in varBindTable[-1]]
)
pMod.apiPDU.setRequestID(reqPDU, pMod.getNextRequestID())
transportDispatcher.sendMessage(
encoder.encode(reqMsg), transportDomain, transportAddress
)
global startedAt
if time() - startedAt > 3:
raise Exception('Request timed out')
startedAt = time()
return wholeMsg