Newer
Older
n1 = ip.dst
assert isinstance(n1, Net)
assert n1.ip_regex.match(str(n1))
ip.show()
= OID
oid = OID("1.2.3.4.5.6-8")
len([ o for o in oid ]) == 3
= Net6
n1 = Net6("2001:db8::/127")
len([ip for ip in n1]) == 2
= Net6 using web address
~ netaccess ipv6
ip = IPv6(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net6)
assert n1.ip_regex.match(str(n1))
ip.show()
############
############
+ IPv6 helpers
= in6_getLocalUniquePrefix()
p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")
= Misc addresses manipulation functions
teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)
ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"
in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True
in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True
a = inet_pton(socket.AF_INET6, "2001:db8::2807")
a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a))
r == "ff02::1:ffd4:e5f6"
in6_isllsnmaddr(r) == True
in6_isdocaddr("2001:db8::2807") == True
in6_isaddrllallnodes("ff02::1") == True
in6_isaddrllallservers("ff02::2") == True
= in6_getscope()
8063
8064
8065
8066
8067
8068
8069
8070
8071
8072
8073
8074
8075
8076
8077
8078
8079
8080
8081
8082
8083
assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK
= construct_source_candidate_set()
dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")]
assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"]
assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"]
= inet_pton()
from scapy.pton_ntop import _inet6_pton, inet_pton
import socket
ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
"fe80:1234:abcd::192.168.40.12:abcd",
"fe80:1234:abcd::192.168.40",
"fe80:1234:abcd::192.168.400.12",
"1234:5678:9abc:def0:1234:5678:9abc:def0:",
"1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
# Every malformed address must be rejected by inet_pton() and by the private
# _inet6_pton() helper, and both must raise the same kind of exception.
for ip6 in ip6_bad_addrs:
    rc = False
    exc1 = None
    try:
        res1 = inet_pton(socket.AF_INET6, ip6)
    except Exception as e:
        # Keep the exception so its type can be compared with the helper's.
        exc1 = e
    assert exc1 is not None
    try:
        res2 = _inet6_pton(ip6)
    except Exception as exc2:
        rc = isinstance(exc2, type(exc1))
    assert rc

ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
("fe80:1234:abcd::fe06",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
("fe80::2e67:ef2d:7ece:ed8a",
b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
("::ffff",
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
("ffff::",
b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
('::', b'\x00' * 16)]
# Each well-formed address must convert to the expected packed bytes with
# both inet_pton() and the private _inet6_pton() helper.
for ip6, res in ip6_good_addrs:
    res1, res2 = inet_pton(socket.AF_INET6, ip6), _inet6_pton(ip6)
    assert res1 == res and res2 == res1
############
############
+ Test Route class
= make_route()
r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0')
tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0')
r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4
= ifchange()
r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][-1] == "5.6.7.8"
= ifdel()
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
= ifadd() & get_if_bcast()
r4 = Route()
len_r4 = len(r4.routes)
r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
len(r4.routes) == len_r4 +1
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
############
############
+ Random objects
= RandomEnumeration
re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == ([3, 4, 2, 5, 1, 6, 0, 7] if six.PY2 else [5, 0, 2, 7, 6, 3, 1, 4])
= RandIP6
random.seed(0x2807)
r6 = RandIP6()
assert(r6 == ("d279:1205:e445:5a9f:db28:efc9:afd7:f594" if six.PY2 else
"240b:238f:b53f:b727:d0f9:bfc4:2007:e265"))
random.seed(0x2807)
r6 = RandIP6("2001:db8::-")
assert(r6 == ("2001:0db8::e445" if six.PY2 else "2001:0db8::b53f"))
assert(r6 == ("2001:0db8::efc9" if six.PY2 else "2001:0db8::bfc4"))
= RandMAC
random.seed(0x2807)
rm = RandMAC()
assert(rm == ("d2:12:e4:5a:db:ef" if six.PY2 else "24:23:b5:b7:d0:bf"))
assert(rm == ("00:01:02:03:04:05" if six.PY2 else "00:01:02:03:04:01"))
= RandOID
random.seed(0x2807)
ro = RandOID()
assert(ro == "7.222.44.194.276.116.320.6.84.97.31.5.25.20.13.84.104.18")
assert(ro == "1.2.3.41")
assert(ro == ("1.2.3.11" if six.PY2 else "1.2.3.12"))
= RandRegExp
random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
re == ('vmuvr @ 906 \x9e g' if six.PY2 else 'irrtv @ 517 ¸ v')
= Corrupted(Bytes|Bits)
random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
assert(sane(raw(cb)) in [".BCD)", "&BCDW"])
assert(sane(raw(cb)) in ["ECk@Y", "QB.P."])
= RandEnumKeys
~ not_pypy
random.seed(0x2807)
rek = RandEnumKeys({'a': 1, 'b': 2, 'c': 3}, seed=0x2807)
= RandSingNum
~ not_pypy
assert(rs == (3 if six.PY2 else 2))
assert(rs == (-27 if six.PY2 else -17))
= Rand*
assert(rss == ("CON:" if six.PY2 else "foo.exe:"))
assert(sane(raw(rts)) in ["...[scapy", "......scapy"])
############
############
+ Flags
= IP flags
~ IP
pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
assert len({IP().flags, IP().flags}) == 1
= TCP flags
~ TCP
pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')
pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'
= Flag values mutation with .raw_packet_cache
~ IP TCP
pkt = IP(raw(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'
= Operations on flag values
~ TCP
p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')
assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags
= Using tuples and lists as flag values
~ IP TCP
plist = PacketList(list(IP()/TCP(flags=(0, 2**9 - 1))))
assert [p[TCP].flags for p in plist] == [x for x in range(512)]
plist = PacketList(list(IP()/TCP(flags=["S", "SA", "A"])))
assert [p[TCP].flags for p in plist] == [2, 18, 16]
############
############
+ SCTP
= SCTP - Chunk Init - build
s = raw(IP()/SCTP()/SCTPChunkInit(params=[SCTPChunkParamIPv4Addr()]))
s == b'E\x00\x00<\x00\x01\x00\x00@\x84|;\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00@,\x0b_\x01\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x08\x7f\x00\x00\x01'
= SCTP - Chunk Init - dissection
p = IP(s)
SCTPChunkParamIPv4Addr in p and p[SCTP].chksum == 0x402c0b5f and p[SCTPChunkParamIPv4Addr].addr == "127.0.0.1"
= SCTP - SCTPChunkSACK - build
s = raw(IP()/SCTP()/SCTPChunkSACK(gap_ack_list=["7:28"]))
s == b'E\x00\x004\x00\x01\x00\x00@\x84|C\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00;\x01\xd4\x04\x03\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x07\x00\x1c'
= SCTP - SCTPChunkSACK - dissection
p = IP(s)
SCTPChunkSACK in p and p[SCTP].chksum == 0x3b01d404 and p[SCTPChunkSACK].gap_ack_list[0] == "7:28"
= SCTP - answers
(IP()/SCTP()).answers(IP()/SCTP()) == True
8408
8409
8410
8411
8412
8413
8414
8415
8416
8417
8418
8419
8420
8421
8422
8423
8424
8425
8426
8427
8428
8429
8430
= SCTP basic header - Dissection
~ sctp
blob = b"\x1A\x85\x26\x94\x00\x00\x00\x0D\x00\x00\x04\xD2"
p = SCTP(blob)
assert(p.dport == 9876)
assert(p.sport == 6789)
assert(p.tag == 13)
assert(p.chksum == 1234)
= basic SCTPChunkData - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x61\x74\x61"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkData))
assert(p.reserved == 0)
assert(p.delay_sack == 0)
assert(p.unordered == 0)
assert(p.beginning == 0)
assert(p.ending == 0)
assert(p.tsn == 0)
assert(p.stream_id == 0)
assert(p.stream_seq == 0)
assert(p.len == (len("data") + 16))
assert(p.data == b"data")
8432
8433
8434
8435
8436
8437
8438
8439
8440
8441
8442
8443
8444
8445
8446
8447
8448
8449
8450
8451
8452
8453
8454
8455
8456
8457
8458
8459
8460
8461
8462
8463
8464
8465
8466
8467
8468
8469
8470
8471
8472
8473
8474
8475
8476
8477
8478
8479
8480
8481
8482
8483
8484
8485
8486
8487
8488
8489
8490
8491
8492
8493
8494
8495
8496
8497
8498
8499
8500
8501
8502
8503
8504
8505
= basic SCTPChunkInit - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])
= SCTPChunkInit multiple valid parameters - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x5C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x00\x0C\x00\x06\x00\x05\x00\x00\x80\x00\x00\x04\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x80\x03\x00\x04\x80\x02\x00\x24\x87\x77\x21\x29\x3F\xDA\x62\x0C\x06\x6F\x10\xA5\x39\x58\x60\x98\x4C\xD4\x59\xD8\x8A\x00\x85\xFB\x9E\x2E\x66\xBA\x3A\x23\x54\xEF\x80\x04\x00\x06\x00\x01\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 92)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 7)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamFwdTSN,
SCTPChunkParamSupportedExtensions, SCTPChunkParamChunkList,
SCTPChunkParamRandom, SCTPChunkParamRequestedHMACFunctions,
SCTPChunkParamSupportedAddrTypes})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamChunkList] == SCTPChunkParamChunkList(len=4))
assert(params[SCTPChunkParamRandom].len == 4+32)
assert(len(params[SCTPChunkParamRandom].random) == 32)
assert(params[SCTPChunkParamRequestedHMACFunctions] == SCTPChunkParamRequestedHMACFunctions(len=6))
assert(params[SCTPChunkParamSupportedAddrTypes] == SCTPChunkParamSupportedAddrTypes(len=6))
= basic SCTPChunkInitAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])
= SCTPChunkInitAck with state cookie - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x4C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x80\x00\x00\x04\x00\x0B\x00\x0D\x6C\x6F\x63\x61\x6C\x68\x6F\x73\x74\x00\x00\x00\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x00\x07\x00\x14\x00\x10\x9E\xB2\x86\xCE\xE1\x7D\x0F\x6A\xAD\xFD\xB3\x5D\xBC\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 76)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 5)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamHostname,
SCTPChunkParamFwdTSN, SCTPChunkParamSupportedExtensions,
SCTPChunkParamStateCookie})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(raw(params[SCTPChunkParamHostname]) == raw(SCTPChunkParamHostname(len=13, hostname="localhost")))
8507
8508
8509
8510
8511
8512
8513
8514
8515
8516
8517
8518
8519
8520
8521
8522
8523
8524
8525
8526
8527
8528
8529
8530
8531
8532
8533
8534
8535
8536
8537
8538
8539
8540
8541
8542
8543
8544
8545
8546
8547
8548
8549
8550
8551
8552
8553
8554
8555
8556
8557
8558
8559
8560
8561
8562
8563
8564
8565
8566
8567
8568
8569
8570
8571
8572
8573
8574
8575
8576
8577
8578
8579
8580
8581
8582
8583
8584
8585
8586
8587
8588
8589
8590
8591
8592
8593
8594
8595
8596
8597
8598
8599
8600
8601
8602
8603
8604
8605
8606
8607
8608
8609
8610
8611
8612
8613
8614
8615
8616
8617
8618
8619
8620
8621
8622
8623
8624
8625
8626
8627
8628
8629
8630
8631
8632
8633
8634
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamStateCookie].len == 4+16)
assert(len(params[SCTPChunkParamStateCookie].cookie) == 16)
= basic SCTPChunkSACK - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkSACK))
assert(p.flags == 0)
assert(p.len == 16)
assert(p.cumul_tsn_ack == 0)
assert(p.a_rwnd == 0)
assert(p.n_gap_ack == 0)
assert(p.n_dup_tsn == 0)
assert(p.gap_ack_list == [])
assert(p.dup_tsn_list == [])
= basic SCTPChunkHeartbeatReq - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatReq))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])
= basic SCTPChunkHeartbeatAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatAck))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])
= basic SCTPChunkAbort - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAbort))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)
assert(p.error_causes == "")
= basic SCTPChunkShutDown - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdown))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.cumul_tsn_ack == 0)
= basic SCTPChunkShutDownAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownAck))
assert(p.flags == 0)
assert(p.len == 4)
= basic SCTPChunkError - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x09\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkError))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.error_causes == "")
= basic SCTPChunkCookieEcho - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieEcho))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.cookie == "")
= basic SCTPChunkCookieAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0B\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieAck))
assert(p.flags == 0)
assert(p.len == 4)
= basic SCTPChunkShutdownComplete - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0E\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownComplete))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)
= basic SCTPChunkAuthentication - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAuthentication))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.shared_key_id == 0)
assert(p.HMAC_function == 0)
= basic SCTPChunkAddressConf - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc1\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConf))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])
= basic SCTPChunkAddressConfAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConfAck))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])
= SCTPChunkParamRandom - Consecutive calls
~ sctp
param1, param2 = SCTPChunkParamRandom(), SCTPChunkParamRandom()
assert(param1.random != param2.random)
############
############
+ DHCP
= BOOTP - misc
BOOTP().answers(BOOTP()) == True
BOOTP().hashret() == b"\x00\x00\x00\x00"
str(RandDHCPOptions()) == "[('WWW_server', '90.219.239.175')]"
value = ("hostname", "scapy")
dof = DHCPOptionsField("options", value)
dof.i2repr("", value) == '[hostname scapy]'
unknown_value_end = b"\xfe" + b"\xff"*257
udof = DHCPOptionsField("options", unknown_value_end)
udof.m2i("", unknown_value_end) == [(254, b'\xff'*255), 'end']
unknown_value_pad = b"\xfe" + b"\xff"*256 + b"\x00"
udof = DHCPOptionsField("options", unknown_value_pad)
udof.m2i("", unknown_value_pad) == [(254, b'\xff'*255), 'pad']
s = raw(IP(src="127.0.0.1")/UDP()/BOOTP(chaddr="00:01:02:03:04:05")/DHCP(options=[("message-type","discover"),"end"]))
s == b'E\x00\x01\x10\x00\x01\x00\x00@\x11{\xda\x7f\x00\x00\x01\x7f\x00\x00\x01\x00C\x00D\x00\xfcf\xea\x01\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0000:01:02:03:04:0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x01\xff'
= DHCP - dissection
p = IP(s)
DHCP in p and p[DHCP].options[0] == ('message-type', 1)
############
############
+ 802.11
= 802.11 - misc
PrismHeader().answers(PrismHeader()) == True
dpl = Dot11PacketList([Dot11()/LLC()/SNAP()/IP()/UDP()])
len(dpl) == 1
dpl_ether = dpl.toEthernet()
len(dpl_ether) == 1 and Ether in dpl_ether[0]
= Dot11 - build
# Build a default Dot11 frame before comparing; otherwise `s` is whatever an
# earlier test left behind and the comparison checks the wrong bytes.
s = raw(Dot11())
s == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= Dot11 - dissection
p = Dot11(s)
Dot11 in p and p.addr3 == "00:00:00:00:00:00"
p.mysummary() == '802.11 Management 0 00:00:00:00:00:00 > 00:00:00:00:00:00'
s = raw(Dot11(type=2, subtype=8)/Dot11QoS(TID=4))
s == b'\x88\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00'
= Dot11QoS - dissection
p = Dot11(s)
Dot11QoS in p
= Dot11 - answers
query = Dot11(type=0, subtype=0)
Dot11(type=0, subtype=1).answers(query) == True
= Dot11 - misc
assert Dot11Elt(info="scapy").summary() == "SSID='scapy'"
assert Dot11Elt(ID=1).mysummary() == ""
= Multiple Dot11Elt layers
pkt = Dot11() / Dot11Beacon() / Dot11Elt(ID="Rates") / Dot11Elt(ID="SSID", info="Scapy")
assert pkt[Dot11Elt::{"ID": 0}].info == "Scapy"
assert pkt.getlayer(Dot11Elt, ID=0).info == "Scapy"
= Dot11WEP - build
~ crypto
conf.wepkey = ""
assert raw(PPI()/Dot11(FCfield=0x40)/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
assert raw(PPI()/Dot11(type=2, subtype=8, FCfield=0x40)/Dot11QoS()/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a'
= Dot11WEP - dissect
~ crypto
conf.wepkey = "test123"
a = PPI(b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a')
assert a[Dot11QoS][Dot11WEP].icv == 942169697
= Dot11 - answers
a = Dot11()/Dot11Auth(seqnum=1)
b = Dot11()/Dot11Auth(seqnum=2)
assert b.answers(a)
assert not a.answers(b)
assert not (Dot11()/Dot11Ack()).answers(Dot11())
assert (Dot11()/LLC(dsap=2, ctrl=4)).answers(Dot11()/LLC(dsap=1, ctrl=5))
############
############
+ 802.3
= Test detection
assert isinstance(Dot3(raw(Ether())),Ether)
assert isinstance(Ether(raw(Dot3())),Dot3)
a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
############
############
+ ASN.1
= MIB
import tempfile
# Write a one-entry MIB definition to a temporary file.
fd, fname = tempfile.mkstemp()
os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n")
# mkstemp() returns an open descriptor that the caller must close; closing it
# also makes sure the content is written before the file is read back.
os.close(fd)
# NOTE(review): nothing in the visible lines loaded the file written above, so
# the "scapy" lookup below could not depend on it. load_mib() is assumed to be
# the Scapy MIB loader available in this campaign's namespace - confirm.
load_mib(fname)
assert(len([k for k in conf.mib.iterkeys() if "scapy" in k]) == 1)
assert(len([oid for oid in conf.mib]) > 100)
assert(conf.mib._my_find("MIB", "keyUsage"))
assert(len(conf.mib._find("MIB", "keyUsage")))
assert(len(conf.mib._recurs_find_all((), "MIB", "keyUsage")))
8783
8784
8785
8786
8787
8788
8789
8790
8791
8792
8793
8794
8795
8796
8797
8798
8799
8800
8801
8802
8803
8804
8805
8806
8807
8808
8809
8810
= DADict tests
# Populate a DADict and check the text printed by _show().
a = DADict("test")
a.test_value = "scapy"
with ContextManagerCaptureOutput() as cmco:
    a._show()
    assert cmco.get_output() == "test_value = 'scapy'\n"

# Attach a second DADict as a branch of the first one.
b = DADict("test2")
b.test_value_2 = "hello_world"
a._branch(b, 1)
# Branching the same child a second time has to raise DADict_Exception.
try:
    a._branch(b, 1)
except DADict_Exception:
    pass
else:
    assert False

# Lookups by name and by attribute value must return non-empty results.
assert len(a._find("test2"))
assert len(a._find(test_value_2="hello_world"))
assert len(a._find_all("test2"))
# Recursive searches that start with `a` already in the path return nothing.
assert not a._recurs_find((a,))
assert not a._recurs_find_all((a,))
# Encode an IPv4 address with the BER codec and decode it back.
b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
############
############
+ inet.py
= IPv4 - ICMPTimeStampField
test = ICMPTimeStampField("test", None)
value = test.any2i("", "07:28:28.07")
value == 26908070
test.i2repr("", value) == '7:28:28.70'
= IPv4 - UDP null checksum
IP(raw(IP()/UDP()/Raw(b"\xff\xff\x01\x6a")))[UDP].chksum == 0xFFFF
= IPv4 - (IP|UDP|TCP|ICMP)Error
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2", ttl=1)/ICMP()/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
assert(answer.answers(query) == True)
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/TCP()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()
assert(answer.answers(query) == True)
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/ICMP()/"scapy"
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/ICMPerror()/"scapy"
assert(answer.answers(query) == True)
= IPv4 - mDNS
a = IP(dst="224.0.0.251")
assert a.hashret() == b"\x00"
# TODO add real case here
= IPv4 - utilities
l = overlap_frag(IP(dst="1.2.3.4")/ICMP()/("AB"*8), ICMP()/("CD"*8))
assert(len(l) == 6)
assert([len(raw(p[IP].payload)) for p in l] == [8, 8, 8, 8, 8, 8])
assert([(p.frag, p.flags.MF) for p in [IP(raw(p)) for p in l]] == [(0, True), (1, True), (2, True), (0, True), (1, True), (2, False)])
ip_ttl = [("192.168.0.%d" % i, i) for i in six.moves.range(1, 10)]
tr_packets = [ (IP(dst="192.168.0.1", src="192.168.0.254", ttl=ttl)/TCP(options=[("Timestamp", "00:00:%.2d.00" % ttl)])/"scapy",
IP(dst="192.168.0.254", src=ip)/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()/"scapy")
for (ip, ttl) in ip_ttl ]
tr = TracerouteResult(tr_packets)
assert(tr.get_trace() == {'192.168.0.1': {1: ('192.168.0.1', False), 2: ('192.168.0.2', False), 3: ('192.168.0.3', False), 4: ('192.168.0.4', False), 5: ('192.168.0.5', False), 6: ('192.168.0.6', False), 7: ('192.168.0.7', False), 8: ('192.168.0.8', False), 9: ('192.168.0.9', False)}})
def test_show():
    """Capture the output of TracerouteResult.show() and compare it with the expected text."""
    with ContextManagerCaptureOutput() as cmco:
        tr = TracerouteResult(tr_packets)
        tr.show()
        result_show = cmco.get_output()
    expected = " 192.168.0.1:tcp80 \n"
    expected += "1 192.168.0.1 11 \n"
    expected += "2 192.168.0.2 11 \n"
    expected += "3 192.168.0.3 11 \n"
    expected += "4 192.168.0.4 11 \n"
    expected += "5 192.168.0.5 11 \n"
    expected += "6 192.168.0.6 11 \n"
    expected += "7 192.168.0.7 11 \n"
    expected += "8 192.168.0.8 11 \n"
    expected += "9 192.168.0.9 11 \n"
    # Compare from the first "1" of each string, skipping whatever precedes it.
    index_result = result_show.index("1")
    index_expected = expected.index("1")
    assert(result_show[index_result:] == expected[index_expected:])

test_show()
def test_summary():
    """Run TracerouteResult.summary() and check the captured text."""
    with ContextManagerCaptureOutput() as captured:
        TracerouteResult(tr_packets).summary()
        result_summary = captured.get_output()
    line_count = len(result_summary.split('\n'))
    assert line_count == 10
    # The destination port may be printed under several service names
    # (presumably depending on the local services database), so any of the
    # three spellings is accepted.
    template = (
        "IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:%s S / Raw ==> "
        "IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded "
        "ttl-zero-during-transit / IPerror / TCPerror / "
        "Raw"
    )
    candidates = [template % http for http in ['http', 'www_http', 'www']]
    assert any(candidate in result_summary for candidate in candidates)

test_summary()
@mock.patch("scapy.layers.inet.plt")
def test_timeskew_graph(mock_plt):
    """Exercise SndRcvList.timeskew_graph() with the plotting module patched out."""
    # The stub just returns the data it is given; the assertions below inspect
    # what timeskew_graph() returns.
    mock_plt.plot = lambda data, **kwargs: data
    rebuilt = [IP(raw(pair[0])) for pair in tr_packets]
    srl = SndRcvList([(pkt, pkt) for pkt in rebuilt])
    ret = srl.timeskew_graph("192.168.0.254")
    assert len(ret) == 9
    assert ret[0][1] == 0.0

test_timeskew_graph()
tr = TracerouteResult(tr_packets)
# Build the graph with the AS resolver disabled, then always put the saved
# resolver back so later tests see the original configuration.
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
try:
    tr.make_graph()
finally:
    conf.AS_resolver = saved_AS_resolver

assert len(tr.graphdef) == 491
assert tr.graphdef.startswith("digraph trace {")
assert '"192.168.0.9" ->' in tr.graphdef
pl = PacketList(list([Ether()/x for x in itertools.chain(*tr_packets)]))
srl, ul = pl.sr()
assert(len(srl) == 9 and len(ul) == 0)
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(len(pl.sessions().keys()) == 10)
conf.color_theme = conf_color_theme
new_pl = pl.replace(IP.src, "192.168.0.254", "192.168.0.42")
assert("192.168.0.254" not in [p[IP].src for p in new_pl])
@mock.patch("scapy.layers.inet.sr")
def test_report_ports(mock_sr):
    """Check the LaTeX table returned by report_ports() for canned sr() results."""
    def sr(*args, **kargs):
        # Three answered probes (SYN/ACK, ICMP error, RST) and one unanswered probe.
        return [(IP()/TCP(dport=65081, flags="S"), IP()/TCP(sport=65081, flags="SA")),
                (IP()/TCP(dport=65082, flags="S"), IP()/ICMP(type=3, code=1)),
                (IP()/TCP(dport=65083, flags="S"), IP()/TCP(sport=65083, flags="R"))], [IP()/TCP(dport=65084, flags="S")]
    # Route calls to the patched scapy.layers.inet.sr through the stub above.
    mock_sr.side_effect = sr
    # Backslashes are written as "\\" throughout; the resulting text is the
    # same as before, without relying on unrecognised escape sequences.
    report = (
        "\\begin{tabular}{|r|l|l|}\n"
        "\\hline\n"
        "65081 & open & SA \\\\\n"
        "\\hline\n"
        "?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n"
        "65083 & closed & TCP R \\\\\n"
        "\\hline\n"
        "65084 & ? & unanswered \\\\\n"
        "\\hline\n"
        "\\end{tabular}\n"
    )
    assert report_ports("www.secdev.org", [65081,65082,65083,65084]) == report

test_report_ports()
# Capture what IPID_count() prints for three replies whose IP ids come from a
# seeded random generator.
with ContextManagerCaptureOutput() as cmco:
    random.seed(0x2807)
    IPID_count([(IP()/UDP(), IP(id=random.randint(0, 65535))/UDP()) for i in range(3)])
    result_IPID_count = cmco.get_output()

lines = result_IPID_count.split("\n")
assert len(lines) == 5
# Two first lines are accepted for the same seed.
assert lines[0] in ("Probably 3 classes: [4613, 53881, 58437]",
                    "Probably 3 classes: [9103, 9227, 46399]")
############
############
+ Fields
= FieldLenField with BitField
# Packet with a count field and a length field that both refer to the
# "Values" list of one-bit fields.
class Test(Packet):
    name = "Test"
    fields_desc = [
        FieldLenField("BitCount", None, fmt="H", count_of="Values"),
        FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
        FieldListField(
            "Values", [],
            BitField("data", 0x0, size=1),
            count_from=lambda p: p.BitCount,
        ),
    ]

# Build a packet holding eight bits, dissect the raw bytes again and check
# the two automatically computed fields.
built = raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1]))
pkt = Test(built)
assert pkt.BitCount == 8
assert pkt.ByteCount == 1
############
############
+ MPLS tests
= MPLS - build/dissection
from scapy.contrib.mpls import MPLS
p1 = MPLS()/IP()/UDP()
assert(p1[MPLS].s == 1)
p2 = MPLS()/MPLS()/IP()/UDP()
assert(p2[MPLS].s == 0)