Skip to content
Snippets Groups Projects
regression.uts 405 KiB
Newer Older
gpotter2's avatar
gpotter2 committed
n1 = ip.dst
assert isinstance(n1, Net)
assert n1.ip_regex.match(str(n1))
ip.show()

= OID

oid = OID("1.2.3.4.5.6-8")
len([ o for o in oid ]) == 3

= Net6

n1 = Net6("2001:db8::/127")
len([ip for ip in n1]) == 2

= Net6 using web address
~ netaccess ipv6

ip = IPv6(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net6)
assert n1.ip_regex.match(str(n1))
ip.show()

############
############
+ IPv6 helpers

= in6_getLocalUniquePrefix()

p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")

= Misc addresses manipulation functions

teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)

ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"

in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True

in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True

a = inet_pton(socket.AF_INET6, "2001:db8::2807")
gpotter2's avatar
gpotter2 committed
in6_xor(a, a) == b"\x00" * 16

a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a)) 
r == "ff02::1:ffd4:e5f6"

in6_isllsnmaddr(r) == True

in6_isdocaddr("2001:db8::2807") == True

in6_isaddrllallnodes("ff02::1") == True

in6_isaddrllallservers("ff02::2") == True

= in6_getscope()

assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK

= construct_source_candidate_set()

dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")]

assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"]
assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"]
from scapy.pton_ntop import _inet6_pton, inet_pton
import socket

ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
                 "fe80:1234:abcd::192.168.40.12:abcd",
                 "fe80:1234:abcd::192.168.40",
                 "fe80:1234:abcd::192.168.400.12",
gpotter2's avatar
gpotter2 committed
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
    rc = False
        res1 = inet_pton(socket.AF_INET6, ip6)
        res2 = _inet6_pton(ip6)
    except Exception as exc2:
        rc = isinstance(exc2, type(exc1))
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
gpotter2's avatar
gpotter2 committed
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
                  ("fe80:1234:abcd::fe06",
gpotter2's avatar
gpotter2 committed
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
                  ("fe80::2e67:ef2d:7ece:ed8a",
gpotter2's avatar
gpotter2 committed
                   b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
                  ("::ffff",
                   b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
                  ("ffff::",
                   b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
                  ('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
    res1 = inet_pton(socket.AF_INET6, ip6)
    res2 = _inet6_pton(ip6)
    assert res == res1 == res2


############
############
+ Test Route class

= make_route()

r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0')

tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0')

= add() & delt()

r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4 + 1
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4

= ifchange()

r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][-1] == "5.6.7.8"

= ifdel()

r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4

= ifadd() & get_if_bcast()

r4 = Route()
len_r4 = len(r4.routes)

r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4


############
############
+ Random objects

= RandomEnumeration

re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == ([3, 4, 2, 5, 1, 6, 0, 7] if six.PY2 else [5, 0, 2, 7, 6, 3, 1, 4])

= RandIP6

random.seed(0x2807)
r6 = RandIP6()
assert(r6 == ("d279:1205:e445:5a9f:db28:efc9:afd7:f594" if six.PY2 else
              "240b:238f:b53f:b727:d0f9:bfc4:2007:e265"))

random.seed(0x2807)
r6 = RandIP6("2001:db8::-") 
assert(r6 == ("2001:0db8::e445" if six.PY2 else "2001:0db8::b53f"))

r6 = RandIP6("2001:db8::*")
assert(r6 == ("2001:0db8::efc9" if six.PY2 else "2001:0db8::bfc4"))

= RandMAC

random.seed(0x2807)
rm = RandMAC() 
assert(rm == ("d2:12:e4:5a:db:ef" if six.PY2 else "24:23:b5:b7:d0:bf"))

rm = RandMAC("00:01:02:03:04:0-7")
assert(rm == ("00:01:02:03:04:05" if six.PY2 else "00:01:02:03:04:01"))

= RandOID

random.seed(0x2807)
ro = RandOID()
assert(ro == "7.222.44.194.276.116.320.6.84.97.31.5.25.20.13.84.104.18")

ro = RandOID("1.2.3.*")

ro = RandOID("1.2.3.0-28")
assert(ro == ("1.2.3.11" if six.PY2 else "1.2.3.12"))

= RandRegExp

random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
re == ('vmuvr @ 906 \x9e g' if six.PY2 else 'irrtv @ 517 ¸ v')

= Corrupted(Bytes|Bits)

random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
assert(sane(raw(cb)) in [".BCD)", "&BCDW"])

cb = CorruptedBits("ABCDE", p=0.2)
assert(sane(raw(cb)) in ["ECk@Y", "QB.P."])
= RandEnumKeys
~ not_pypy
random.seed(0x2807)
rek = RandEnumKeys({'a': 1, 'b': 2, 'c': 3}, seed=0x2807)
assert(rek == ('b' if six.PY2 else 'a'))
random.seed(0x2807)
gpotter2's avatar
gpotter2 committed
rs = RandSingNum(-28, 7)
assert(rs == (3 if six.PY2 else 2))
assert(rs == (-27 if six.PY2 else -17))
random.seed(0x2807)
rss = RandSingString()
assert(rss == ("CON:" if six.PY2 else "foo.exe:"))
random.seed(0x2807)
rts = RandTermString(4, "scapy")
assert(sane(raw(rts)) in ["...[scapy", "......scapy"])


############
############
+ Flags

= IP flags
~ IP

pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'

pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
assert len({IP().flags, IP().flags}) == 1

= TCP flags
~ TCP

pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert pkt.flags.SA
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert pkt.flags.AU
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')

pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert pkt.flags.PAU
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert pkt.flags.SAU
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'

= Flag values mutation with .raw_packet_cache
~ IP TCP

pkt = IP(raw(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(raw(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'

= Operations on flag values
~ TCP

p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')

assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags

= Using tuples and lists as flag values
~ IP TCP

plist = PacketList(list(IP()/TCP(flags=(0, 2**9 - 1))))
assert [p[TCP].flags for p in plist] == [x for x in range(512)]

plist = PacketList(list(IP()/TCP(flags=["S", "SA", "A"])))
assert [p[TCP].flags for p in plist] == [2, 18, 16]
Guillaume Valadon's avatar
Guillaume Valadon committed


############
############
+ SCTP

= SCTP - Chunk Init - build
s = raw(IP()/SCTP()/SCTPChunkInit(params=[SCTPChunkParamIPv4Addr()]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x00<\x00\x01\x00\x00@\x84|;\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00@,\x0b_\x01\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x08\x7f\x00\x00\x01'
Guillaume Valadon's avatar
Guillaume Valadon committed

= SCTP - Chunk Init - dissection
p = IP(s)
SCTPChunkParamIPv4Addr in p and p[SCTP].chksum == 0x402c0b5f and p[SCTPChunkParamIPv4Addr].addr == "127.0.0.1"

= SCTP - SCTPChunkSACK - build
s = raw(IP()/SCTP()/SCTPChunkSACK(gap_ack_list=["7:28"]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x004\x00\x01\x00\x00@\x84|C\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00;\x01\xd4\x04\x03\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x07\x00\x1c'
Guillaume Valadon's avatar
Guillaume Valadon committed

= SCTP - SCTPChunkSACK - dissection
p = IP(s)
SCTPChunkSACK in p and p[SCTP].chksum == 0x3b01d404 and p[SCTPChunkSACK].gap_ack_list[0] == "7:28"

= SCTP - answers
(IP()/SCTP()).answers(IP()/SCTP()) == True
Guillaume Valadon's avatar
Guillaume Valadon committed

Lucas Pascal's avatar
Lucas Pascal committed
= SCTP basic header - Dissection
~ sctp
blob = b"\x1A\x85\x26\x94\x00\x00\x00\x0D\x00\x00\x04\xD2"
p = SCTP(blob)
assert(p.dport == 9876)
assert(p.sport == 6789)
assert(p.tag == 13)
assert(p.chksum == 1234)

= basic SCTPChunkData - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x61\x74\x61"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkData))
assert(p.reserved == 0)
assert(p.delay_sack == 0)
assert(p.unordered == 0)
assert(p.beginning == 0)
assert(p.ending == 0)
assert(p.tsn == 0)
assert(p.stream_id == 0)
assert(p.stream_seq == 0)
assert(p.len == (len("data") + 16))
Lucas Pascal's avatar
Lucas Pascal committed

= basic SCTPChunkInit - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])

= SCTPChunkInit multiple valid parameters - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x5C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x00\x0C\x00\x06\x00\x05\x00\x00\x80\x00\x00\x04\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x80\x03\x00\x04\x80\x02\x00\x24\x87\x77\x21\x29\x3F\xDA\x62\x0C\x06\x6F\x10\xA5\x39\x58\x60\x98\x4C\xD4\x59\xD8\x8A\x00\x85\xFB\x9E\x2E\x66\xBA\x3A\x23\x54\xEF\x80\x04\x00\x06\x00\x01\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 92)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 7)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamFwdTSN,
                              SCTPChunkParamSupportedExtensions, SCTPChunkParamChunkList,
                              SCTPChunkParamRandom, SCTPChunkParamRequestedHMACFunctions,
                              SCTPChunkParamSupportedAddrTypes})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamChunkList] == SCTPChunkParamChunkList(len=4))
assert(params[SCTPChunkParamRandom].len == 4+32)
assert(len(params[SCTPChunkParamRandom].random) == 32)
assert(params[SCTPChunkParamRequestedHMACFunctions] == SCTPChunkParamRequestedHMACFunctions(len=6))
assert(params[SCTPChunkParamSupportedAddrTypes] == SCTPChunkParamSupportedAddrTypes(len=6))

= basic SCTPChunkInitAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])

= SCTPChunkInitAck with state cookie - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x4C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x80\x00\x00\x04\x00\x0B\x00\x0D\x6C\x6F\x63\x61\x6C\x68\x6F\x73\x74\x00\x00\x00\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x00\x07\x00\x14\x00\x10\x9E\xB2\x86\xCE\xE1\x7D\x0F\x6A\xAD\xFD\xB3\x5D\xBC\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 76)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 5)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamHostname,
                              SCTPChunkParamFwdTSN, SCTPChunkParamSupportedExtensions,
                              SCTPChunkParamStateCookie})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(raw(params[SCTPChunkParamHostname]) == raw(SCTPChunkParamHostname(len=13, hostname="localhost")))
Lucas Pascal's avatar
Lucas Pascal committed
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamStateCookie].len == 4+16)
assert(len(params[SCTPChunkParamStateCookie].cookie) == 16)

= basic SCTPChunkSACK - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkSACK))
assert(p.flags == 0)
assert(p.len == 16)
assert(p.cumul_tsn_ack == 0)
assert(p.a_rwnd == 0)
assert(p.n_gap_ack == 0)
assert(p.n_dup_tsn == 0)
assert(p.gap_ack_list == [])
assert(p.dup_tsn_list == [])

= basic SCTPChunkHeartbeatReq - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatReq))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])

= basic SCTPChunkHeartbeatAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatAck))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])

= basic SCTPChunkAbort - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAbort))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)
assert(p.error_causes == "")

= basic SCTPChunkShutDown - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdown))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.cumul_tsn_ack == 0)

= basic SCTPChunkShutDownAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownAck))
assert(p.flags == 0)
assert(p.len == 4)

= basic SCTPChunkError - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x09\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkError))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.error_causes == "")

= basic SCTPChunkCookieEcho - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieEcho))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.cookie == "")

= basic SCTPChunkCookieAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0B\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieAck))
assert(p.flags == 0)
assert(p.len == 4)

= basic SCTPChunkShutdownComplete - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0E\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownComplete))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)

= basic SCTPChunkAuthentication - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAuthentication))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.shared_key_id == 0)
assert(p.HMAC_function == 0)

= basic SCTPChunkAddressConf - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc1\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConf))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])

= basic SCTPChunkAddressConfAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConfAck))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])
= SCTPChunkParamRandom - Consecutive calls
~ sctp
param1, param2 = SCTPChunkParamRandom(), SCTPChunkParamRandom()
assert(param1.random != param2.random)
Guillaume Valadon's avatar
Guillaume Valadon committed
############
############
+ DHCP

= BOOTP - misc
BOOTP().answers(BOOTP()) == True
BOOTP().hashret() == b"\x00\x00\x00\x00"
Guillaume Valadon's avatar
Guillaume Valadon committed

import random
random.seed(0x2807)
gpotter2's avatar
gpotter2 committed
str(RandDHCPOptions()) == "[('WWW_server', '90.219.239.175')]"
Guillaume Valadon's avatar
Guillaume Valadon committed

value = ("hostname", "scapy")
dof = DHCPOptionsField("options", value)
dof.i2repr("", value) == '[hostname scapy]'
gpotter2's avatar
gpotter2 committed
dof.i2m("", value) == b'\x0cscapy'
Guillaume Valadon's avatar
Guillaume Valadon committed

unknown_value_end = b"\xfe" + b"\xff"*257
udof = DHCPOptionsField("options", unknown_value_end)
gpotter2's avatar
gpotter2 committed
udof.m2i("", unknown_value_end) == [(254, b'\xff'*255), 'end']

unknown_value_pad = b"\xfe" + b"\xff"*256 + b"\x00"
udof = DHCPOptionsField("options", unknown_value_pad)
gpotter2's avatar
gpotter2 committed
udof.m2i("", unknown_value_pad) == [(254, b'\xff'*255), 'pad']
Guillaume Valadon's avatar
Guillaume Valadon committed

= DHCP - build
gpotter2's avatar
gpotter2 committed
s = raw(IP(src="127.0.0.1")/UDP()/BOOTP(chaddr="00:01:02:03:04:05")/DHCP(options=[("message-type","discover"),"end"]))
gpotter2's avatar
gpotter2 committed
s == b'E\x00\x01\x10\x00\x01\x00\x00@\x11{\xda\x7f\x00\x00\x01\x7f\x00\x00\x01\x00C\x00D\x00\xfcf\xea\x01\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0000:01:02:03:04:0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x01\xff'
Guillaume Valadon's avatar
Guillaume Valadon committed

= DHCP - dissection
p = IP(s)
DHCP in p and p[DHCP].options[0] == ('message-type', 1)
Guillaume Valadon's avatar
Guillaume Valadon committed


############
############
+ 802.11

= 802.11 - misc
PrismHeader().answers(PrismHeader()) == True

dpl = Dot11PacketList([Dot11()/LLC()/SNAP()/IP()/UDP()])
len(dpl) == 1

dpl_ether = dpl.toEthernet()
len(dpl_ether) == 1 and Ether in dpl_ether[0]

= Dot11 - build
s = raw(Dot11())
gpotter2's avatar
gpotter2 committed
s == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Guillaume Valadon's avatar
Guillaume Valadon committed

= Dot11 - dissection
p = Dot11(s)
Dot11 in p and p.addr3 == "00:00:00:00:00:00"
p.mysummary() == '802.11 Management 0 00:00:00:00:00:00 > 00:00:00:00:00:00'
Guillaume Valadon's avatar
Guillaume Valadon committed

= Dot11QoS - build
s = raw(Dot11(type=2, subtype=8)/Dot11QoS(TID=4))
s == b'\x88\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00'
Guillaume Valadon's avatar
Guillaume Valadon committed

= Dot11QoS - dissection
p = Dot11(s)
Dot11QoS in p

= Dot11 - answers
query = Dot11(type=0, subtype=0)
Dot11(type=0, subtype=1).answers(query) == True

= Dot11 - misc
assert Dot11Elt(info="scapy").summary() == "SSID='scapy'"
assert Dot11Elt(ID=1).mysummary() == ""

= Multiple Dot11Elt layers
pkt = Dot11() / Dot11Beacon() / Dot11Elt(ID="Rates") / Dot11Elt(ID="SSID", info="Scapy")
assert pkt[Dot11Elt::{"ID": 0}].info == "Scapy"
assert pkt.getlayer(Dot11Elt, ID=0).info == "Scapy"

= Dot11WEP - build
~ crypto
conf.wepkey = ""
assert raw(PPI()/Dot11(FCfield=0x40)/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
conf.wepkey = "test123"
assert raw(PPI()/Dot11(type=2, subtype=8, FCfield=0x40)/Dot11QoS()/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a'

= Dot11WEP - dissect
~ crypto
conf.wepkey = "test123"
a = PPI(b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a')
assert a[Dot11QoS][Dot11WEP].icv == 942169697

= Dot11 - answers
a = Dot11()/Dot11Auth(seqnum=1)
b = Dot11()/Dot11Auth(seqnum=2)
assert b.answers(a)
assert not a.answers(b)

assert not (Dot11()/Dot11Ack()).answers(Dot11())
assert (Dot11()/LLC(dsap=2, ctrl=4)).answers(Dot11()/LLC(dsap=1, ctrl=5))

############
############
+ 802.3

= Test detection

assert isinstance(Dot3(raw(Ether())),Ether)
assert isinstance(Ether(raw(Dot3())),Dot3)

a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'

a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'


############
############
+ ASN.1

= MIB

import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, b"-- MIB test\nscapy       OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)

load_mib(fname)
assert(len([k for k in conf.mib.iterkeys() if "scapy" in k]) == 1)

assert(len([oid for oid in conf.mib]) > 100)
assert(conf.mib._my_find("MIB", "keyUsage"))

assert(len(conf.mib._find("MIB", "keyUsage")))

assert(len(conf.mib._recurs_find_all((), "MIB", "keyUsage")))

= DADict tests

a = DADict("test")
a.test_value = "scapy"
with ContextManagerCaptureOutput() as cmco:
    a._show()
    assert(cmco.get_output() == "test_value = 'scapy'\n")

b = DADict("test2")
b.test_value_2 = "hello_world"

a._branch(b, 1)
try:
    a._branch(b, 1)
    assert False
except DADict_Exception:
    pass

assert(len(a._find("test2")))

assert(len(a._find(test_value_2="hello_world")))

assert(len(a._find_all("test2")))

assert(not a._recurs_find((a,)))

assert(not a._recurs_find_all((a,)))

= BER tests

BER_id_enc(42) == '*'
gpotter2's avatar
gpotter2 committed
BER_id_enc(2807) == b'\xbfw'

b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
gpotter2's avatar
gpotter2 committed
r1 == b'@\x04\x08\x08\x08\x08'

r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
+ inet.py

= IPv4 - ICMPTimeStampField
test = ICMPTimeStampField("test", None)
value = test.any2i("", "07:28:28.07")
value == 26908070
test.i2repr("", value) == '7:28:28.70'

= IPv4 - UDP null checksum
IP(raw(IP()/UDP()/Raw(b"\xff\xff\x01\x6a")))[UDP].chksum == 0xFFFF

= IPv4 - (IP|UDP|TCP|ICMP)Error
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2", ttl=1)/ICMP()/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
assert(answer.answers(query) == True)

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/TCP()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()

assert(answer.answers(query) == True)

query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/ICMP()/"scapy"
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/ICMPerror()/"scapy"
assert(answer.answers(query) == True)
= IPv4 - mDNS
a = IP(dst="224.0.0.251")
assert a.hashret() == b"\x00"

# TODO add real case here

= IPv4 - utilities
l = overlap_frag(IP(dst="1.2.3.4")/ICMP()/("AB"*8), ICMP()/("CD"*8))
assert([len(raw(p[IP].payload)) for p in l] == [8, 8, 8, 8, 8, 8])
assert([(p.frag, p.flags.MF) for p in [IP(raw(p)) for p in l]] == [(0, True), (1, True), (2, True), (0, True), (1, True), (2, False)])

= IPv4 - traceroute utilities
ip_ttl = [("192.168.0.%d" % i, i) for i in six.moves.range(1, 10)]

tr_packets = [ (IP(dst="192.168.0.1", src="192.168.0.254", ttl=ttl)/TCP(options=[("Timestamp", "00:00:%.2d.00" % ttl)])/"scapy",
                IP(dst="192.168.0.254", src=ip)/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()/"scapy")
               for (ip, ttl) in ip_ttl ]

tr = TracerouteResult(tr_packets)
assert(tr.get_trace() == {'192.168.0.1': {1: ('192.168.0.1', False), 2: ('192.168.0.2', False), 3: ('192.168.0.3', False), 4: ('192.168.0.4', False), 5: ('192.168.0.5', False), 6: ('192.168.0.6', False), 7: ('192.168.0.7', False), 8: ('192.168.0.8', False), 9: ('192.168.0.9', False)}})

def test_show():
    with ContextManagerCaptureOutput() as cmco:
        tr = TracerouteResult(tr_packets)
        tr.show()
        result_show = cmco.get_output()
    expected = "  192.168.0.1:tcp80  \n"
    expected += "1 192.168.0.1     11 \n"
    expected += "2 192.168.0.2     11 \n"
    expected += "3 192.168.0.3     11 \n"
    expected += "4 192.168.0.4     11 \n"
    expected += "5 192.168.0.5     11 \n"
    expected += "6 192.168.0.6     11 \n"
    expected += "7 192.168.0.7     11 \n"
    expected += "8 192.168.0.8     11 \n"
    expected += "9 192.168.0.9     11 \n"
    index_result = result_show.index("1")
    index_expected = expected.index("1")
    assert(result_show[index_result:] == expected[index_expected:])

test_show()

    with ContextManagerCaptureOutput() as cmco:
        tr = TracerouteResult(tr_packets)
        tr.summary()
        result_summary = cmco.get_output()
    assert(len(result_summary.split('\n')) == 10)
    assert(any(
        "IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:%s S / Raw ==> "
        "IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded "
        "ttl-zero-during-transit / IPerror / TCPerror / "
        "Raw" % http in result_summary for http in ['http', 'www_http', 'www']
    ))

@mock.patch("scapy.layers.inet.plt")
def test_timeskew_graph(mock_plt):
    def fake_plot(data, **kwargs):
        return data
    mock_plt.plot = fake_plot
    srl = SndRcvList([(a, a) for a in [IP(raw(p[0])) for p in tr_packets]])
    ret = srl.timeskew_graph("192.168.0.254")
    assert(len(ret) == 9)
    assert(ret[0][1] == 0.0)

test_timeskew_graph()

tr = TracerouteResult(tr_packets)
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
tr.make_graph()
assert(len(tr.graphdef) == 491)
tr.graphdef.startswith("digraph trace {") == True
assert(('"192.168.0.9" ->' in tr.graphdef) == True)
conf.AS_resolver = conf.AS_resolver

pl = PacketList(list([Ether()/x for x in itertools.chain(*tr_packets)]))
srl, ul = pl.sr()
assert(len(srl) == 9 and len(ul) == 0)

conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(len(pl.sessions().keys()) == 10)
conf.color_theme = conf_color_theme

new_pl = pl.replace(IP.src, "192.168.0.254", "192.168.0.42")
assert("192.168.0.254" not in [p[IP].src for p in new_pl])

= IPv4 - reporting

@mock.patch("scapy.layers.inet.sr")
def test_report_ports(mock_sr):
    def sr(*args, **kargs):
        return [(IP()/TCP(dport=65081, flags="S"), IP()/TCP(sport=65081, flags="SA")),
                (IP()/TCP(dport=65082, flags="S"), IP()/ICMP(type=3, code=1)),
                (IP()/TCP(dport=65083, flags="S"), IP()/TCP(sport=65083, flags="R"))], [IP()/TCP(dport=65084, flags="S")]
    mock_sr.side_effect = sr
    report = "\\begin{tabular}{|r|l|l|}\n\hline\n65081 & open & SA \\\\\n\hline\n?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n65083 & closed & TCP R \\\\\n\hline\n65084 & ? & unanswered \\\\\n\hline\n\end{tabular}\n"
    assert(report_ports("www.secdev.org", [65081,65082,65083,65084]) == report)
def test_IPID_count():
    with ContextManagerCaptureOutput() as cmco:
        random.seed(0x2807)
        IPID_count([(IP()/UDP(), IP(id=random.randint(0, 65535))/UDP()) for i in range(3)])
        result_IPID_count = cmco.get_output()
    lines = result_IPID_count.split("\n")
    assert(lines[0] in ["Probably 3 classes: [4613, 53881, 58437]",
                        "Probably 3 classes: [9103, 9227, 46399]"])

test_IPID_count()


############
############
+ Fields

= FieldLenField with BitField
class Test(Packet):
    name = "Test"
    fields_desc = [
        FieldLenField("BitCount", None, fmt="H", count_of="Values"),
        FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
        FieldListField("Values", [], BitField("data", 0x0, size=1),
                       count_from=lambda pkt: pkt.BitCount),
    ]

pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1])))
assert(pkt.BitCount == 8)
assert(pkt.ByteCount == 1)

############
############
+ MPLS tests

= MPLS - build/dissection
from scapy.contrib.mpls import MPLS
p1 = MPLS()/IP()/UDP()
assert(p1[MPLS].s == 1)
p2 = MPLS()/MPLS()/IP()/UDP()
assert(p2[MPLS].s == 0)