Newer
Older
res2 = _inet6_pton(ip6)
except Exception as exc2:
rc = isinstance(exc2, type(exc1))
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
("fe80:1234:abcd::fe06",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
("fe80::2e67:ef2d:7ece:ed8a",
b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
("::ffff",
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
("ffff::",
b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
res1 = inet_pton(socket.AF_INET6, ip6)
res2 = _inet6_pton(ip6)
assert res == res1 == res2
############
############
+ Test Route class
= make_route()
r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295L, '0.0.0.0')
tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040L, '0.0.0.0')
r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4
= ifchange()
r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][-1] == "5.6.7.8"
= ifdel()
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
= ifadd() & get_if_bcast()
r4 = Route()
len_r4 = len(r4.routes)
r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
len(r4.routes) == len_r4 +1
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
############
############
+ Random objects
= RandomEnumeration
re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == [3, 4, 2, 5, 1, 6, 0, 7]
= RandIP6
random.seed(0x2807)
r6 = RandIP6()
assert(r6 == "d279:1205:e445:5a9f:db28:efc9:afd7:f594")
random.seed(0x2807)
r6 = RandIP6("2001:db8::-")
= RandMAC
random.seed(0x2807)
rm = RandMAC()
assert(rm == "d2:12:e4:5a:db:ef")
assert(rm == "00:01:02:03:04:05")
= RandOID
random.seed(0x2807)
ro = RandOID()
assert(ro == "7.222.44.194.276.116.320.6.84.97.31.5.25.20.13.84.104.18")
assert(ro == "1.2.3.41")
assert(ro == "1.2.3.11")
= RandRegExp
random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
re == 'vmuvr @ 906 \x9e g'
= Corrupted(Bytes|Bits)
random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
assert(sane(str(cb)) == ".BCD)")
assert(sane(str(cb)) == "ECk@Y")
= RandEnumKeys
~ not_pypy
random.seed(0x2807)
rek = RandEnumKeys({'a': 1, 'b': 2, 'c': 3}, seed=0x2807)
assert(rek == 'b')
= RandSingNum
~ not_pypy
random.seed(0x2807)
rs = RandSingNum(-28, 07)
assert(rs == -27)
= Rand*
assert(rss == "CON:")
assert(sane(str(rts)) == "...[scapy")
############
############
+ Flags
= IP flags
~ IP
pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
= TCP flags
~ TCP
pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')
pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'
= Flag values mutation with .raw_packet_cache
~ IP TCP
pkt = IP(str(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(str(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'
= Operations on flag values
~ TCP
p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')
assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags
= Using tuples and lists as flag values
~ IP TCP
plist = PacketList(list(IP()/TCP(flags=(0, 2**9 - 1))))
assert [p[TCP].flags for p in plist] == [x for x in range(512)]
plist = PacketList(list(IP()/TCP(flags=["S", "SA", "A"])))
assert [p[TCP].flags for p in plist] == [2, 18, 16]
############
############
+ SCTP
= SCTP - Chunk Init - build
s = str(IP()/SCTP()/SCTPChunkInit(params=[SCTPChunkParamIPv4Addr()]))
s == b'E\x00\x00<\x00\x01\x00\x00@\x84|;\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00@,\x0b_\x01\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x08\x7f\x00\x00\x01'
= SCTP - Chunk Init - dissection
p = IP(s)
SCTPChunkParamIPv4Addr in p and p[SCTP].chksum == 0x402c0b5f and p[SCTPChunkParamIPv4Addr].addr == "127.0.0.1"
= SCTP - SCTPChunkSACK - build
s = str(IP()/SCTP()/SCTPChunkSACK(gap_ack_list=["7:28"]))
s == b'E\x00\x004\x00\x01\x00\x00@\x84|C\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00;\x01\xd4\x04\x03\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x07\x00\x1c'
= SCTP - SCTPChunkSACK - dissection
p = IP(s)
SCTPChunkSACK in p and p[SCTP].chksum == 0x3b01d404 and p[SCTPChunkSACK].gap_ack_list[0] == "7:28"
= SCTP - answers
(IP()/SCTP()).answers(IP()/SCTP()) == True
8299
8300
8301
8302
8303
8304
8305
8306
8307
8308
8309
8310
8311
8312
8313
8314
8315
8316
8317
8318
8319
8320
8321
8322
8323
8324
8325
8326
8327
8328
8329
8330
8331
8332
8333
8334
8335
8336
8337
8338
8339
8340
8341
8342
8343
8344
8345
8346
8347
8348
8349
8350
8351
8352
8353
8354
8355
8356
8357
8358
8359
8360
8361
8362
8363
8364
8365
8366
8367
8368
8369
8370
8371
8372
8373
8374
8375
8376
8377
8378
8379
8380
8381
8382
8383
8384
8385
8386
8387
8388
8389
8390
8391
8392
8393
8394
8395
8396
8397
8398
8399
8400
8401
8402
8403
8404
8405
8406
8407
8408
8409
8410
8411
8412
8413
8414
8415
8416
8417
8418
8419
8420
8421
8422
8423
8424
8425
8426
8427
8428
8429
8430
8431
8432
8433
8434
8435
8436
8437
8438
8439
8440
8441
8442
8443
8444
8445
8446
8447
8448
8449
8450
8451
8452
8453
8454
8455
8456
8457
8458
8459
8460
8461
8462
8463
8464
8465
8466
8467
8468
8469
8470
8471
8472
8473
8474
8475
8476
8477
8478
8479
8480
8481
8482
8483
8484
8485
8486
8487
8488
8489
8490
8491
8492
8493
8494
8495
8496
8497
8498
8499
8500
8501
8502
8503
8504
8505
8506
8507
8508
8509
8510
8511
8512
8513
8514
8515
8516
8517
8518
8519
8520
8521
8522
8523
8524
8525
= SCTP basic header - Dissection
~ sctp
blob = b"\x1A\x85\x26\x94\x00\x00\x00\x0D\x00\x00\x04\xD2"
p = SCTP(blob)
assert(p.dport == 9876)
assert(p.sport == 6789)
assert(p.tag == 13)
assert(p.chksum == 1234)
= basic SCTPChunkData - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x61\x74\x61"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkData))
assert(p.reserved == 0)
assert(p.delay_sack == 0)
assert(p.unordered == 0)
assert(p.beginning == 0)
assert(p.ending == 0)
assert(p.tsn == 0)
assert(p.stream_id == 0)
assert(p.stream_seq == 0)
assert(p.len == (len("data") + 16))
assert(p.data == "data")
= basic SCTPChunkInit - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])
= SCTPChunkInit multiple valid parameters - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x5C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x00\x0C\x00\x06\x00\x05\x00\x00\x80\x00\x00\x04\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x80\x03\x00\x04\x80\x02\x00\x24\x87\x77\x21\x29\x3F\xDA\x62\x0C\x06\x6F\x10\xA5\x39\x58\x60\x98\x4C\xD4\x59\xD8\x8A\x00\x85\xFB\x9E\x2E\x66\xBA\x3A\x23\x54\xEF\x80\x04\x00\x06\x00\x01\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInit))
assert(p.flags == 0)
assert(p.len == 92)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 7)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamFwdTSN,
SCTPChunkParamSupportedExtensions, SCTPChunkParamChunkList,
SCTPChunkParamRandom, SCTPChunkParamRequestedHMACFunctions,
SCTPChunkParamSupportedAddrTypes})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamChunkList] == SCTPChunkParamChunkList(len=4))
assert(params[SCTPChunkParamRandom].len == 4+32)
assert(len(params[SCTPChunkParamRandom].random) == 32)
assert(params[SCTPChunkParamRequestedHMACFunctions] == SCTPChunkParamRequestedHMACFunctions(len=6))
assert(params[SCTPChunkParamSupportedAddrTypes] == SCTPChunkParamSupportedAddrTypes(len=6))
= basic SCTPChunkInitAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 20)
assert(p.init_tag == 0)
assert(p.a_rwnd == 0)
assert(p.n_out_streams == 0)
assert(p.n_in_streams == 0)
assert(p.init_tsn == 0)
assert(p.params == [])
= SCTPChunkInitAck with state cookie - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x4C\x00\x00\x00\x65\x00\x00\x00\x66\x00\x67\x00\x68\x00\x00\x00\x69\x80\x00\x00\x04\x00\x0B\x00\x0D\x6C\x6F\x63\x61\x6C\x68\x6F\x73\x74\x00\x00\x00\xC0\x00\x00\x04\x80\x08\x00\x07\x0F\xC1\x80\x00\x00\x07\x00\x14\x00\x10\x9E\xB2\x86\xCE\xE1\x7D\x0F\x6A\xAD\xFD\xB3\x5D\xBC\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkInitAck))
assert(p.flags == 0)
assert(p.len == 76)
assert(p.init_tag == 101)
assert(p.a_rwnd == 102)
assert(p.n_out_streams == 103)
assert(p.n_in_streams == 104)
assert(p.init_tsn == 105)
assert(len(p.params) == 5)
params = {type(param): param for param in p.params}
assert(set(params.keys()) == {SCTPChunkParamECNCapable, SCTPChunkParamHostname,
SCTPChunkParamFwdTSN, SCTPChunkParamSupportedExtensions,
SCTPChunkParamStateCookie})
assert(params[SCTPChunkParamECNCapable] == SCTPChunkParamECNCapable())
assert(params[SCTPChunkParamHostname] == SCTPChunkParamHostname(len=13, hostname="localhost"))
assert(params[SCTPChunkParamFwdTSN] == SCTPChunkParamFwdTSN())
assert(params[SCTPChunkParamSupportedExtensions] == SCTPChunkParamSupportedExtensions(len=7))
assert(params[SCTPChunkParamStateCookie].len == 4+16)
assert(len(params[SCTPChunkParamStateCookie].cookie) == 16)
= basic SCTPChunkSACK - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkSACK))
assert(p.flags == 0)
assert(p.len == 16)
assert(p.cumul_tsn_ack == 0)
assert(p.a_rwnd == 0)
assert(p.n_gap_ack == 0)
assert(p.n_dup_tsn == 0)
assert(p.gap_ack_list == [])
assert(p.dup_tsn_list == [])
= basic SCTPChunkHeartbeatReq - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatReq))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])
= basic SCTPChunkHeartbeatAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkHeartbeatAck))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.params == [])
= basic SCTPChunkAbort - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAbort))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)
assert(p.error_causes == "")
= basic SCTPChunkShutDown - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdown))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.cumul_tsn_ack == 0)
= basic SCTPChunkShutDownAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownAck))
assert(p.flags == 0)
assert(p.len == 4)
= basic SCTPChunkError - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x09\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkError))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.error_causes == "")
= basic SCTPChunkCookieEcho - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieEcho))
assert(p.flags == 0)
assert(p.len == 4)
assert(p.cookie == "")
= basic SCTPChunkCookieAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0B\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkCookieAck))
assert(p.flags == 0)
assert(p.len == 4)
= basic SCTPChunkShutdownComplete - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0E\x00\x00\x04"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkShutdownComplete))
assert(p.reserved == 0)
assert(p.TCB == 0)
assert(p.len == 4)
= basic SCTPChunkAuthentication - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAuthentication))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.shared_key_id == 0)
assert(p.HMAC_function == 0)
= basic SCTPChunkAddressConf - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc1\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConf))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])
= basic SCTPChunkAddressConfAck - Dissection
~ sctp
blob = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x08\x00\x00\x00\x00"
p = SCTP(blob).lastlayer()
assert(isinstance(p, SCTPChunkAddressConfAck))
assert(p.flags == 0)
assert(p.len == 8)
assert(p.seq == 0)
assert(p.params == [])
= SCTPChunkParamRandom - Consecutive calls
~ sctp
param1, param2 = SCTPChunkParamRandom(), SCTPChunkParamRandom()
assert(param1.random != param2.random)
############
############
+ DHCP
= BOOTP - misc
BOOTP().answers(BOOTP()) == True
BOOTP().hashret() == b"\x00\x00\x00\x00"
import random
random.seed(0x2807)
str(RandDHCPOptions()) == "[('WWW_server', '90.219.239.175')]"
value = ("hostname", "scapy")
dof = DHCPOptionsField("options", value)
dof.i2repr("", value) == '[hostname scapy]'
unknown_value_end = b"\xfe" + b"\xff"*257
udof = DHCPOptionsField("options", unknown_value_end)
udof.m2i("", unknown_value_end) == [(254, '\xff'*255), 'end']
unknown_value_pad = b"\xfe" + b"\xff"*256 + b"\x00"
udof = DHCPOptionsField("options", unknown_value_pad)
udof.m2i("", unknown_value_pad) == [(254, '\xff'*255), 'pad']
= DHCP - build
s = str(IP()/UDP()/BOOTP(chaddr="00:01:02:03:04:05")/DHCP(options=[("message-type","discover"),"end"]))
s == b'E\x00\x01\x10\x00\x01\x00\x00@\x11{\xda\x7f\x00\x00\x01\x7f\x00\x00\x01\x00C\x00D\x00\xfcf\xea\x01\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0000:01:02:03:04:0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x01\xff'
= DHCP - dissection
p = IP(s)
DHCP in p and p[DHCP].options[0] == ('message-type', 1)
############
############
+ 802.11
= 802.11 - misc
PrismHeader().answers(PrismHeader()) == True
dpl = Dot11PacketList([Dot11()/LLC()/SNAP()/IP()/UDP()])
len(dpl) == 1
dpl_ether = dpl.toEthernet()
len(dpl_ether) == 1 and Ether in dpl_ether[0]
= Dot11 - build
s = str(Dot11())
s == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= Dot11 - dissection
p = Dot11(s)
Dot11 in p and p.addr3 == "00:00:00:00:00:00"
p.mysummary() == '802.11 Management 0 00:00:00:00:00:00 > 00:00:00:00:00:00'
s = str(Dot11(type=2, subtype=8)/Dot11QoS(TID=4))
s == b'\x88\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00'
= Dot11QoS - dissection
p = Dot11(s)
Dot11QoS in p
= Dot11 - answers
query = Dot11(type=0, subtype=0)
Dot11(type=0, subtype=1).answers(query) == True
= Dot11 - misc
8601
8602
8603
8604
8605
8606
8607
8608
8609
8610
8611
8612
8613
8614
8615
8616
8617
8618
8619
8620
8621
8622
8623
8624
8625
assert Dot11Elt(info="scapy").summary() == "SSID='scapy'"
assert Dot11Elt(ID=1).mysummary() == ""
= Dot11WEP - build
~ crypto
conf.wepkey = ""
assert str(PPI()/Dot11(FCfield=0x40)/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
conf.wepkey = "test123"
assert str(PPI()/Dot11(type=2, subtype=8, FCfield=0x40)/Dot11QoS()/Dot11WEP()) == b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a'
= Dot11WEP - dissect
~ crypto
conf.wepkey = "test123"
a = PPI(b'\x00\x00\x08\x00i\x00\x00\x00\x88@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x008(^a')
assert a[Dot11QoS][Dot11WEP].icv == 942169697
= Dot11 - answers
a = Dot11()/Dot11Auth(seqnum=1)
b = Dot11()/Dot11Auth(seqnum=2)
assert b.answers(a)
assert not a.answers(b)
assert not (Dot11()/Dot11Ack()).answers(Dot11())
assert (Dot11()/LLC(dsap=2, ctrl=4)).answers(Dot11()/LLC(dsap=1, ctrl=5))
############
############
+ 802.3
= Test detection
assert isinstance(Dot3(str(Ether())),Ether)
assert isinstance(Ether(str(Dot3())),Dot3)
a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
############
############
+ ASN.1
= MIB
import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, "-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)
load_mib(fname)
assert(len([k for k in conf.mib.iterkeys() if "scapy" in k]) == 1)
assert(len([oid for oid in conf.mib]) > 100)
assert(conf.mib._my_find("MIB", "keyUsage"))
assert(len(conf.mib._find("MIB", "keyUsage")))
assert(len(conf.mib._recurs_find_all((), "MIB", "keyUsage")))
8669
8670
8671
8672
8673
8674
8675
8676
8677
8678
8679
8680
8681
8682
8683
8684
8685
8686
8687
8688
8689
8690
8691
8692
8693
8694
8695
8696
= DADict tests
a = DADict("test")
a.test_value = "scapy"
with ContextManagerCaptureOutput() as cmco:
a._show()
assert(cmco.get_output() == "test_value = 'scapy'\n")
b = DADict("test2")
b.test_value_2 = "hello_world"
a._branch(b, 1)
try:
a._branch(b, 1)
assert False
except DADict_Exception:
pass
assert(len(a._find("test2")))
assert(len(a._find(test_value_2="hello_world")))
assert(len(a._find_all("test2")))
assert(not a._recurs_find((a,)))
assert(not a._recurs_find_all((a,)))
b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
############
############
+ inet.py
= IPv4 - ICMPTimeStampField
test = ICMPTimeStampField("test", None)
value = test.any2i("", "07:28:28.07")
value == 26908070
test.i2repr("", value) == '7:28:28.70'
= IPv4 - UDP null checksum
IP(str(IP()/UDP()/Raw(b"\xff\xff\x01\x6a")))[UDP].chksum == 0xFFFF
= IPv4 - (IP|UDP|TCP|ICMP)Error
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2", ttl=1)/ICMP()/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
assert(answer.answers(query) == True)
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/TCP()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()
assert(answer.answers(query) == True)
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/ICMP()/"scapy"
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/ICMPerror()/"scapy"
assert(answer.answers(query) == True)
= IPv4 - mDNS
a = IP(dst="224.0.0.251")
assert a.hashret() == b"\x00"
# TODO add real case here
= IPv4 - utilities
l = overlap_frag(IP(dst="1.2.3.4")/ICMP()/("AB"*8), ICMP()/("CD"*8))
assert(len(l) == 6)
assert([len(str(p[IP].payload)) for p in l] == [8, 8, 8, 8, 8, 8])
assert([(p.frag, p.flags.MF) for p in [IP(str(p)) for p in l]] == [(0, True), (1, True), (2, True), (0, True), (1, True), (2, False)])
= IPv4 - traceroute utilities
ip_ttl = [("192.168.0.%d" % i, i) for i in xrange(1, 10)]
tr_packets = [ (IP(dst="192.168.0.1", src="192.168.0.254", ttl=ttl)/TCP(options=[("Timestamp", "00:00:%.2d.00" % ttl)])/"scapy",
IP(dst="192.168.0.254", src=ip)/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()/"scapy")
for (ip, ttl) in ip_ttl ]
tr = TracerouteResult(tr_packets)
assert(tr.get_trace() == {'192.168.0.1': {1: ('192.168.0.1', False), 2: ('192.168.0.2', False), 3: ('192.168.0.3', False), 4: ('192.168.0.4', False), 5: ('192.168.0.5', False), 6: ('192.168.0.6', False), 7: ('192.168.0.7', False), 8: ('192.168.0.8', False), 9: ('192.168.0.9', False)}})
with ContextManagerCaptureOutput() as cmco:
tr = TracerouteResult(tr_packets)
tr.show()
result_show = cmco.get_output()
expected = " 192.168.0.1:tcp80 \n"
expected += "1 192.168.0.1 11 \n"
expected += "2 192.168.0.2 11 \n"
expected += "3 192.168.0.3 11 \n"
expected += "4 192.168.0.4 11 \n"
expected += "5 192.168.0.5 11 \n"
expected += "6 192.168.0.6 11 \n"
expected += "7 192.168.0.7 11 \n"
expected += "8 192.168.0.8 11 \n"
expected += "9 192.168.0.9 11 \n"
index_result = result_show.index("1")
index_expected = expected.index("1")
assert(result_show[index_result:] == expected[index_expected:])
test_show()
def test_summary():
with ContextManagerCaptureOutput() as cmco:
tr = TracerouteResult(tr_packets)
tr.summary()
result_summary = cmco.get_output()
assert(len(result_summary.split('\n')) == 10)
assert("IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:http S / Raw ==> IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded ttl-zero-during-transit / IPerror / TCPerror / Raw" in result_summary)
test_summary()
@mock.patch("scapy.layers.inet.plt")
def test_timeskew_graph(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
srl = SndRcvList([(a, a) for a in [IP(str(p[0])) for p in tr_packets]])
ret = srl.timeskew_graph("192.168.0.254")
assert(len(ret) == 9)
assert(ret[0][1] == 0.0)
test_timeskew_graph()
tr = TracerouteResult(tr_packets)
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
tr.make_graph()
assert(len(tr.graphdef) == 491)
tr.graphdef.startswith("digraph trace {") == True
assert(('"192.168.0.9" ->' in tr.graphdef) == True)
conf.AS_resolver = conf.AS_resolver
pl = PacketList(list([Ether()/x for x in itertools.chain(*tr_packets)]))
srl, ul = pl.sr()
assert(len(srl) == 9 and len(ul) == 0)
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(len(pl.sessions().keys()) == 10)
conf.color_theme = conf_color_theme
new_pl = pl.replace(IP.src, "192.168.0.254", "192.168.0.42")
assert("192.168.0.254" not in [p[IP].src for p in new_pl])
@mock.patch("scapy.layers.inet.sr")
def test_report_ports(mock_sr):
def sr(*args, **kargs):
return [(IP()/TCP(dport=65081, flags="S"), IP()/TCP(sport=65081, flags="SA")),
(IP()/TCP(dport=65082, flags="S"), IP()/ICMP(type=3, code=1)),
(IP()/TCP(dport=65083, flags="S"), IP()/TCP(sport=65083, flags="R"))], [IP()/TCP(dport=65084, flags="S")]
report = "\\begin{tabular}{|r|l|l|}\n\hline\n65081 & open & SA \\\\\n\hline\n?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n65083 & closed & TCP R \\\\\n\hline\n65084 & ? & unanswered \\\\\n\hline\n\end{tabular}\n"
assert(report_ports("www.secdev.org", [65081,65082,65083,65084]) == report)
with ContextManagerCaptureOutput() as cmco:
random.seed(0x2807)
IPID_count([(IP()/UDP(), IP(id=random.randint(0, 65535))/UDP()) for i in range(3)])
result_IPID_count = cmco.get_output()
lines = result_IPID_count.split("\n")
assert(len(lines) == 5)
assert(lines[0].endswith("Probably 3 classes: [4613, 53881, 58437]"))
############
############
+ Fields
= FieldLenField with BitField
class Test(Packet):
name = "Test"
fields_desc = [
FieldLenField("BitCount", None, fmt="H", count_of="Values"),
FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
FieldListField("Values", [], BitField("data", 0x0, size=1),
count_from=lambda pkt: pkt.BitCount),
]
pkt = Test(str(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1])))
assert(pkt.BitCount == 8)
assert(pkt.ByteCount == 1)
############
############
+ MPLS tests
= MPLS - build/dissection
from scapy.contrib.mpls import MPLS
p1 = MPLS()/IP()/UDP()
assert(p1[MPLS].s == 1)
p2 = MPLS()/MPLS()/IP()/UDP()
assert(p2[MPLS].s == 0)
p1[MPLS]
p1[IP]
p2[MPLS]
p2[MPLS:1]
p2[IP]