Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if options.IPfile is not None:
parseIPlistLocation(options.IPfile)
sys.exit(0)
if options.pcapfile is not None:
if (options.srcIP or options.dstIP) == False:
print "choose -s or -d"
sys.exit(0)
f = open(options.pcapfile)
try:
pcap = dpkt.pcapng.Reader(f)
except:
print "it is not pcapng format..."
f.close()
f = open(options.pcapfile)
pcap = dpkt.pcap.Reader(f)
printPcap(pcap, options.srcIP, options.dstIP)
parseIPlistLocation("./out_IP.txt")
f.close()
sys.exit(0)
from datetime import datetime
from collections import OrderedDict
import dpkt, socket, struct, sys, S7Packet
from time import sleep
"""
main functionality
"""
if(len(sys.argv)>1):
f = open(sys.argv[1], 'r')
else:
print("Please enter the .pcap file to parse!")
sys.exit(0)
pcap = dpkt.pcap.Reader(f)
"""
pcap = pcap.pcap()
capture_interface = 'eth0'
pcap = pcap.pcap(name=capture_interface)
"""
PLC_ADDRESS = '192.168.2.101'
f_out = open('config_file_information.txt', 'w')
f_out.write('TITLE: config_file_information\n\n')
f_out.write("ATTRIBUTE: functionCode\n")
f_out.write("ATTRIBUTE: packetsPerThirtySecsOfThisType\n")
f_out.write("ATTRIBUTE: 5MinuteTimeInterval\n")
def collect_records(from_file):
records = []
buffer = StringIO()
for ts, buf in dpkt.pcap.Reader(open(from_file)):
eth = dpkt.ethernet.Ethernet(buf)
data = eth.data.tcp.data.split("\r\n")[-1]
if data.startswith("TNBU") and buffer.tell() != 0:
records.append(buffer.getvalue())
buffer.seek(0)
buffer.write(data)
else:
buffer.write(data)
return records
def process_file(pcap_filename, mdp_parser, secdef, pretty_print, print_data, skip_fields):
with gzip.open(pcap_filename, 'rb') if pcap_filename.endswith('.gz') else open(pcap_filename, 'rb') as pcap:
pcap_reader = dpkt.pcap.Reader(pcap)
packet_number = 0
for ts, packet in pcap_reader:
packet_number += 1
ethernet = dpkt.ethernet.Ethernet(packet)
if ethernet.type == dpkt.ethernet.ETH_TYPE_IP:
ip = ethernet.data
if ip.p == dpkt.ip.IP_PROTO_UDP:
udp = ip.data
try:
timestamp = datetime.fromtimestamp(ts)
mdp.decode.decode_packet(mdp_parser, timestamp, udp.data, skip_fields,
print_data, pretty_print, secdef, packet_number)
except Exception as e:
print('Error parsing packet #{} - {}'.format(packet_number, e))
def main(f):
"""This function decodes a packet capture file f and breaks it up into tcp connections"""
pcap = dpkt.pcap.Reader(f)
print "counter\tsrc prt\tdst prt\tflags"
packet_cntr = 0
connection_table = {}
for ts, buf in pcap:
packet_cntr += 1
eth = dpkt.ethernet.Ethernet(buf)
# Also, this changes a little bit with IPv6. To tell the difference between IPv4 and IPv6, you have to look
# at the ethertype field, which is given by http://www.iana.org/assignments/ethernet-numbers. IPv4 is 0x800 or 2048
# and IPv6 is 0x86DD or 34525
# This is simplistic - IP packets can be fragmented. Also, this only works for IPv4. IPv6 has a different Ethertype
if eth.type != dpkt.ethernet.ETH_TYPE_IP :
continue
elif eth.type == dpkt.ethernet.ETH_TYPE_IP6 :
pass
ip = eth.data
def _get_raw_flows(apps: dict, filename: str, max_packets_per_flow: typing.Optional[int] = None) -> dict:
""" transform packets to flows for each app """
flows = dict.fromkeys(apps)
packet_counter = dict.fromkeys(apps)
client_tuple = dict.fromkeys(apps.keys())
with open(filename, "rb") as pcap_file:
for pkt_number, (timestamp, ip, seg) in enumerate(_filter_packets(dpkt.pcap.Reader(pcap_file))):
if isinstance(seg, dpkt.tcp.TCP):
transp_proto = "tcp"
elif isinstance(seg, dpkt.udp.UDP):
transp_proto = "udp"
else:
raise UnknownProtocol(seg.__class__.__name__)
source = Endpoint(ip_to_string(ip.src), seg.sport)
destination = Endpoint(ip_to_string(ip.dst), seg.dport)
connection = Connection(transp_proto, frozenset([source, destination]))
assert connection in client_tuple
# if client tuple is empty, then no packets from the flow has been seen so far
if not client_tuple[connection]:
client_tuple[connection] = source
flows[connection] = np.zeros((max_packets_per_flow, 8))
def pkt_ctr(pcap_dir, file_name, pkt_type):
udp_ctr = 0
tcp_ctr = 0
other_ctr = 0
total_ctr = 0
filepath = pcap_dir + "/" + file_name
f = open(filepath)
for ts, buf in dpkt.pcap.Reader(file(filepath, "rb")):
eth = dpkt.ethernet.Ethernet(buf)
total_ctr += 1
if eth.type == dpkt.ethernet.ETH_TYPE_IP: # 2048
ip = eth.data
if ip.p == dpkt.ip.IP_PROTO_UDP: # 17
udp_ctr += 1
if ip.p == dpkt.ip.IP_PROTO_TCP: # 6
tcp_ctr += 1
else:
other_ctr += 1
# Returns the number of packets depending on the type
if pkt_type == 'total':
return total_ctr
elif pkt_type == 'tcp':
def _get_dpkt_handle(self):
f = open(self._pcap_file)
pcap = dpkt.pcap.Reader(f)
return pcap