diff --git a/README.md b/README.md index a16c715b0eb18602062a771f39cc24dd16b01794..dccc6e447967f3e41f74efdf20f6e455d123ad45 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [](https://travis-ci.org/secdev/scapy)  +[](https://codecov.io/gh/secdev/scapy) Scapy is a powerful Python-based interactive packet manipulation program and library. diff --git a/scapy/layers/inet.py b/scapy/layers/inet.py index 760e0675de19a1d79ee84f13ef19c45effc0362c..54fffc481adb5c3627c060080f6884ba6ed99ecd 100644 --- a/scapy/layers/inet.py +++ b/scapy/layers/inet.py @@ -42,7 +42,7 @@ class IPTools(object): t.sort() return t[t.index(self.ttl)+1] def hops(self): - return self.ottl()-self.ttl-1 + return self.ottl() - self.ttl _ip_options_names = { 0: "end_of_list", diff --git a/test/regression.uts b/test/regression.uts index 41c640a052f88744958fa0db5aade328c5bf3700..29c3e01a284f08ba7ab8a7a94f83579b74f46a69 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -622,6 +622,8 @@ assert(x.PDU.varbindlist[2].value == 1) ~ netaccess IP ICMP x=sr1(IP(dst="www.google.com")/ICMP(),timeout=3) x +assert x[IP].ottl() in [32, 64, 128, 255] +assert 0 <= x[IP].hops() <= 126 x is not None and ICMP in x and x[ICMP].type == 0 = DNS request @@ -631,6 +633,10 @@ x is not None and ICMP in x and x[ICMP].type == 0 dns_ans = sr1(IP(dst="resolver1.opendns.com")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.com")),timeout=5) DNS in dns_ans += Whois request +~ netaccess IP +IP(src="8.8.8.8").whois() + ############ ############ @@ -688,6 +694,55 @@ assert(DNS in IP(str(dns_ans2))) conf.route.route("0.0.0.0")[2] arping(_+"/24") += send() and sniff() +~ netaccess +import time +import os +def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None): + assert pid != -1 + if pid == 0: + time.sleep(1) + (sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt) + if fork: + os._exit(0) + else: + return + else: + spkt = str(pkt) + pkts = sniff( + timeout=timeout, filter=flt, + stop_filter=lambda p: pkt.__class__ in p and str(p[pkt.__class__]) == spkt + ) + if fork: + os.waitpid(pid, 0) + else: + t_other.join() + assert str(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p) + +def send_and_sniff(pkt, timeout=2, flt=None): + """Send a packet, sniff, and check the packet has been seen""" + if hasattr(os, "fork"): + _send_or_sniff(pkt, timeout, flt, os.fork(), True) + else: + from threading import Thread + def run_function(pkt, timeout, flt, pid, thread, results): + _send_or_sniff(pkt, timeout, flt, pid, False, thread) + results.put(True) + results = Queue.Queue() + t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results)) + t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results)) + t_parent.start() + t_child.start() + t_parent.join() + t_child.join() + assert results.qsize() >= 2 + while not results.empty(): + assert results.get() + +send_and_sniff(IP(dst="secdev.org")/ICMP()) +send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp") +send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP()) + ############ ############ @@ -4828,6 +4883,46 @@ assert len(defrags) == 1 * which should be the same as pkt reconstructed assert defrags[0] == IP(str(pkt)) += Packet().fragment() +payloadlen, fragsize = 100, 8 +assert fragsize % 8 == 0 +fragcount = (payloadlen / fragsize) + bool(payloadlen % fragsize) +* create the packet +pkt = IP() / ("X" * payloadlen) +* create the fragments +frags = pkt.fragment(fragsize) +* count the fragments +assert len(frags) == fragcount +* each fragment except the last one should have MF set +assert all(p.flags == 1 for p in frags[:-1]) +assert frags[-1].flags == 0 +* each fragment except the last one should have a payload of fragsize bytes +assert all(len(p.payload) == 8 for p in frags[:-1]) +assert len(frags[-1].payload) == ((payloadlen % fragsize) or fragsize) + += Packet().fragment() and overloaded_fields +pkt1 = Ether() / IP() / UDP() +pkt2 = pkt1.fragment()[0] +pkt3 = pkt2.__class__(str(pkt2)) +assert pkt1[IP].proto == pkt2[IP].proto == pkt3[IP].proto + += Packet().fragment() already fragmented packets +payloadlen = 1480 * 3 +ffrags = (IP() / ("X" * payloadlen)).fragment(1480) +ffrags = reduce(lambda x, y: x + y, (pkt.fragment(1400) for pkt in ffrags)) +len(ffrags) == 6 +* each fragment except the last one should have MF set +assert all(p.flags == 1 for p in ffrags[:-1]) +assert ffrags[-1].flags == 0 +* fragment offset should be well computed +plen = 0 +for p in ffrags: + assert p.frag == plen / 8 + plen += len(p.payload) + +assert plen == payloadlen + + ############ ############ + TCP/IP tests