Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
lldp_increment_step, lldp_increment_count = self._check_increment(lldp_sa_increment, "lldp_sa_increment")
udf_id = len(self.udf_dict) + 1
lldp_tlv = hex_data[28:34]
lldp_mac_initval = hex_data[38:46]
if lldp_tlv == '020704':
self.udf_dict["lldp_sa"] = {"udf_id": udf_id, "initval": lldp_mac_initval, "offset": 19, "counter_type": 'c32', "step": lldp_increment_step,
"count": lldp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["lldp_sa"]))
else:
raise TypeError("lldp tlv is not '\\x02\\x07\\x04'.")
# Set ether type increment
if eth_type_increment is not None:
eth_type_increment_step, eth_type_increment_count = self._check_increment(eth_type_increment, "eth_type_increment")
udf_id = len(self.udf_dict) + 1
eth_type = packet[pypacker.layer12.ethernet.Ethernet].type
eth_type_initval = str(hex(eth_type))[2:].zfill(4)
self.udf_dict["eth_type"] = {"udf_id": udf_id, "initval": eth_type_initval, "offset": 12, "counter_type": 'c16', "step": eth_type_increment_step,
"count": eth_type_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["eth_type"]))
# Set dhcp siaddr increment
if dhcp_si_increment is not None:
dhcp_si_increment_step, dhcp_si_increment_count = self._check_increment(dhcp_si_increment, "dhcp_si_increment")
udf_id = len(self.udf_dict) + 1
dhcp_si = packet.getlayer(pypacker.BOOTP).siaddr.split('.') # pylint: disable=no-member
dhcp_si_initval = str(hex(int(dhcp_si[0])))[2:].zfill(2) + str(hex(int(dhcp_si[1])))[2:].zfill(2) + \
str(hex(int(dhcp_si[2])))[2:].zfill(2) + str(hex(int(dhcp_si[3])))[2:].zfill(2)
if packet.vlan:
offset = 66
else:
offset = 62
for field, value in layer_dict[layer_name].items():
if getattr(packet_layer, '{0}_s'.format(field), None):
setattr(packet_layer, '{0}_s'.format(field), value)
else:
setattr(packet_layer, field, value)
return packet_layer
# Converting packet_definition to pypacker.Packet.
packet = reduce(lambda a, b: a + b, map(_pypacker_layer, packet_definition))
# Handle Dot1Q layer and IP options in packet_definition
first = operator.itemgetter(0)
for layer in packet_definition:
with suppress(KeyError):
dot1q_definition = layer["Dot1Q"]
pypacker_vlan = struct.pack("!H", pypacker.layer12.ethernet.ETH_TYPE_8021Q) + \
struct.pack("!H", dot1q_definition["vlan"])
packet.vlan = pypacker_vlan
with suppress(KeyError):
opts = layer["IP"]["opts"]
packet.ip.opts = [ip.IPOptMulti(**opt) for opt in opts]
with suppress(KeyError):
tlvlist_def = layer["LLDP"]["tlvlist"]
packet.lldp.tlvlist = []
for tlv_dict in tlvlist_def:
tlv_type = next(iter(tlv_dict))
packet.lldp.tlvlist.append(_pypacker_layer(
{tlv_type: OrderedDict(sorted(tlv_dict[tlv_type].items(), key=first))},
))
with suppress(KeyError):
# Inner list of DCBXApplicationPriorityTable
apppriotables = tlv_dict['DCBXApplicationPriority']['apppriotable']
def test_create_eth(self):
print_header("Keyword creation")
eth = ethernet.Ethernet()
# print(str(eth))
self.assertEqual(eth.bin(), b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x08\x00")
eth = ethernet.Ethernet(dst=b"\x00\x01\x02\x03\x04\x05", src=b"\x06\x07\x08\x09\x0A\x0B", type=2048)
print(str(eth))
print(eth.bin())
self.assertEqual(eth.bin(), b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x08\x00")
# test packet creation (default, keyword, bytes + keyword)
bts = get_pcap("tests/packets_ether.pcap")[0]
eth = ethernet.Ethernet()
self.assertEqual(eth.src_s, "FF:FF:FF:FF:FF:FF")
eth = ethernet.Ethernet(src=b"\xAA\xAA\xAA\xAA\xAA\xAA")
self.assertEqual(eth.src_s, "AA:AA:AA:AA:AA:AA")
eth = ethernet.Ethernet(dst=b"\xAA\xAA\xAA\xAA\xAA\xAA")
self.assertEqual(eth.dst_s, "AA:AA:AA:AA:AA:AA")
print_header("DNS")
packet_bytes = get_pcap("tests/packets_dns.pcap")
print()
print(">>> DNS 1")
dns1 = ethernet.Ethernet(packet_bytes[0])[dns.DNS]
print(dns1.bin())
print(packet_bytes[0][42:])
self.assertEqual(dns1.bin(), packet_bytes[0][42:])
self.assertEqual(len(dns1.queries), 1)
self.assertEqual(len(dns1.answers), 0)
self.assertEqual(len(dns1.auths), 0)
self.assertEqual(len(dns1.addrecords), 1)
print()
print(">>> DNS 2")
dns2 = ethernet.Ethernet(packet_bytes[1])[dns.DNS]
print("---> checking bin")
print(dns2.queries[0]._name_format)
self.assertEqual(dns2.bin(), packet_bytes[1][42:])
print("---> checking repr")
print(dns2.queries[0]._name_format)
print("%s" % dns2)
self.assertEqual(len(dns2.queries), 1)
self.assertEqual(len(dns2.answers), 3)
self.assertEqual(len(dns2.auths), 0)
self.assertEqual(len(dns2.addrecords), 1)
print()
print(">>> DNS 3")
print("---> checking bin")
dns3 = ethernet.Ethernet(packet_bytes[2])[dns.DNS]
self.assertEqual(dns3.bin(), packet_bytes[2][42:])
print("---> checking str")
def test_dissectfail(self):
print_header("Dissectfail")
tcp_bytes_fail = b"\x00" * 16
pkt1 = ethernet.Ethernet() + ip.IP() + tcp_bytes_fail
pkt1_bts = pkt1.bin()
self.assertTrue(pkt1_bts.endswith(tcp_bytes_fail))
pkt1 = ethernet.Ethernet(pkt1_bts)
pkt_tcp = pkt1.upper_layer.upper_layer
print("TCP for dissectfail #1: %r" % pkt_tcp)
self.assertIsNone(pkt_tcp)
pkt1 = ethernet.Ethernet(pkt1_bts)
pkt_tcp = pkt1.ip.tcp
print("TCP for dissectfail #2: %r" % pkt_tcp)
self.assertIsNone(pkt_tcp)
# retrieving body type on failed dissect only works 1st time (returns None)
# 2nd time raises Exception
# pkt_tcp = pkt1.ip.tcp
self.assertRaises(Exception, lambda: pkt1.ip.tcp)
ip_bytes_orig = pkt1_bts[-len(tcp_bytes_fail):]
ip_bytes = pkt1.ip.body_bytes
self.assertEqual(ip_bytes, ip_bytes_orig)
def test_eth_vlan_tags(self):
print_header("ETHERNET + VLAN Tags")
# Ethernet + VLAN tag, type 0x8100 ) + ARP
# VALN tag: type=0x8100, prio=0, cfi=0, vid=5
s1 = b"\x00\x00\x00333\x00\x00 \x00\x10\x02\x81\x00\x00\x05\x08\x06\x00\x01"\
b"\x08\x00\x06\x04\x00\x01\x00\x00 \x00\x10\x02\x01\x01\x01\x01\x00\x00"\
b"\x00\x00\x00\x00\x01\x01\x01\x02"
eth1 = ethernet.Ethernet(s1)
# parsing
self.assertEqual(eth1.bin(), s1)
self.assertEqual(eth1.dst_s, "00:00:00:33:33:33")
self.assertEqual(eth1.src_s, "00:00:20:00:10:02")
self.assertEqual(eth1.type, ethernet.ETH_TYPE_ARP)
self.assertEqual(len(eth1.vlan), 1)
self.assertEqual(eth1.vlan[0].type, ethernet.ETH_TYPE_8021Q)
self.assertEqual(eth1.vlan[0].prio, 0)
self.assertEqual(eth1.vlan[0].cfi, 0)
self.assertEqual(eth1.vlan[0].vid, 5)
self.assertEqual(type(eth1.arp).__name__, "ARP")
# Ethernet + QinQ(double tags, type 0x8100 ) + IP
# Outer tag: type=0x81A8, prio=1, cfi=1, vid=5
# Inner tag: type=0x8100, prio=2, cfi=0, vid=99
s = b"\x00\x00\x00\x00\x00\xaa\x00\x00\x00\x00\x00\xbb\x88\xA80\x05\x81\x00@c"\
b"\x08\x00E\x00\x00&\x00\x01\x00\x00@\x00|\xd5\x7f\x00\x00\x01\x7f\x00\x00"\
b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
eth1 = ethernet.Ethernet(s)
# parsing
self.assertEqual(eth1.bin(), s)
def sr(self, packet_send, max_packets_recv=1, pfilter=None, lowest_layer=ethernet.Ethernet):
"""
Send a packet and receive answer packets. This will use information retrieved
from direction() to retrieve answer packets. This is not 100% reliable as
it primarily depends on source/destination data of layers like Ethernet, IP etc.
Raises socket.timeout on timeout.
packet_send -- pypacker packet to be sent
max_packets_recv -- max packets to be received
pfilter -- filter as lambda function to match packets to be retrieved,
return True to accept a specific packet.
Set to None to accept everything.
lowest_layer -- packet class to be used to create new packets
return -- packets receives
"""
from pypacker import psocket
from pypacker.layer12 import ethernet
from pypacker.layer3 import ip, icmp
# send ICMP request
psock = psocket.SocketHndl(iface_name="wlan0")
icmpreq = ethernet.Ethernet(src_s="20:16:d8:ef:1f:49", dst_s="24:65:11:85:e9:00", type=ethernet.ETH_TYPE_IP) +\
ip.IP(p=ip.IP_PROTO_ICMP, src_s="192.168.178.27", dst_s="192.168.178.24") +\
icmp.ICMP(type=8) +\
icmp.ICMP.Echo(id=1, ts=123456789, body_bytes=b"12345678901234567890")
psock.send(icmpreq.bin())
pcapwriter = ppcap.Writer(filename="parsefail.pcap")
raw_bytes = b""
cnt = 0
time_start = time.time()
for bts in sockhndl:
if cnt % 1000 == 0:
print("%d pps" % (cnt / (time.time() - time_start)))
time_start = time.time()
cnt = 0
cnt += 1
try:
#pkt = radiotap.Radiotap(bts)
pkt = ethernet.Ethernet(bts)
# print(pkt)
# print(pkt.body_handler)
"""
if pkt[ip.IP] is not None:
tmp = pkt[ip.IP].src_s
tmp = pkt[ip.IP].dst_s
tmp = pkt[ip.IP].body_handler.body_handler
"""
pkt.dissect_full()
raw_bytes = pkt.bin()
#print("%r" % pkt)
# pcapwriter.write(raw_bytes)
for layer in pkt:
if layer.dissect_error:
from pypacker.layer3 import ip, icmp
from pypacker.layer4 import udp, tcp
wlan_monitor_if = "wlan1"
#
# create packets using raw bytes
#
BYTES_ETH_IP_ICMPREQ = b"\x52\x54\x00\x12\x35\x02\x08\x00\x27\xa9\x93\x9e\x08\x00" +\
b"\x45\x00\x00\x54\x00\x00\x40\x00\x40\x01\x54\xc1\x0a\x00" +\
b"\x02\x0f\xad\xc2\x2c\x17\x08\x00\xec\x66\x09\xb1\x00\x01" +\
b"\xd0\xd5\x18\x51\x28\xbd\x05\x00\x08\x09\x0a\x0b\x0c\x0d" +\
b"\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b" +\
b"\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29" +\
b"\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
packet1 = ethernet.Ethernet(BYTES_ETH_IP_ICMPREQ)
print("packet contents: %s" % packet1)
print("packet as bytes: %s" % packet1.bin())
# create custom packets and concat them
packet1 = ethernet.Ethernet(dst_s="aa:bb:cc:dd:ee:ff", src_s="ff:ee:dd:cc:bb:aa") +\
ip.IP(src_s="192.168.0.1", dst_s="192.168.0.2") +\
icmp.ICMP(type=8) +\
icmp.ICMP.Echo(id=1, ts=123456789, body_bytes=b"12345678901234567890")
print("custom packet: %s" % packet1)
# change dynamic header
packet1.ip.opts.append(ip.IPOptMulti(type=ip.IP_OPT_TS, len=3, body_bytes=b"\x00\x11\x22"))
# change dynamic header even more
# opts = [(ip.IP_OPT_TR, b"\x33\x44\x55"), (ip.IP_OPT_NOP, b"")]
opts = [ip.IPOptMulti(type=ip.IP_OPT_TR,
len=3,