Newer
Older
7001
7002
7003
7004
7005
7006
7007
7008
7009
7010
7011
7012
7013
7014
7015
7016
7017
7018
7019
7020
7021
7022
7023
7024
7025
7026
7027
7028
7029
7030
7031
7032
7033
7034
7035
7036
7037
7038
7039
7040
7041
7042
7043
7044
7045
7046
7047
7048
7049
7050
7051
7052
7053
7054
7055
7056
7057
7058
7059
7060
7061
7062
7063
7064
7065
7066
7067
7068
7069
p = ss.recv()
assert(p.data == 3)
p = ss.recv()
assert(p.data == 4)
try:
ss.recv()
ret = False
except socket.error:
ret = True
assert(ret)
= Test with recv() calls that return not enough data
~ sslstreamsocket
import socket
class MockSocket(object):
def __init__(self):
self.l = [ '\x00\x00', '\x00\x01', '\x00\x00\x00', '\x02', '\x00\x00', '\x00', '\x03' ]
def recv(self, x):
if len(self.l) == 0:
raise socket.error(100, 'EOF')
return self.l.pop(0)
class TestPacket(Packet):
name = 'TestPacket'
fields_desc = [
IntField('data', 0)
]
def guess_payload_class(self, p):
return conf.padding_layer
s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
try:
p = ss.recv()
ret = False
except:
ret = True
assert(ret)
p = ss.recv()
assert(p.data == 1)
try:
p = ss.recv()
ret = False
except:
ret = True
assert(ret)
p = ss.recv()
assert(p.data == 2)
try:
p = ss.recv()
ret = False
except:
ret = True
assert(ret)
try:
p = ss.recv()
ret = False
except:
ret = True
assert(ret)
p = ss.recv()
assert(p.data == 3)
############
############
+ Test correct conversion from binary to string of IPv6 addresses
= IPv6 bin to string conversion
from scapy.pton_ntop import _inet6_ntop, inet_ntop
import socket
7079
7080
7081
7082
7083
7084
7085
7086
7087
7088
7089
7090
7091
7092
7093
7094
7095
7096
7097
7098
7099
7100
for binfrm, address in [
('\x00' * 16, '::'),
('\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
'1111:2222:3333:4444:5555:6666:7777:8888'),
('\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00',
'1111:2222:3333:4444:5555::'),
('\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
'::4444:5555:6666:7777:8888'),
('\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88',
'0:0:3333:4444::8888'),
('\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
'1::'),
('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
'::1'),
('\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88',
'1111::4444:0:0:7777:8888'),
('\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00',
'1000:200:30:4:5:60:700:8000'),
]:
addr1 = inet_ntop(socket.AF_INET6, binfrm)
addr2 = _inet6_ntop(binfrm)
assert address == addr1 == addr2
= IPv6 bin to string conversion - Zero-block of length 1
7103
7104
7105
7106
7107
7108
7109
7110
7111
7112
7113
7114
7115
7116
7117
7118
7119
7120
7121
7122
7123
7124
binfrm = '\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88'
addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm)
# On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and
# shortens the single zero block to '::'. This is a valid IPv6 address
# representation anyway.
assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888',
'1111:2222:3333:4444:5555:6666::8888'])
assert(addr2 == '1111:2222:3333:4444:5555:6666:0:8888')
= IPv6 bin to string conversion - Illegal sizes
for binfrm in ["\x00" * 15, "\x00" * 17]:
rc = False
try:
inet_ntop(socket.AF_INET6, binfrm)
except Exception as exc1:
rc = True
assert rc
try:
_inet6_ntop(binfrm)
except Exception as exc2:
rc = isinstance(exc2, type(exc1))
assert rc
7125
7126
7127
7128
7129
7130
7131
7132
7133
7134
7135
7136
7137
7138
7139
7140
7141
7142
7143
7144
7145
7146
7147
7148
7149
7150
7151
7152
7153
7154
7155
7156
############
############
+ VRRP tests
= VRRP - build
s = str(IP()/VRRP())
s == 'E\x00\x00$\x00\x01\x00\x00@p|g\x7f\x00\x00\x01\x7f\x00\x00\x01!\x01d\x00\x00\x01z\xfd\x00\x00\x00\x00\x00\x00\x00\x00'
= VRRP - dissection
p = IP(s)
VRRP in p and p[VRRP].chksum == 0x7afd
############
############
+ L2TP tests
= L2TP - build
s = str(IP()/UDP()/L2TP())
s == 'E\x00\x00*\x00\x01\x00\x00@\x11|\xc0\x7f\x00\x00\x01\x7f\x00\x00\x01\x06\xa5\x06\xa5\x00\x16\xf4e\x00\x02\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= L2TP - dissection
p = IP(s)
L2TP in p and p[L2TP].len == 14 and p.tunnel_id == 0 and p[UDP].chksum == 0xf465
############
############
+ HSRP tests
= HSRP - build & dissection
defaddr = conf.route.route('0.0.0.0')[1]
pkt = IP(str(IP()/UDP(dport=1985, sport=1985)/HSRP()/HSRPmd5()))
assert pkt[IP].dst == "224.0.0.2" and pkt[UDP].sport == pkt[UDP].dport == 1985
assert pkt[HSRP].opcode == 0 and pkt[HSRP].state == 16
assert pkt[HSRPmd5].type == 4 and pkt[HSRPmd5].sourceip == defaddr
7163
7164
7165
7166
7167
7168
7169
7170
7171
7172
7173
7174
7175
7176
7177
7178
7179
7180
7181
7182
7183
7184
7185
7186
7187
7188
############
############
+ RIP tests
= RIP - build
s = str(IP()/UDP(sport=520)/RIP()/RIPEntry()/RIPAuth(authtype=2, password="scapy"))
s == 'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x01\x02\x08\x02\x08\x004\xae\x99\x01\x01\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xff\xff\x00\x02scapy\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= RIP - dissection
p = IP(s)
RIPEntry in p and RIPAuth in p and p[RIPAuth].password.startswith("scapy")
############
############
+ Radius tests
= Radius - build
s = str(IP()/UDP(sport=1812)/Radius(authenticator="scapy")/RadiusAttribute(value="scapy"))
s == 'E\x00\x007\x00\x01\x00\x00@\x11|\xb3\x7f\x00\x00\x01\x7f\x00\x00\x01\x07\x14\x07\x15\x00#U\xb2\x01\x00\x00\x1bscapy\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x07scapy'
= Radius - dissection
p = IP(s)
Radius in p and len(p[Radius].attributes) == 1 and p[Radius].attributes[0].value == "scapy"
7189
7190
7191
7192
7193
7194
7195
7196
7197
7198
7199
7200
7201
7202
7203
7204
7205
7206
7207
7208
7209
7210
7211
7212
7213
7214
7215
7216
7217
7218
7219
7220
7221
7222
7223
7224
7225
7226
7227
############
############
+ Addresses generators
= Net
n1 = Net("192.168.0.0/31")
[ip for ip in n1] == ["192.168.0.0", "192.168.0.1"]
n2 = Net("192.168.0.*")
len([ip for ip in n2]) == 256
n3 = Net("192.168.0.1-5")
len([ip for ip in n3]) == 5
(n1 == n3) == False
(n3 in n2) == True
= OID
oid = OID("1.2.3.4.5.6-8")
len([ o for o in oid ]) == 3
= Net6
n1 = Net6("2001:db8::/127")
len([ip for ip in n1]) == 2
############
############
+ IPv6 helpers
= in6_getLocalUniquePrefix()
p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")
= Misc addresses manipulation functions
teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)
ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"
7238
7239
7240
7241
7242
7243
7244
7245
7246
7247
7248
7249
7250
7251
7252
7253
7254
7255
7256
7257
7258
7259
7260
7261
7262
7263
7264
7265
7266
7267
in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True
in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True
a = inet_pton(socket.AF_INET6, "2001:db8::2807")
in6_xor(a, a) == "\x00" * 16
a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a))
r == "ff02::1:ffd4:e5f6"
in6_isllsnmaddr(r) == True
in6_isdocaddr("2001:db8::2807") == True
in6_isaddrllallnodes("ff02::1") == True
in6_isaddrllallservers("ff02::2") == True
= in6_getscope()
in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
in6_getscope("::1") == IPV6_ADDR_LOOPBACK
= inet_pton()
from scapy.pton_ntop import _inet6_pton, inet_pton
import socket
ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
"fe80:1234:abcd::192.168.40.12:abcd",
"fe80:1234:abcd::192.168.40",
"fe80:1234:abcd::192.168.400.12",
"1234:5678:9abc:def0:1234:5678:9abc:def0:",
"1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
rc = False
try:
res1 = inet_pton(socket.AF_INET6, ip6)
except Exception as exc1:
rc = True
assert rc
res2 = _inet6_pton(ip6)
except Exception as exc2:
rc = isinstance(exc2, type(exc1))
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
("fe80:1234:abcd::fe06",
'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
("fe80::2e67:ef2d:7ece:ed8a",
'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
("::ffff",
'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
("ffff::",
'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
('::', '\x00' * 16)]
for ip6, res in ip6_good_addrs:
res1 = inet_pton(socket.AF_INET6, ip6)
res2 = _inet6_pton(ip6)
assert res == res1 == res2
############
############
+ Test Route class
= make_route()
r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295L, '0.0.0.0')
tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040L, '0.0.0.0')
r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4
= ifchange()
r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][-1] == "5.6.7.8"
= ifdel()
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
= ifadd() & get_if_bcast()
r4 = Route()
len_r4 = len(r4.routes)
r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
len(r4.routes) == len_r4 +1
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
print len(r4.routes), len_r4
7355
7356
7357
7358
7359
7360
7361
7362
7363
7364
7365
7366
7367
7368
7369
7370
7371
7372
7373
7374
7375
7376
7377
7378
7379
7380
7381
7382
7383
len(r4.routes) == len_r4
############
############
+ Random objects
= RandomEnumeration
re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == [3, 4, 2, 5, 1, 6, 0, 7]
= RandIP6
random.seed(0x2807)
r6 = RandIP6()
str(r6) == "d279:1205:e445:5a9f:db28:efc9:afd7:f594"
random.seed(0x2807)
r6 = RandIP6("2001:db8::-")
print r6 == "2001:0db8::9e9c"
r6 = RandIP6("2001:db8::*")
print r6 == "2001:0db8::9ccb"
= RandMAC
random.seed(0x2807)
rm = RandMAC()
7389
7390
7391
7392
7393
7394
7395
7396
7397
7398
7399
7400
7401
7402
7403
7404
7405
7406
7407
7408
7409
7410
7411
= RandOID
random.seed(0x2807)
ro = RandOID()
ro == "7.222.44.194.276.116.320.6.84.97.31.5.25.20.13.84.104.18"
ro = RandOID("1.2.3.*")
ro == "1.2.3.41"
ro = RandOID("1.2.3.0-28")
ro == "1.2.3.11"
= RandRegExp
random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
print re == "vmuvr @ 906 g"
= Corrupted(Bytes|Bits)
random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
= Rand*
random.seed(0x2807)
rs = RandSingNum(-28, 07)
rss = RandSingString()
rss == "CON:"
############
############
+ Flags
= IP flags
~ IP
pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
= TCP flags
~ TCP
pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')
pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'
= Flag values mutation with .raw_packet_cache
~ IP TCP
pkt = IP(str(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(str(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'
= Operations on flag values
~ TCP
p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')
assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags
= Using tuples and lists as flag values
~ IP TCP
plist = PacketList(list(IP()/TCP(flags=(0, 2**9 - 1))))
assert [p[TCP].flags for p in plist] == range(512)
plist = PacketList(list(IP()/TCP(flags=["S", "SA", "A"])))
assert [p[TCP].flags for p in plist] == [2, 18, 16]
7558
7559
7560
7561
7562
7563
7564
7565
7566
7567
7568
7569
7570
7571
7572
7573
7574
7575
7576
7577
7578
7579
7580
7581
############
############
+ SCTP
= SCTP - Chunk Init - build
s = str(IP()/SCTP()/SCTPChunkInit(params=[SCTPChunkParamIPv4Addr()]))
s == 'E\x00\x00<\x00\x01\x00\x00@\x84|;\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00@,\x0b_\x01\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x08\x7f\x00\x00\x01'
= SCTP - Chunk Init - dissection
p = IP(s)
SCTPChunkParamIPv4Addr in p and p[SCTP].chksum == 0x402c0b5f and p[SCTPChunkParamIPv4Addr].addr == "127.0.0.1"
= SCTP - SCTPChunkSACK - build
s = str(IP()/SCTP()/SCTPChunkSACK(gap_ack_list=["7:28"]))
s == 'E\x00\x004\x00\x01\x00\x00@\x84|C\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00;\x01\xd4\x04\x03\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x07\x00\x1c'
= SCTP - SCTPChunkSACK - dissection
p = IP(s)
SCTPChunkSACK in p and p[SCTP].chksum == 0x3b01d404 and p[SCTPChunkSACK].gap_ack_list[0] == "7:28"
= SCTP - answers
(IP()/SCTP()).answers(IP()/SCTP()) == True
7584
7585
7586
7587
7588
7589
7590
7591
7592
7593
7594
7595
7596
7597
7598
7599
7600
7601
7602
7603
7604
7605
7606
7607
############
############
+ DHCP
= BOOTP - misc
BOOTP().answers(BOOTP()) == True
import random
random.seed(0x2807)
str(RandDHCPOptions()) == "[('WWW_server', '90.219.239.175')]"
value = ("hostname", "scapy")
dof = DHCPOptionsField("options", value)
dof.i2repr("", value) == '[hostname scapy]'
dof.i2m("", value) == '\x0cscapy'
= DHCP - build
s = str(IP()/UDP()/BOOTP(chaddr="00:01:02:03:04:05")/DHCP(options=[("message-type","discover"),"end"]))
s == 'E\x00\x01\x10\x00\x01\x00\x00@\x11{\xda\x7f\x00\x00\x01\x7f\x00\x00\x01\x00C\x00D\x00\xfcf\xea\x01\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0000:01:02:03:04:0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x01\xff'
= DHCP - dissection
p = IP(s)
DHCP in p and p[DHCP].options[0] == ('message-type', 1)
7608
7609
7610
7611
7612
7613
7614
7615
7616
7617
7618
7619
7620
7621
7622
7623
7624
7625
7626
7627
7628
7629
7630
7631
7632
7633
7634
7635
7636
7637
7638
7639
7640
7641
7642
7643
7644
7645
############
############
+ 802.11
= 802.11 - misc
PrismHeader().answers(PrismHeader()) == True
dpl = Dot11PacketList([Dot11()/LLC()/SNAP()/IP()/UDP()])
len(dpl) == 1
dpl_ether = dpl.toEthernet()
len(dpl_ether) == 1 and Ether in dpl_ether[0]
= Dot11 - build
s = str(Dot11())
s == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= Dot11 - dissection
p = Dot11(s)
Dot11 in p and p.addr3 == "00:00:00:00:00:00"
p.mysummary() == '802.11 Management 0L 00:00:00:00:00:00 > 00:00:00:00:00:00'
= Dot11QoS - build
s = str(Dot11(type=2, subtype=8)/Dot11QoS())
s == '\x88\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
= Dot11QoS - dissection
p = Dot11(s)
Dot11QoS in p
= Dot11 - answers
query = Dot11(type=0, subtype=0)
Dot11(type=0, subtype=1).answers(query) == True
= Dot11 - misc
Dot11Elt(info="scapy").summary() == "SSID='scapy'"
7646
7647
7648
7649
7650
7651
7652
7653
7654
7655
7656
7657
7658
7659
7660
7661
7662
7663
7664
7665
7666
7667
7668
7669
7670
7671
7672
############
############
+ ASN.1
= MIB
import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, "-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)
load_mib(fname)
len([k for k in conf.mib.iterkeys() if "scapy" in k]) == 1
= BER tests
BER_id_enc(42) == '*'
BER_id_enc(2807) == '\xbfw'
b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r1 == '@\x04\x08\x08\x08\x08'
r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
7673
7674
7675
7676
7677
7678
7679
7680
7681
7682
7683
7684
7685
7686
7687
7688
7689
7690
7691
7692
7693
7694
7695
7696
7697
7698
7699
7700
7701
7702
7703
7704
7705
7706
7707
7708
7709
7710
7711
7712
7713
7714
7715
7716
7717
7718
7719
7720
7721
7722
7723
7724
7725
7726
7727
+ inet.py
= IPv4 - ICMPTimeStampField
test = ICMPTimeStampField("test", None)
value = test.any2i("", "07:28:28.07")
value == 26908070
test.i2repr("", value) == '7:28:28.70'
= IPv4 - UDP null checksum
IP(str(IP()/UDP()/Raw("\xff\xff\x01\x6a")))[UDP].chksum == 0xFFFF
= IPv4 - (IP|UDP|TCP|ICMP)Error
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2", ttl=1)/ICMP()/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/UDP()/DNS()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/UDPerror()/DNS()
answer.answers(query) == True
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/TCP()
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()
answer.answers(query) == True
query = IP(dst="192.168.0.1", src="192.168.0.254", ttl=1)/ICMP()/"scapy"
answer = IP(dst="192.168.0.254", src="192.168.0.2")/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/ICMPerror()/"scapy"
answer.answers(query) == True
= IPv4 - utilities
l = overlap_frag(IP(dst="1.2.3.4")/ICMP()/("AB"*8), ICMP()/("CD"*8))
len(l) == 6
[len(str(IP.payload)) for p in l] == [38, 38, 38, 38, 38, 38]
[(p.frag, p.flags.MF) for p in [IP(str(p)) for p in l]] == [(0, True), (1, True), (2, True), (0, True), (1, True), (2, False)]
= IPv4 - traceroute utilities
ip_ttl = [("192.168.0.%d" % i, i) for i in xrange(1, 10)]
tr_packets = [ (IP(dst="192.168.0.1", src="192.168.0.254", ttl=ttl)/TCP(options=[("Timestamp", "00:00:%.2d.00" % ttl)])/"scapy",
IP(dst="192.168.0.254", src=ip)/ICMP(type=11)/IPerror(dst="192.168.0.1", src="192.168.0.254", ttl=0)/TCPerror()/"scapy")
for (ip, ttl) in ip_ttl ]
tr = TracerouteResult(tr_packets)
tr.get_trace() == {'192.168.0.1': {1: ('192.168.0.1', False), 2: ('192.168.0.2', False), 3: ('192.168.0.3', False), 4: ('192.168.0.4', False), 5: ('192.168.0.5', False), 6: ('192.168.0.6', False), 7: ('192.168.0.7', False), 8: ('192.168.0.8', False), 9: ('192.168.0.9', False)}}
result_show = ""
def test_show():
def write(s):
global result_show
result_show += s
mock_stdout = mock.Mock()
mock_stdout.write = write
sys.stdout = mock_stdout
tr = TracerouteResult(tr_packets)
tr.show()
7733
7734
7735
7736
7737
7738
7739
7740
7741
7742
7743
7744
7745
7746
7747
7748
7749
7750
7751
7752
7753
7754
7755
7756
7757
7758
7759
7760
7761
7762
7763
7764
7765
7766
7767
7768
7769
7770
7771
7772
7773
7774
7775
7776
7777
7778
7779
7780
expected = " 192.168.0.1:tcp80 \n"
expected += "1 192.168.0.1 11 \n"
expected += "2 192.168.0.2 11 \n"
expected += "3 192.168.0.3 11 \n"
expected += "4 192.168.0.4 11 \n"
expected += "5 192.168.0.5 11 \n"
expected += "6 192.168.0.6 11 \n"
expected += "7 192.168.0.7 11 \n"
expected += "8 192.168.0.8 11 \n"
expected += "9 192.168.0.9 11 \n"
index_result = result_show.index("1")
index_expected = expected.index("1")
assert(result_show[index_result:] == expected[index_expected:])
test_show()
@mock.patch("scapy.layers.inet.plt")
def test_timeskew_graph(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
srl = SndRcvList([(a, a) for a in [IP(str(p[0])) for p in tr_packets]])
ret = srl.timeskew_graph("192.168.0.254")
len(ret) == 9
ret[0][1] == 0.0
test_timeskew_graph()
tr = TracerouteResult(tr_packets)
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
tr.make_graph()
print len(tr.graphdef) == 491
print tr.graphdef.startswith("digraph trace {") == True
print '"192.168.0.9" ->' in tr.graphdef == True
conf.AS_resolver = conf.AS_resolver
= IPv4 - reporting
result_IPID_count = ""
def test_IPID_count():
def write(s):
global result_IPID_count
result_IPID_count += s
mock_stdout = mock.Mock()
mock_stdout.write = write
sys.stdout = mock_stdout
random.seed(0x2807)
IPID_count([(IP()/UDP(), IP(id=random.randint(0, 65535))/UDP()) for i in range(3)])
lines = result_IPID_count.split("\n")
print len(lines) == 5
print lines[0] == "Probably 3 classes: [4613, 53881, 58437]"
test_IPID_count()