Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Diameter command codes, listed in ascending numeric order
# (the values match the command-code assignments of RFC 6733).
RE_AUTH = 258
SESSION_TERMINATION = 275
DEVICE_WATCHDOG = 280
DISCONNECT_PEER = 282
class Diameter(pypacker.Packet):
    """Diameter packet: fixed header fields followed by the ``avps`` list.

    Two bits of the one-byte ``flags`` field are exposed as properties:
    bit 7 as ``request_flag`` and bit 6 as ``proxiable_flag``.
    """

    __hdr__ = (
        ("v", "B", 1),
        ("len", "3s", b"\x00\x00\x00"),
        ("flags", "B", 0),
        ("cmd", "3s", b"\x00\x00\x00"),
        ("app_id", "I", 0),
        ("hop_id", "I", 0),
        ("end_id", "I", 0),
        ("avps", None, triggerlist.TriggerList),
    )

    @property
    def request_flag(self):
        """Bit 7 of ``flags`` as 0 or 1."""
        shifted = self.flags >> 7
        return shifted & 0x1

    @request_flag.setter
    def request_flag(self, r):
        # Clear bit 7, then write the lowest bit of r into it.
        cleared = self.flags & ~0x80
        self.flags = cleared | ((r & 0x1) << 7)

    @property
    def proxiable_flag(self):
        """Bit 6 of ``flags`` as 0 or 1."""
        shifted = self.flags >> 6
        return shifted & 0x1

    @proxiable_flag.setter
    def proxiable_flag(self, p):
        # Clear bit 6, then write the lowest bit of p into it.
        cleared = self.flags & ~0x40
        self.flags = cleared | ((p & 0x1) << 6)
if buf[:2] == b"\x01\x01":
self._init_handler(PFC_OPCODE, buf[2:])
else:
self._init_handler(PAUSE_OPCODE, buf[2:])
return 2
class Pause(pypacker.Packet):
    """Packet with a single unsigned 16-bit ``ptime`` field (default 0)."""

    __hdr__ = (("ptime", "H", 0),)
class PFC(pypacker.Packet):
    """Packet with two single-byte fields (``ms``, ``ls``) and a ``time`` list.

    ``ls_list`` gives list-based access to the bits of ``ls``.
    """

    __hdr__ = (
        ("ms", "B", 0),  # most significant octet is reserved, set to zero
        ("ls", "B", 0),  # least significant octet indicates time_vector parameter
        ("time", None, triggerlist.TriggerList),
    )

    # Convenient access to the ls field (bit representation via list),
    # most significant bit first and always at least 8 entries long:
    # e.g. 221 -> [1, 1, 0, 1, 1, 1, 0, 1] and 5 -> [0, 0, 0, 0, 0, 1, 0, 1]
    def __get_ls(self):
        # Zero-pad to a full octet; bin() would drop leading zero bits and
        # thereby shift every bit to a different list index for values < 128.
        return [int(bit) for bit in format(self.ls, "08b")]

    # e.g. [1, 1, 0, 1, 1, 1, 0, 1] -> 221
    def __set_ls(self, value):
        # value -- iterable of 0/1 integers, most significant bit first
        self.ls = int("".join(["%d" % bit for bit in value]), 2)

    ls_list = property(__get_ls, __set_ls)
# Convenient access to the time field (decimal representation via list)
def __get_time(self):
DB_TX_ATTN_MASK,
DBM_TX_POWER_MASK,
ANTENNA_MASK,
ANT_SIG_MASK,
ANT_NOISE_MASK,
RX_FLAGS_MASK,
HT_MASK,
AMPDU_MASK,
VHT_MASK
]
class FlagTriggerList(triggerlist.TriggerList):
    """TriggerList holding plain tuples, so no custom ``__init__`` is needed."""

    def _pack(self, tuple_entry):
        """Return the element at index 1 of *tuple_entry* as its packed form."""
        packed_entry = tuple_entry[1]
        return packed_entry
def get_channelinfo(channel_bytes):
    """
    Decode the first four bytes of channel_bytes via unpack_H_le.

    channel_bytes -- bytes; [0:2] hold the channel frequency, [2:4] the flags
    return -- [channel_mhz, channel_flags]
    """
    channel_mhz = unpack_H_le(channel_bytes[:2])[0]
    channel_flags = unpack_H_le(channel_bytes[2:4])[0]
    return [channel_mhz, channel_flags]
def freq_to_channel(freq):
"""
freq -- frequency in Hz
class IP(pypacker.Packet):
    """IP header layout plus the table of upper-layer packet classes.

    ``p`` is flagged as the type field; ``__handler__`` is keyed by the
    IP_PROTO_* constants (presumably matched against ``p`` by the base class).
    """

    __hdr__ = (
        ("v_hl", "B", 0x45, FIELD_FLAG_AUTOUPDATE),  # = 69
        ("tos", "B", 0),
        ("len", "H", 20, FIELD_FLAG_AUTOUPDATE),
        ("id", "H", 0),
        # TODO: rename to frag_off
        ("off", "H", 0),
        ("ttl", "B", 64),
        ("p", "B", IP_PROTO_TCP, FIELD_FLAG_AUTOUPDATE | FIELD_FLAG_IS_TYPEFIELD),
        ("sum", "H", 0, FIELD_FLAG_AUTOUPDATE),
        ("src", "4s", b"\x00\x00\x00\x00"),
        ("dst", "4s", b"\x00\x00\x00\x00"),
        ("opts", None, triggerlist.TriggerList),
    )

    __handler__ = {
        IP_PROTO_ICMP: icmp.ICMP,
        IP_PROTO_IGMP: igmp.IGMP,
        IP_PROTO_TCP: tcp.TCP,
        IP_PROTO_UDP: udp.UDP,
        IP_PROTO_IP6: ip6.IP6,
        IP_PROTO_ESP: esp.ESP,
        IP_PROTO_PIM: pim.PIM,
        IP_PROTO_IPXIP: ipx.IPX,
        IP_PROTO_SCTP: sctp.SCTP,
        IP_PROTO_OSPF: ospf.OSPF,
    }
def __get_v(self):
# Port-number pairs per protocol (presumably used to recognise the
# upper-layer protocol from a TCP port -- confirm against the users).
TCP_PROTO_SIP = (5060, 5061)
TCP_PROTO_RTP = (5004, 5005)
class TCP(pypacker.Packet):
    """TCP header layout with nibble access to the data offset via ``off``."""

    __hdr__ = (
        ("sport", "H", 0xDEAD),
        ("dport", "H", 0, FIELD_FLAG_AUTOUPDATE | FIELD_FLAG_IS_TYPEFIELD),
        ("seq", "I", 0xDEADBEEF),
        ("ack", "I", 0),
        # default data offset 5 -> 5 * 4 = 20 byte header, reserved bits 0
        ("off_x2", "B", 5 << 4, FIELD_FLAG_AUTOUPDATE),
        ("flags", "B", TH_SYN),  # access via (obj.flags & TH_XYZ)
        ("win", "H", TCP_WIN_MAX),
        ("sum", "H", 0, FIELD_FLAG_AUTOUPDATE),
        ("urp", "H", 0),
        ("opts", None, triggerlist.TriggerList),
    )

    # off_x2 layout: upper 4 bits = offset, lower 4 bits = reserved;
    # offset * 4 = header length
    def __get_off(self):
        """Return the upper four bits of ``off_x2``."""
        upper_nibble = self.off_x2 >> 4
        return upper_nibble

    def __set_off(self, value):
        """Store *value* in the upper four bits, keeping the lower four."""
        shifted = value << 4
        self.off_x2 = shifted | (self.off_x2 & 0xf)

    off = property(__get_off, __set_off)

    # real header length derived from the offset stored in the header
    def __get_hlen(self):
        offset_words = self.off
        return offset_words * 4
self.params.append(param)
pcount -= 1
off += plen
return off
class Parameter(pypacker.Packet):
    """Packet made of two single-byte fields: ``type`` followed by ``len``."""

    __hdr__ = (
        ("type", "B", 0),
        ("len", "B", 0),
    )
class Update(pypacker.Packet):
    """Packet with two 16-bit length fields followed by three lists.

    Fields in order: ``withdrawnlen``, ``pathlen``, then the lists
    ``wroutes``, ``pathattrs`` and ``anncroutes``.
    """

    __hdr__ = (
        ("withdrawnlen", "H", 0),
        ("pathlen", "H", 0),
        ("wroutes", None, triggerlist.TriggerList),
        ("pathattrs", None, triggerlist.TriggerList),
        ("anncroutes", None, triggerlist.TriggerList),
    )
def _dissect(self, buf):
# withdrawn Routes
off = 4
off_end = off + unpack_H(buf[:2])[0]
while off < off_end:
rlen = 3 + 0
route = Route(buf[off:])
self.wroutes.append(route)
off += rlen
# path attributes
# Handshake message type codes (values as in the TLS HandshakeType enum)
HNDS_HELLO_REQ = 0  # hello_request
HNDS_HELLO_CLIENT = 1  # client_hello
HNDS_HELLO_SERVER = 2  # server_hello
HNDS_CERTIFICATE = 11  # certificate
HNDS_SERVER_KEY_EXCHANGE = 12  # server_key_exchange
HNDS_CERTIFICATE_REQ = 13  # certificate_request
HNDS_SERVER_HELLO_DONE = 14  # server_hello_done
HNDS_CERT_VERIFIY = 15  # certificate_verify
HNDS_CLIENT_KEY_EXCHANGE = 16  # client_key_exchange
HNDS_FINISHED = 20  # finished
class SSL(pypacker.Packet):
    """Container packet whose only header field is the ``records`` list."""

    __hdr__ = (("records", None, triggerlist.TriggerList),)
def _dissect(self, buf):
# logger.debug("parsing SSL")
# parse all records out of message
# possible types are Client/Sevrer Hello, Change Cipher Spec etc.
records = []
offset = 0
dlen = len(buf)
# records is the only header so it's ok to avoid lazy dissecting
while offset < dlen:
record_len = unpack_H(buf[offset + 3: offset + 5])[0]
record = Record(buf[offset: offset + 5 + record_len])
records.append(record)
offset += 5 + record_len
# DNS class codes (CHAOS, HESIOD and the ANY wildcard)
DNS_CHAOS = 3
DNS_HESIOD = 4
DNS_ANY = 255
class DNS(pypacker.Packet):
    """DNS header: id, flags, four auto-updated counters and four lists."""

    __hdr__ = (
        ("id", "H", 0x1234),
        ("flags", "H", DNS_AD | DNS_RD),
        # counters, all flagged for auto-update
        ("questions_amount", "H", 0, FIELD_FLAG_AUTOUPDATE),
        ("answers_amount", "H", 0, FIELD_FLAG_AUTOUPDATE),
        ("authrr_amount", "H", 0, FIELD_FLAG_AUTOUPDATE),
        ("addrr_amount", "H", 0, FIELD_FLAG_AUTOUPDATE),
        # dynamic sections
        ("queries", None, triggerlist.TriggerList),
        ("answers", None, triggerlist.TriggerList),
        ("auths", None, triggerlist.TriggerList),
        ("addrecords", None, triggerlist.TriggerList),
    )
class Query(pypacker.Packet):
    """DNS question entry: encoded name, record type and class."""

    __hdr__ = (
        ("name", None, b"\x03www\x04test\x03com\x00"),
        ("type", "H", DNS_A),
        ("cls", "H", DNS_IN),
    )

    # string-style access to the "name" field
    name_s = pypacker.get_property_dnsname("name")
def _dissect(self, buf):
q_end = DNS.get_dns_length(buf)
self.name = buf[:q_end]
from pypacker import pypacker
from pypacker import triggerlist
import logging
logger = logging.getLogger("pypacker")
class SubPacket(pypacker.Packet):
    """Minimal packet meant to be stored inside TriggerLists."""

    __hdr__ = (("static_field1", "B", 123),)
class DynamicField(triggerlist.TriggerList):
    """Specialised TriggerList representing dynamic fields."""

    def _handle_mod(self, val):
        """Best-effort sync of ``packet.subfield`` with ``hdr_len``.

        val -- unused by this implementation
        """
        try:
            # "subfield" has to be updated on changes to the header
            self.packet.subfield = self.hdr_len
        except Exception:
            # Still deliberately best-effort, but unlike a bare "except:"
            # this lets KeyboardInterrupt and SystemExit propagate.
            pass

    def _pack(self):
        """Assumes something like text based protos eg HTTP"""
        # Concatenate all entries, separated by the literal b"-->"
        return b"-->".join(self)
class NewProtocol(pypacker.Packet):
__hdr__ = (
("static_field0", "B", 123),
def _dissect(self, buf):
    """Fill ``tcbandwith`` and ``tsaassigment`` from fixed offsets of *buf*.

    Bytes 11..18 are appended one by one (as length-1 slices) to
    ``tcbandwith`` and bytes 19..26 to ``tsaassigment``.

    buf -- raw bytes of the packet
    return -- len(self)
    """
    bandwidth_region = buf[11:19]
    for pos in range(8):
        self.tcbandwith.append(bandwidth_region[pos:pos + 1])

    assignment_region = buf[19:27]
    for pos in range(8):
        self.tsaassigment.append(assignment_region[pos:pos + 1])

    return len(self)
class DCBXRecommendation(pypacker.Packet):
    """Organisation-specific TLV with priority, bandwidth and TSA lists.

    The ``*_list`` properties are produced by the 4-byte/8-byte conversion
    helpers and the TLV properties by the ``get_property_tlv_*`` helpers.
    """

    __hdr__ = (
        ("type_len", "H", 0xFE19),  # = 65049: type(127) << 9 | length(25)
        ("oui_subtype", "I", 0x0080C20A),  # = 8438282: OUI(00-80-C2), subtype(0x0A)
        ("reserved", "B", 0),
        ("priority", "I", 0),  # represents a list of 8 items, 4 bits per item
        ("tcbandwith", None, triggerlist.TriggerList),
        ("tsaassigment", None, triggerlist.TriggerList),
    )

    # TLV header accessors
    tlv_type = get_property_tlv_type()
    tlv_len = get_property_tlv_len()
    oui = get_property_tlv_oui()
    subtype = get_property_tlv_subtype()

    # list-style accessors for the payload fields
    tcbandwith_list = get_property_to_convert_8_bytes_to_list("tcbandwith")
    tsaassigment_list = get_property_to_convert_8_bytes_to_list("tsaassigment")
    priority_list = get_property_to_convert_4_bytes_to_list("priority")
def _dissect(self, buf):
# start from TLV_HEADER_LEN + ORG_SPEC_HEADER_LEN +
# 1 byte(reserved) + 4 bytes(priority)
for i in range(11, 19):
self.tcbandwith.append(buf[i:i + 1])