# regression.uts
p /= VXLAN(flags=0x8, vni=42) / Ether() / IP()
p = Ether(str(p))
assert(p[VXLAN].reserved1 == 0x0)
assert(p[VXLAN].gpid is None)
assert(p[Ether:2].type == 0x800)
Robin Jarry's avatar
Robin Jarry committed

= Build a VXLAN packet with group policy ID = 42
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= IP(src="1.1.1.1", dst="2.2.2.2") / UDP(sport=1111)
p /= VXLAN(flags=0x88, gpid=42, vni=42) / Ether() / IP()
p = Ether(str(p))
assert(p[VXLAN].gpid == 42)
assert(p[VXLAN].reserved1 is None)
assert(p[Ether:2].type == 0x800)
Florian Maury's avatar
Florian Maury committed
+ Tests of SSLStreamContext

= Test with recv() calls that return exact packet-length strings
~ sslstreamsocket

import socket
class MockSocket(object):
    # Fake stream socket: every recv() call hands back the next canned chunk.
    def __init__(self):
        self.l = [b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03']
    def recv(self, x):
        # The requested size x is ignored; chunks are returned whole, in order.
        # Once the chunks are exhausted, signal end-of-stream with socket.error.
        if not self.l:
            raise socket.error(100, 'EOF')
        return self.l.pop(0)

class TestPacket(Packet):
    # Test-only packet carrying a single IntField called 'data'.
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        # Whatever follows the field is handed to the configured padding layer.
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

p = ss.recv()
assert(p.data == 1)
p = ss.recv()
assert(p.data == 2)
p = ss.recv()
assert(p.data == 3)
try:
    ss.recv()
    ret = False
except socket.error:
    ret = True

assert(ret)

= Test with recv() calls that return twice as much data as the exact packet-length
~ sslstreamsocket

import socket
class MockSocket(object):
    # Fake stream socket whose recv() returns two packets' worth of bytes per call.
    def __init__(self):
        self.l = [b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04']
    def recv(self, x):
        # The requested size x is ignored; chunks are returned whole, in order.
        # Once the chunks are exhausted, signal end-of-stream with socket.error.
        if not self.l:
            raise socket.error(100, 'EOF')
        return self.l.pop(0)

class TestPacket(Packet):
    # Test-only packet carrying a single IntField called 'data'.
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        # Bytes after the field are handed to the configured padding layer.
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

p = ss.recv()
assert(p.data == 1)
p = ss.recv()
assert(p.data == 2)
p = ss.recv()
assert(p.data == 3)
p = ss.recv()
assert(p.data == 4)
try:
    ss.recv()
    ret = False
except socket.error:
    ret = True

assert(ret)

= Test with recv() calls that return not enough data
~ sslstreamsocket

import socket
class MockSocket(object):
    # Fake stream socket whose recv() returns fragments shorter than one packet.
    def __init__(self):
        self.l = [b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03']
    def recv(self, x):
        # The requested size x is ignored; fragments are returned whole, in order.
        # Once the fragments are exhausted, signal end-of-stream with socket.error.
        if not self.l:
            raise socket.error(100, 'EOF')
        return self.l.pop(0)

class TestPacket(Packet):
    # Test-only packet carrying a single IntField called 'data'.
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        # Trailing bytes are handed to the configured padding layer.
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

# The mock delivers fragments of 2, 2, 3, 1, 2, 1 and 1 bytes. The test expects
# recv() to raise after each fragment that leaves the packet incomplete, and to
# return the packet on the call that completes it.
# `except Exception` is used instead of a bare `except:` so that
# KeyboardInterrupt / SystemExit are not mistaken for the expected failure.
try:
    p = ss.recv()
    ret = False
except Exception:
    ret = True

assert(ret)
p = ss.recv()
assert(p.data == 1)
try:
    p = ss.recv()
    ret = False
except Exception:
    ret = True

assert(ret)
p = ss.recv()
assert(p.data == 2)
try:
    p = ss.recv()
    ret = False
except Exception:
    ret = True

assert(ret)
try:
    p = ss.recv()
    ret = False
except Exception:
    ret = True

assert(ret)
p = ss.recv()
assert(p.data == 3)
+ Test correct conversion from binary to string of IPv6 addresses

= IPv6 bin to string conversion
from scapy.pton_ntop import _inet6_ntop, inet_ntop
# Each entry pairs a 16-byte binary address with its expected text form.
# Both the public inet_ntop() and the private _inet6_ntop() must produce it.
for binfrm, address in [
        (b'\x00' * 16, '::'),
        (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
         '1111:2222:3333:4444:5555:6666:7777:8888'),
        (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00',
         '1111:2222:3333:4444:5555::'),
        (b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
         '::4444:5555:6666:7777:8888'),
        # Two zero runs: only the longest one (three groups) is compressed.
        (b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88',
         '0:0:3333:4444::8888'),
        (b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
         '1::'),
        (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
         '::1'),
        # Two zero runs of equal length: the first one is compressed.
        (b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88',
         '1111::4444:0:0:7777:8888'),
        (b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00',
         '1000:200:30:4:5:60:700:8000'),
]:
    addr1 = inet_ntop(socket.AF_INET6, binfrm)
    addr2 = _inet6_ntop(binfrm)
    assert address == addr1 == addr2

= IPv6 bin to string conversion - Zero-block of length 1
gpotter2's avatar
gpotter2 committed
binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88'
addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm)
# On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and
# shortens the single zero block to '::'. This is a valid IPv6 address
# representation anyway.
assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888',
                 '1111:2222:3333:4444:5555:6666::8888'])
assert(addr2 == '1111:2222:3333:4444:5555:6666:0:8888')

= IPv6 bin to string conversion - Illegal sizes
gpotter2's avatar
gpotter2 committed
# 15- and 17-byte inputs are not valid IPv6 addresses: both implementations
# must raise, and _inet6_ntop() must raise the same exception type as inet_ntop().
for binfrm in [b"\x00" * 15, b"\x00" * 17]:
    rc = False
    try:
        inet_ntop(socket.AF_INET6, binfrm)
    except Exception as exc1:
        rc = True
        # Keep the type: the exception name itself is unbound after the handler on Python 3.
        exc1_type = type(exc1)
    assert rc
    # Reset the flag, otherwise a silent success of _inet6_ntop() would go unnoticed.
    rc = False
    try:
        _inet6_ntop(binfrm)
    except Exception as exc2:
        rc = isinstance(exc2, exc1_type)
    assert rc


############
############
+ VRRP tests

= VRRP - build
s = str(IP()/VRRP())
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x00$\x00\x01\x00\x00@p|g\x7f\x00\x00\x01\x7f\x00\x00\x01!\x01d\x00\x00\x01z\xfd\x00\x00\x00\x00\x00\x00\x00\x00'

= VRRP - dissection
p = IP(s)
VRRP in p and p[VRRP].chksum == 0x7afd


############
############
+ L2TP tests

= L2TP - build
s = str(IP()/UDP()/L2TP())
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x00*\x00\x01\x00\x00@\x11|\xc0\x7f\x00\x00\x01\x7f\x00\x00\x01\x06\xa5\x06\xa5\x00\x16\xf4e\x00\x02\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

= L2TP - dissection
p = IP(s)
L2TP in p and p[L2TP].len == 14 and p.tunnel_id == 0 and p[UDP].chksum == 0xf465


############
############
+ HSRP tests

= HSRP - build & dissection
defaddr = conf.route.route('0.0.0.0')[1]
pkt = IP(str(IP()/UDP(dport=1985, sport=1985)/HSRP()/HSRPmd5()))
assert pkt[IP].dst == "224.0.0.2" and pkt[UDP].sport == pkt[UDP].dport == 1985
assert pkt[HSRP].opcode == 0 and pkt[HSRP].state == 16
assert pkt[HSRPmd5].type == 4 and pkt[HSRPmd5].sourceip == defaddr


############
############
+ RIP tests

= RIP - build
s = str(IP()/UDP(sport=520)/RIP()/RIPEntry()/RIPAuth(authtype=2, password="scapy"))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x01\x02\x08\x02\x08\x004\xae\x99\x01\x01\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xff\xff\x00\x02scapy\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

= RIP - dissection
p = IP(s)
RIPEntry in p and RIPAuth in p and p[RIPAuth].password.startswith("scapy")


############
############
+ Radius tests

= Radius - build
s = str(IP()/UDP(sport=1812)/Radius(authenticator="scapy")/RadiusAttribute(value="scapy"))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x007\x00\x01\x00\x00@\x11|\xb3\x7f\x00\x00\x01\x7f\x00\x00\x01\x07\x14\x07\x15\x00#U\xb2\x01\x00\x00\x1bscapy\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x07scapy'

= Radius - dissection
p = IP(s)
Radius in p and len(p[Radius].attributes) == 1 and p[Radius].attributes[0].value == "scapy"


############
############
+ Addresses generators

= Net

n1 = Net("192.168.0.0/31")
[ip for ip in n1] == ["192.168.0.0", "192.168.0.1"]

n2 = Net("192.168.0.*")
len([ip for ip in n2]) == 256

n3 = Net("192.168.0.1-5")
len([ip for ip in n3]) == 5

(n1 == n3) == False

(n3 in n2) == True

= OID

oid = OID("1.2.3.4.5.6-8")
len([ o for o in oid ]) == 3

= Net6

n1 = Net6("2001:db8::/127")
len([ip for ip in n1]) == 2


############
############
+ IPv6 helpers

= in6_getLocalUniquePrefix()

p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")

= Misc addresses manipulation functions

teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)

ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"

in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True

in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True

a = inet_pton(socket.AF_INET6, "2001:db8::2807")
gpotter2's avatar
gpotter2 committed
in6_xor(a, a) == b"\x00" * 16

a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a)) 
r == "ff02::1:ffd4:e5f6"

in6_isllsnmaddr(r) == True

in6_isdocaddr("2001:db8::2807") == True

in6_isaddrllallnodes("ff02::1") == True

in6_isaddrllallservers("ff02::2") == True

= in6_getscope()

in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
in6_getscope("::1") == IPV6_ADDR_LOOPBACK

from scapy.pton_ntop import _inet6_pton, inet_pton
import socket

# Malformed addresses: inet_pton() and _inet6_pton() must both reject them,
# and _inet6_pton() must raise the same exception type as inet_pton().
ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
                 "fe80:1234:abcd::192.168.40.12:abcd",
                 "fe80:1234:abcd::192.168.40",
                 "fe80:1234:abcd::192.168.400.12",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
    rc = False
    try:
        res1 = inet_pton(socket.AF_INET6, ip6)
    except Exception as exc1:
        rc = True
        # Keep the type: the exception name itself is unbound after the handler on Python 3.
        exc1_type = type(exc1)
    assert rc
    # Reset the flag so that a silent success of _inet6_pton() is detected.
    rc = False
    try:
        res2 = _inet6_pton(ip6)
    except Exception as exc2:
        rc = isinstance(exc2, exc1_type)
    assert rc

# Well-formed addresses: both implementations must return the expected 16 bytes.
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
                  ("fe80:1234:abcd::fe06",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
                  ("fe80::2e67:ef2d:7ece:ed8a",
                   b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
                  ("::ffff",
                   b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
                  ("ffff::",
                   b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
                  ('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
    res1 = inet_pton(socket.AF_INET6, ip6)
    res2 = _inet6_pton(ip6)
    assert res == res1 == res2


############
############
+ Test Route class

= make_route()

r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295L, '0.0.0.0')

tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040L, '0.0.0.0')

= add() & delt()

r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4 + 1
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4

= ifchange()

r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][-1] == "5.6.7.8"

= ifdel()

r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4

= ifadd() & get_if_bcast()

r4 = Route()
len_r4 = len(r4.routes)

r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
print len(r4.routes), len_r4
len(r4.routes) == len_r4


############
############
+ Random objects

= RandomEnumeration

re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == [3, 4, 2, 5, 1, 6, 0, 7]

= RandIP6

random.seed(0x2807)
r6 = RandIP6()
str(r6) == "d279:1205:e445:5a9f:db28:efc9:afd7:f594"

random.seed(0x2807)
r6 = RandIP6("2001:db8::-") 
print r6 == "2001:0db8::9e9c"

r6 = RandIP6("2001:db8::*")
print r6 == "2001:0db8::9ccb"

= RandMAC

random.seed(0x2807)
rm = RandMAC() 
rm == "d2:12:e4:5a:db:ef"

rm = RandMAC("00:01:02:03:04:0-7")
rm == "00:01:02:03:04:05"


= RandOID

random.seed(0x2807)
ro = RandOID()
ro == "7.222.44.194.276.116.320.6.84.97.31.5.25.20.13.84.104.18"

ro = RandOID("1.2.3.*")
ro == "1.2.3.41"

ro = RandOID("1.2.3.0-28")
ro == "1.2.3.11"

= RandRegExp

random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
print re == "vmuvr @ 906  g"

= Corrupted(Bytes|Bits)

random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
sane(str(cb)) == ".BCD)"

cb = CorruptedBits("ABCDE", p=0.2)
sane(str(cb)) == "ECk@Y"

= Rand*

random.seed(0x2807)
rs = RandSingNum(-28, 07)
random.seed(0x2807)
rss = RandSingString()
rss == "CON:"

random.seed(0x2807)
rek = RandEnumKeys({'a': 1, 'b': 2})
random.seed(0x2807)
rts = RandTermString(4, "scapy")
sane(str(rts)) == "...[scapy"


############
############
+ Flags

= IP flags
~ IP

pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'

pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'

= TCP flags
~ TCP

pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert pkt.flags.SA
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert pkt.flags.AU
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')

pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert pkt.flags.PAU
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert pkt.flags.SAU
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'

= Flag values mutation with .raw_packet_cache
~ IP TCP

pkt = IP(str(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(str(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'

= Operations on flag values
~ TCP

p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')

assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags

= Using tuples and lists as flag values
~ IP TCP

plist = PacketList(list(IP()/TCP(flags=(0, 2**9 - 1))))
assert [p[TCP].flags for p in plist] == range(512)

plist = PacketList(list(IP()/TCP(flags=["S", "SA", "A"])))
assert [p[TCP].flags for p in plist] == [2, 18, 16]
Guillaume Valadon's avatar
Guillaume Valadon committed


############
############
+ SCTP

= SCTP - Chunk Init - build
s = str(IP()/SCTP()/SCTPChunkInit(params=[SCTPChunkParamIPv4Addr()]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x00<\x00\x01\x00\x00@\x84|;\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00@,\x0b_\x01\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x08\x7f\x00\x00\x01'
Guillaume Valadon's avatar
Guillaume Valadon committed

= SCTP - Chunk Init - dissection
p = IP(s)
SCTPChunkParamIPv4Addr in p and p[SCTP].chksum == 0x402c0b5f and p[SCTPChunkParamIPv4Addr].addr == "127.0.0.1"

= SCTP - SCTPChunkSACK - build
s = str(IP()/SCTP()/SCTPChunkSACK(gap_ack_list=["7:28"]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x004\x00\x01\x00\x00@\x84|C\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00;\x01\xd4\x04\x03\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x07\x00\x1c'
Guillaume Valadon's avatar
Guillaume Valadon committed

= SCTP - SCTPChunkSACK - dissection
p = IP(s)
SCTPChunkSACK in p and p[SCTP].chksum == 0x3b01d404 and p[SCTPChunkSACK].gap_ack_list[0] == "7:28"

= SCTP - answers
(IP()/SCTP()).answers(IP()/SCTP()) == True
Guillaume Valadon's avatar
Guillaume Valadon committed

Guillaume Valadon's avatar
Guillaume Valadon committed
############
############
+ DHCP

= BOOTP - misc
BOOTP().answers(BOOTP()) == True

import random
random.seed(0x2807)
str(RandDHCPOptions()) == "[('WWW_server', '90.219.239.175')]"

value = ("hostname", "scapy")
dof = DHCPOptionsField("options", value)
dof.i2repr("", value) == '[hostname scapy]'
gpotter2's avatar
gpotter2 committed
dof.i2m("", value) == b'\x0cscapy'
Guillaume Valadon's avatar
Guillaume Valadon committed


= DHCP - build
s = str(IP()/UDP()/BOOTP(chaddr="00:01:02:03:04:05")/DHCP(options=[("message-type","discover"),"end"]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x01\x10\x00\x01\x00\x00@\x11{\xda\x7f\x00\x00\x01\x7f\x00\x00\x01\x00C\x00D\x00\xfcf\xea\x01\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0000:01:02:03:04:0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x01\xff'
Guillaume Valadon's avatar
Guillaume Valadon committed

= DHCP - dissection
p = IP(s)
DHCP in p and p[DHCP].options[0] == ('message-type', 1)
Guillaume Valadon's avatar
Guillaume Valadon committed


############
############
+ 802.11

= 802.11 - misc
PrismHeader().answers(PrismHeader()) == True

dpl = Dot11PacketList([Dot11()/LLC()/SNAP()/IP()/UDP()])
len(dpl) == 1

dpl_ether = dpl.toEthernet()
len(dpl_ether) == 1 and Ether in dpl_ether[0]

= Dot11 - build
s = str(Dot11())
gpotter2's avatar
gpotter2 committed
s == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Guillaume Valadon's avatar
Guillaume Valadon committed

= Dot11 - dissection
p = Dot11(s)
Dot11 in p and p.addr3 == "00:00:00:00:00:00"
p.mysummary() == '802.11 Management 0L 00:00:00:00:00:00 > 00:00:00:00:00:00'

= Dot11QoS - build
s = str(Dot11(type=2, subtype=8)/Dot11QoS())
gpotter2's avatar
gpotter2 committed
s == b'\x88\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Guillaume Valadon's avatar
Guillaume Valadon committed

= Dot11QoS - dissection
p = Dot11(s)
Dot11QoS in p

= Dot11 - answers
query = Dot11(type=0, subtype=0)
Dot11(type=0, subtype=1).answers(query) == True

= Dot11 - misc
Dot11Elt(info="scapy").summary() == "SSID='scapy'"


############
############
+ ASN.1

= MIB

import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, "-- MIB test\nscapy       OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)

load_mib(fname)
len([k for k in conf.mib.iterkeys() if "scapy" in k]) == 1

= BER tests

BER_id_enc(42) == '*'
gpotter2's avatar
gpotter2 committed
BER_id_enc(2807) == b'\xbfw'

b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
gpotter2's avatar
gpotter2 committed
r1 == b'@\x04\x08\x08\x08\x08'

r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'


+ inet.py

= IPv4 - ICMPTimeStampField
test = ICMPTimeStampField("test", None)
value = test.any2i("", "07:28:28.07")
value == 26908070
test.i2repr("", value) == '7:28:28.70'

= IPv4 - UDP null checksum
gpotter2's avatar
gpotter2 committed
IP(str(IP()/UDP()/Raw(b"\xff\xff\x01\x6a")))[UDP].chksum == 0xFFFF

= IPv4 - (IP|UDP|TCP|ICMP)Error
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2", ttl=1)/ICMP()/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
answer.answers(query) == True

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/TCP()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()

answer.answers(query) == True

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/ICMP()/"scapy"
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/ICMPerror()/"scapy"

answer.answers(query) == True

= IPv4 - utilities
# Build overlapping fragments from two payloads and inspect the resulting list.
l = overlap_frag(IP(dst="1.2.3.4")/ICMP()/("AB"*8), ICMP()/("CD"*8))
len(l) == 6
# NOTE(review): the comprehension below reads `IP.payload` (attribute of the IP class) and never uses the loop variable `p` - confirm whether `p[IP].payload` was intended.
[len(str(IP.payload)) for p in l] == [38, 38, 38, 38, 38, 38]
# Re-dissect every fragment and compare its (fragment offset, MF flag) pair.
[(p.frag, p.flags.MF) for p in [IP(str(p)) for p in l]] == [(0, True), (1, True), (2, True), (0, True), (1, True), (2, False)]

= IPv4 - traceroute utilities
ip_ttl = [("192.168.0.%d" % i, i) for i in xrange(1, 10)]

tr_packets = [ (IP(dst="192.168.0.1", src="192.168.0.254", ttl=ttl)/TCP(options=[("Timestamp", "00:00:%.2d.00" % ttl)])/"scapy",
                IP(dst="192.168.0.254", src=ip)/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()/"scapy")
               for (ip, ttl) in ip_ttl ]

tr = TracerouteResult(tr_packets)
tr.get_trace() == {'192.168.0.1': {1: ('192.168.0.1', False), 2: ('192.168.0.2', False), 3: ('192.168.0.3', False), 4: ('192.168.0.4', False), 5: ('192.168.0.5', False), 6: ('192.168.0.6', False), 7: ('192.168.0.7', False), 8: ('192.168.0.8', False), 9: ('192.168.0.9', False)}}


result_show = ""
def test_show():
    # Capture what TracerouteResult.show() writes to stdout and compare it,
    # from the first "1" onwards, with the expected table.
    def write(s):
        global result_show
        result_show += s
    mock_stdout = mock.Mock()
    mock_stdout.write = write
    saved_stdout = sys.stdout
    sys.stdout = mock_stdout
    try:
        tr = TracerouteResult(tr_packets)
        tr.show()
    finally:
        # Always put the real stdout back, even if show() raises.
        sys.stdout = saved_stdout
    expected = "  192.168.0.1:tcp80  \n"
    expected += "1 192.168.0.1     11 \n"
    expected += "2 192.168.0.2     11 \n"
    expected += "3 192.168.0.3     11 \n"
    expected += "4 192.168.0.4     11 \n"
    expected += "5 192.168.0.5     11 \n"
    expected += "6 192.168.0.6     11 \n"
    expected += "7 192.168.0.7     11 \n"
    expected += "8 192.168.0.8     11 \n"
    expected += "9 192.168.0.9     11 \n"
    # Skip whatever precedes the first "1" in both strings before comparing.
    index_result = result_show.index("1")
    index_expected = expected.index("1")
    assert(result_show[index_result:] == expected[index_expected:])

test_show()


@mock.patch("scapy.layers.inet.plt")
def test_timeskew_graph(mock_plt):
    # Replace plt.plot with a pass-through so timeskew_graph() returns its raw data.
    def fake_plot(data, **kwargs):
        return data
    mock_plt.plot = fake_plot
    srl = SndRcvList([(a, a) for a in [IP(str(p[0])) for p in tr_packets]])
    ret = srl.timeskew_graph("192.168.0.254")
    # Assert the results: a bare comparison inside a function checks nothing.
    assert len(ret) == 9
    assert ret[0][1] == 0.0

test_timeskew_graph()


tr = TracerouteResult(tr_packets)
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
try:
    tr.make_graph()
finally:
    conf.AS_resolver = saved_AS_resolver

assert len(tr.graphdef) == 491
assert tr.graphdef.startswith("digraph trace {")
assert '"192.168.0.9" ->' in tr.graphdef

= IPv4 - reporting

result_IPID_count = ""
def test_IPID_count():
    # Capture the report printed by IPID_count() for three seeded random IP ids.
    def write(s):
        global result_IPID_count
        result_IPID_count += s
    mock_stdout = mock.Mock()
    mock_stdout.write = write
    saved_stdout = sys.stdout
    sys.stdout = mock_stdout
    try:
        random.seed(0x2807)
        IPID_count([(IP()/UDP(), IP(id=random.randint(0, 65535))/UDP()) for i in range(3)])
    finally:
        # Always put the real stdout back, even if IPID_count() raises.
        sys.stdout = saved_stdout
    lines = result_IPID_count.split("\n")
    # Assert rather than print, so that a mismatch actually fails the test.
    assert len(lines) == 5
    assert lines[0] == "Probably 3 classes: [4613, 53881, 58437]"

test_IPID_count()