Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_check_invalid_ip(self):
"""
Test check_invalid_ip.
"""
pkt = scapy.IP(src="1.1.1.1")
result = self.pf1.check_invalid_ip(pkt)
self.assertEqual(result, 1)
def ICMPPacketTooBig(version, srcaddr, dstaddr, packet):
if version == 4:
return ("ICMPv4 fragmentation needed",
scapy.IP(src=srcaddr, dst=dstaddr, proto=1) /
scapy.ICMPerror(type=3, code=4, unused=1280) / str(packet)[:64])
else:
udp = packet.getlayer("UDP")
udp.payload = str(udp.payload)[:1280-40-8]
return ("ICMPv6 Packet Too Big",
scapy.IPv6(src=srcaddr, dst=dstaddr) /
scapy.ICMPv6PacketTooBig() / str(packet)[:1232])
def test_options(value, test_type):
"""
Tests tampering options
"""
if test_type == "direct":
tamper = actions.tamper.TamperAction(None, field="options-%s" % value.lower(), tamper_type="corrupt", tamper_value=bytes([12]))
else:
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:options-%s:replace:" % value.lower(), logger)
assert tamper.parse("TCP:options-%s:corrupt" % value.lower(), logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
tamper.run(packet, logger)
opts_dict_lookup = value.lower().replace(" ", "_")
for optname, optval in packet["TCP"].options:
if optname == value:
break
elif optname == actions.layer.TCPLayer.options_names[opts_dict_lookup]:
break
else:
pytest.fail("Failed to find %s in options" % value)
assert len(packet["TCP"].options) == 1
raw_p = bytes(packet)
assert raw_p, "options broke scapy bytes"
p2 = actions.packet.Packet(IP(bytes(raw_p)))
assert p2.haslayer("IP")
assert p2.haslayer("TCP")
def main():
if len(sys.argv)<4:
print 'pass 2 arguments: "" '
exit(1)
addr = socket.gethostbyname(sys.argv[1])
iface = get_if()
pkt = Ether(src=get_if_hwaddr(iface), dst="ff:ff:ff:ff:ff:ff") / IP(dst=addr, tos=1) / UDP(dport=4321, sport=1234) / sys.argv[2]
pkt.show2()
#hexdump(pkt)
try:
for i in range(int(sys.argv[3])):
sendp(pkt, iface=iface)
sleep(1)
except KeyboardInterrupt:
raise
def process(packet):
scapy_packet = scapy.IP(packet.get_payload())
if scapy_packet.haslayer(scapy.DNSRR):
qname = scapy_packet[scapy.DNSQR].qname
if "google.com" in qname:
answer = scapy.DNSRR(rrname=qname,rdata="127.0.0.1")
scapy_packet[scapy.DNS].an = answer
scapy_packet[scapy.DNS].ancount = 1
del scapy_packet[scapy.IP].len
del scapy_packet[scapy.IP].chksum
del scapy_packet[scapy.UDP].len
del scapy_packet[scapy.UDP].chksum
packet.set_payload(str(scapy_packet))
print "Spoofing Target",qname
packet.accept()
def udp4(self):
# UDP port 9 + WOL Payload
return sendp([Ether(dst=self.ETH_BROADCAST) / IP(dst='255.255.255.255') / UDP(sport=32767, dport=9)/ Raw(load=self.wol_payload)], iface=self.intf)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('ip_addr', type=str, help="The destination IP address to use")
parser.add_argument('message', type=str, help="The message to include in packet")
parser.add_argument('--dst_id', type=int, default=None, help='The myTunnel dst_id to use, if unspecified then myTunnel header will not be included in packet')
args = parser.parse_args()
addr = socket.gethostbyname(args.ip_addr)
dst_id = args.dst_id
iface = get_if()
if (dst_id is not None):
print "sending on interface {} to dst_id {}".format(iface, str(dst_id))
pkt = Ether(src=get_if_hwaddr(iface), dst='ff:ff:ff:ff:ff:ff')
pkt = pkt / MyTunnel(dst_id=dst_id) / IP(dst=addr) / args.message
else:
print "sending on interface {} to IP addr {}".format(iface, str(addr))
pkt = Ether(src=get_if_hwaddr(iface), dst='ff:ff:ff:ff:ff:ff')
pkt = pkt / IP(dst=addr) / TCP(dport=1234, sport=random.randint(49152,65535)) / args.message
pkt.show2()
# hexdump(pkt)
# print "len(pkt) = ", len(pkt)
sendp(pkt, iface=iface, verbose=False)
def icmp_ping(ip, mac=None):
if ip is None:
return (None, None)
if mac is None:
ans, unans = srp(Ether()/IP(dst=ip)/ICMP(), timeout=2)
else:
ans, unans = srp(Ether(dst=mac)/IP(dst=ip)/ICMP(), timeout=2)
if verbose:
print "icmp_ping: ", ip, " ans = ", len(ans), ", unans = ", len(unans)
sys.stdout.flush()
return ans, unans
def traceroute_discovery(self):
external_ip = requests.get("http://canhazip.com").text # getting external ip, to determine if cloud cluster
cloud = self.get_cloud(external_ip)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR) # disables scapy's warnings
from scapy.all import ICMP, IP, Ether, srp1
node_internal_ip = srp1(Ether() / IP(dst="google.com" , ttl=1) / ICMP(), verbose=0)[IP].src
for ip in self.generate_subnet(ip=node_internal_ip, sn="24"):
self.publish_event(NewHostEvent(host=ip, cloud=external_ip))
def _ip_packet_from_bytes(self, bytez):
if len(bytez) <= 0:
return None
try:
eframe = Ether(bytez)
ipkt = eframe[IP]
except:
return None
return ipkt