Newer
Older
% Regression tests for Scapy
# More informations at http://www.secdev.org/projects/UTscapy/
############
############
+ Informations on Scapy
= Get conf
~ conf command
* Dump the current configuration
conf
= List layers
~ conf command
ls()
= List commands
~ conf command
lsc()
= List contribs
import mock
result_list_contrib = ""
def test_list_contrib():
def write(s):
global result_list_contrib
result_list_contrib += s
mock_stdout = mock.Mock()
mock_stdout.write = write
bck_stdout = sys.stdout
sys.stdout = mock_stdout
list_contrib()
sys.stdout = bck_stdout
assert("http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in result_list_contrib)
assert(result_list_contrib.split('\n') > 40)
test_list_contrib()
conf.debug_dissector = True
############
############
+ Scapy functions tests
= Interface related functions
get_if_raw_hwaddr(conf.iface)
get_if_raw_addr(conf.iface).encode("hex")
def get_dummy_interface():
"""Returns a dummy network interface"""
if WINDOWS:
data = {}
data["name"] = "dummy0"
data["description"] = "Does not exist"
data["win_index"] = -1
data["guid"] = "{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"
data["invalid"] = True
return NetworkInterface(data)
else:
return "dummy0"
get_if_raw_addr(get_dummy_interface())
get_if_raw_addr6(conf.iface6)
= Test read_routes6() - default output
routes6 = read_routes6()
if WINDOWS:
route_add_loopback(routes6, True)
# Expected results:
# - one route if there is only the loopback interface
# - three routes if there is a network interface
if len(routes6):
iflist = get_if_list()
if WINDOWS:
route_add_loopback(ipv6=True, iflist=iflist)
# IPv6 seems disabled. Force a route to ::1
conf.route6.routes.append(("::1", 128, "::", LOOPBACK_NAME, ["::1"]))
True
= Test read_routes6() - check mandatory routes
if len(routes6):
assert(len(filter(lambda r: r[0] == "::1" and r[-1] == ["::1"], routes6)) >= 1)
if iflist >= 2:
assert(len(filter(lambda r: r[0] == "fe80::" and r[1] == 64, routes6)) >= 1)
len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128 and r[-1] == ["::1"], routes6)) >= 1
= Test ifchange()
conf.route6.ifchange(LOOPBACK_NAME, "::1/128")
True
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
= Prepare emulator
import mock
from mock import Mock
if WINDOWS:
# This fix when the windows doesn't have a size (run with a windows server)
mock.patch("pyreadline.console.console.Console.size").return_value = (300, 300)
@mock.patch("scapy.config.conf.readfunc")
def emulate_main_input(data, mock_readfunc):
global index
index = 0 # reset var
def readlineScapy(*args, **kargs):
global index
if len(data) == index:
r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)"
else:
r_data = data[index]
index +=1
print r_data
return r_data
mock_readfunc.side_effect = readlineScapy
def reduce_mock(self):
return (Mock, ())
mock_readfunc.__reduce__ = reduce_mock
sys.argv = ['']
def console_exit(code):
raise SystemExit(code)
exit_code = -1
try:
interact(mydict={"exit": console_exit})
except SystemExit as e:
exit_code = str(e)
pass
assert exit_code == "0"
= Test basic save_session and load_session
data = ["init_session(\"scapySession1\")",\
"test_value = \"8.8.8.8\"",\
"save_session()",\
"del test_value",\
"load_session()",\
"assert test_value == \"8.8.8.8\""]
emulate_main_input(data)
= Test save_session and load_session with fname
data = ["init_session(\"scapySession2\")",\
"test_value = 7",\
"save_session(fname=\"scapySaveSession.dat\")",\
"del test_value",\
"load_session(fname=\"scapySaveSession.dat\")",\
"assert test_value == 7"]
emulate_main_input(data)
= Test pickling with packets
data = ["init_session(\"scapySession1\")",\
"test_value = IP(src=\"127.0.0.1\", dst=\"8.8.8.8\")",\
"test_value2 = ICMPv6EchoReply(data=\"testData@%!\")",\
"save_session()",\
"del test_value",\
"load_session()",\
"assert test_value.src == \"127.0.0.1\"",\
"assert test_value.dst == \"8.8.8.8\"",\
"assert test_value2.data == \"testData@%!\""]
emulate_main_input(data)
= Clean up session files
try:
os.remove("scapySaveSession.dat")
except OSError:
pass
= Test temporary file creation
tmpfile = get_temp_file(autoext=".ut")
if WINDOWS:
assert("scapy" in tmpfile and tmpfile.startswith('C:\\Users\\appveyor\\AppData\\Local\\Temp'))
else:
import platform
IS_PYPY = platform.python_implementation() == "PyPy"
assert("scapy" in tmpfile and (IS_PYPY == True or "/tmp/" in tmpfile))
assert(conf.temp_files[0].endswith(".ut"))
assert(conf.temp_files.pop())
assert(len(conf.temp_files) == 0)
= Test sane function
sane("A\x00\xFFB") == "A..B"
= Test linehexdump function
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == "FFFFFFFFFFFF0001020304059000 ..............")
conf.color_theme = conf_color_theme
= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
= Test hexdiff function
~ not_pypy
import mock
result_hexdiff = ""
def test_hexdiff():
def write(s):
global result_hexdiff
result_hexdiff += s
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
mock_stdout = mock.Mock()
mock_stdout.write = write
bck_stdout = sys.stdout
sys.stdout = mock_stdout
hexdiff("abcde", "abCde")
sys.stdout = bck_stdout
conf.interactive = True
conf.color_theme = conf_color_theme
expected = "0000 61 62 63 64 65 abcde\n"
expected += " 0000 61 62 43 64 65 abCde\n"
assert(result_hexdiff == expected)
test_hexdiff()
= Test fletcher16_* functions
assert(fletcher16_checksum("\x28\x07") == 22319)
assert(fletcher16_checkbytes("ABCDEF", 2) == "\x89\x67")
= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) == "\xd2\x12\xe4\x5b"
= Test export_object and import_object functions
import mock
result_export_object = ""
def test_export_import_object():
def write(s):
global result_export_object
result_export_object += s
mock_stdout = mock.Mock()
mock_stdout.write = write
bck_stdout = sys.stdout
sys.stdout = mock_stdout
export_object(2807)
sys.stdout = bck_stdout
assert(result_export_object.endswith("eNprYPL9zqUHAAdrAf8=\n\n"))
assert(import_object(result_export_object) == 2807)
test_export_import_object()
= Test tex_escape function
= Test colgen function
f = colgen(range(3))
len([f.next() for i in range(2)]) == 2
= Test incremental_label function
f = incremental_label()
[f.next() for i in range(2)] == ["tag00000", "tag00001"]
= Test corrupt_* functions
assert(corrupt_bytes("ABCDE") == "ABCDW")
assert(sane(corrupt_bytes("ABCDE", n=3)) == "A.8D4")
assert(corrupt_bits("ABCDE") == "EBCDE")
assert(sane(corrupt_bits("ABCDE", n=3)) == "AF.EE")
= Test save_object and load_object functions
import tempfile
fd, fname = tempfile.mkstemp()
save_object(fname, 2807)
assert(load_object(fname) == 2807)
= Test whois function
if not WINDOWS:
result = whois("193.0.6.139")
assert("inetnum" in result and "Amsterdam" in result)
= Test manuf DB methods
~ manufdb
assert(MANUFDB._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03")
assert(MANUFDB._get_short_manuf("00:00:0F:01:02:03") == "Next")
= Test utility functions - network related
~ netaccess
atol("www.secdev.org") == 3642339845
############
############
+ Basic tests
* Those test are here mainly to check nothing has been broken
* and to catch Exceptions
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()
assert(ret[0] == "@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00")
assert(len(ret[1]) == 2)
assert(p[ICMP].firstlayer() == p)
assert(p.command() == "IP()/ICMP()")
p.decode_payload_as(UDP)
assert(p.sport == 2048 and p.dport == 63487)
= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert(repr(p) == "<IP frag=0 ttl=64 proto=icmp |<ICMP |>>")
p.hide_defaults()
assert(repr(p) == "<IP frag=0 proto=icmp |<ICMP |>>")
conf.color_theme = conf_color_theme
= split_layers
p = IP()/ICMP()
s = str(p)
split_layers(IP, ICMP, proto=1)
assert(Raw in IP(s))
bind_layers(IP, ICMP, frag=0, proto=1)
= fuzz
~ not_pypy
random.seed(0x2807)
str(fuzz(IP()/ICMP())) == '\xe5S\x00\x1c\x9dC \x007\x01(H\x7f\x00\x00\x01\x7f\x00\x00\x01*\xdb\xf7,9\x8e\xa4i'
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del(a.ttl)
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert(a.copy().time == a.time)
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
a=3
= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
a.proto
_ == 6
= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
_ in [ '0x800 64 0x07b 4', 'IPv4 64 0x07b 4']
= sprintf() function
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditionnal substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
_ == 'flags=S 127.0.0.1'
= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
TCP in x, ICMP in x, IP in x, UDP in x
_ == (True,True,True,False)
= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and
x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
x.getlayer(IP,4) == None and
x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
x[IP:4]
except IndexError:
True
else:
False
= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
x==y, x==z, x==t, y==z, y==t, z==t, w==x
_ == (False, False, False, False, False, False, True)
= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
############
############
+ Tests on padding
= Padding assembly
str(Padding("abc"))
assert( _ == "abc" )
str(Padding("abc")/Padding("def"))
assert( _ == "abcdef" )
str(Raw("ABC")/Padding("abc")/Padding("def"))
assert( _ == "ABCabcdef" )
str(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
= Padding and length computation
IP(str(IP()/Padding("abc")))
assert( _.len == 20 and len(_) == 23 )
IP(str(IP()/Raw("ABC")/Padding("abc")))
assert( _.len == 23 and len(_) == 26 )
IP(str(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
assert( _.len == 23 and len(_) == 29 )
= PadField test
~ PadField padding
class TestPad(Packet):
fields_desc = [ PadField(StrNullField("st", ""),4), StrField("id", "")]
TestPad() == TestPad(str(TestPad()))
+ Tests on default value changes mechanism
= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
version = 3
ttl = 32
= Test of IPv3 class
a = IPv3()
a.version, a.ttl
assert(_ == (3,32))
str(a)
assert(_ == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01')
############
############
+ ISAKMP transforms test
= ISAKMP creation
~ IP UDP ISAKMP
p=IP(src='192.168.8.14',dst='10.0.0.1')/UDP()/ISAKMP()/ISAKMP_payload_SA(prop=ISAKMP_payload_Proposal(trans=ISAKMP_payload_Transform(transforms=[('Encryption', 'AES-CBC'), ('Hash', 'MD5'), ('Authentication', 'PSK'), ('GroupDesc', '1536MODPgr'), ('KeyLength', 256), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])/ISAKMP_payload_Transform(res2=12345,transforms=[('Encryption', '3DES-CBC'), ('Hash', 'SHA'), ('Authentication', 'PSK'), ('GroupDesc', '1024MODPgr'), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])))
p.show()
p
= ISAKMP manipulation
~ ISAKMP
p[ISAKMP_payload_Transform:2]
_.res2 == 12345
= ISAKMP assembly
~ ISAKMP
hexdump(p)
str(p) == b"E\x00\x00\x96\x00\x01\x00\x00@\x11\xa7\x9f\xc0\xa8\x08\x0e\n\x00\x00\x01\x01\xf4\x01\xf4\x00\x82\xbf\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00z\x00\x00\x00^\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00R\x01\x01\x00\x00\x03\x00\x00'\x00\x01\x00\x00\x80\x01\x00\x07\x80\x02\x00\x01\x80\x03\x00\x01\x80\x04\x00\x05\x80\x0e\x01\x00\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80\x00\x00\x00#\x00\x0109\x80\x01\x00\x05\x80\x02\x00\x02\x80\x03\x00\x01\x80\x04\x00\x02\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80"
= ISAKMP disassembly
~ ISAKMP
q=IP(str(p))
q.show()
q[ISAKMP_payload_Transform:2]
_.res2 == 12345
############
############
+ TFTP tests
= TFTP Options
x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")])
assert( str(x) == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00' )
y=IP(str(x))
y[TFTP_Option].oname
y[TFTP_Option:2].oname
assert(len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == "blksize")
############
############
+ Dot11 tests
= WEP tests
assert(_ == b'\x00\x00\x00\x00\xe3OjYLw\xc3x_%\xd0\xcf\xdeu-\xc3pH#\x1eK\xae\xf5\xde\xe7\xb8\x1d,\xa1\xfe\xe83\xca\xe1\xfe\xbd\xfe\xec\x00)T`\xde.\x93Td\x95C\x0f\x07\xdd')
Dot11WEP(_)
assert(TCP in _ and _[TCP].seq == 12345678)
############
############
+ SNMP tests
= SNMP assembling
~ SNMP ASN1
str(SNMP())
assert(_ == b'0\x18\x02\x01\x01\x04\x06public\xa0\x0b\x02\x01\x00\x02\x01\x00\x02\x01\x000\x00')
SNMP(version="v2c", community="ABC", PDU=SNMPbulk(id=4,varbindlist=[SNMPvarbind(oid="1.2.3.4",value=ASN1_INTEGER(7)),SNMPvarbind(oid="4.3.2.1.2.3",value=ASN1_IA5_STRING("testing123"))]))
str(_)
assert(_ == b'05\x02\x01\x01\x04\x03ABC\xa5+\x02\x01\x04\x02\x01\x00\x02\x01\x000 0\x08\x06\x03*\x03\x04\x02\x01\x070\x14\x06\x06\x81#\x02\x01\x02\x03\x16\ntesting123')
x=SNMP(b'0y\x02\x01\x00\x04\x06public\xa2l\x02\x01)\x02\x01\x00\x02\x01\x000a0!\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb78\x04\x0b172.31.19.20#\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb76\x04\r255.255.255.00\x17\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x05\n\x86\xde\xb9`\x02\x01\x01')
x.show()
assert(x.community=="public" and x.version == 0)
assert(x.PDU.id == 41 and len(x.PDU.varbindlist) == 3)
assert(x.PDU.varbindlist[0].oid == "1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].value == "172.31.19.2")
assert(x.PDU.varbindlist[2].oid == "1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
assert(x.PDU.varbindlist[2].value == 1)
############
############
+ Network tests
* Those tests need network access
= Sending and receiving an ICMP
~ netaccess IP ICMP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
conf.debug_dissector = old_debug_dissector
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
= DNS request
~ netaccess IP UDP DNS
* A possible cause of failure could be that the open DNS (resolver1.opendns.com)
* is not reachable or down.
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
dns_ans = sr1(IP(dst="resolver1.opendns.com")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.com")),timeout=5)
* This test retries on failure because it often fails
import time
import socket
success = False
for i in xrange(5):
try:
IP(src="8.8.8.8").whois()
time.sleep(2)
else:
success = True
break
assert success
* This test retries on failure because it often fails
success = False
for i in xrange(5):
try:
ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
time.sleep(2)
else:
success = True
break
= AS resolver - IPv6
~ netaccess IP
* This test retries on failure because it often fails
ret = list()
success = False
as_resolver6 = AS_resolver6()
for i in xrange(5):
try:
ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
except socket.error:
time.sleep(2)
else:
success = True
break
assert (len(ret) == 2)
all(x[1] == 15169 for x in ret)
success = False
for i in xrange(5):
try:
ret = AS_resolver_riswhois().resolve("8.8.8.8")
except socket.error:
time.sleep(2)
else:
success = True
break
assert (len(ret) == 1)
all(x[1] == "AS15169" for x in ret)
############
############
+ More complex tests
= Implicit logic
~ IP TCP
a=IP(ttl=(5,10))/TCP(dport=[80,443])
[p for p in a]
len(_) == 12
############
############
+ Real usages
= Port scan
~ netaccess IP TCP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]),timeout=2)
ans.make_table(lambda (s,r): (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))
= Traceroute function
~ netaccess
* Let's test traceroute
traceroute("www.slashdot.org")
ans,unans=_
= Result manipulation
~ netaccess
ans.nsummary()
s,r=ans[0]
s.show()
s.show(2)
= DNS packet manipulation
~ netaccess IP UDP DNS
dns_ans.show()
dns_ans.show2()
dns_ans[DNS].an.show()
dns_ans2 = IP(str(dns_ans))
DNS in dns_ans2
assert(str(dns_ans2) == str(dns_ans))
dns_ans2.qd.qname = "www.secdev.org."
* We need to recalculate these values
del(dns_ans2[IP].len)
del(dns_ans2[IP].chksum)
del(dns_ans2[UDP].len)
del(dns_ans2[UDP].chksum)
assert(b"\x03www\x06secdev\x03org\x00" in str(dns_ans2))
assert(DNS in IP(str(dns_ans2)))
= Arping
~ netaccess
* This test assumes the local network is a /24. This is bad.
conf.route.route("0.0.0.0")[2]
arping(_+"/24")
def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None):
assert pid != -1
if pid == 0:
time.sleep(1)
(sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt)
if fork:
os._exit(0)
else:
return
# We do not want to crash when a packet cannot be parsed
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
timeout=timeout, filter=flt,
stop_filter=lambda p: pkt.__class__ in p and str(p[pkt.__class__]) == spkt
)
conf.debug_dissector = old_debug_dissector
if fork:
os.waitpid(pid, 0)
else:
t_other.join()
assert str(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)
def send_and_sniff(pkt, timeout=2, flt=None):
"""Send a packet, sniff, and check the packet has been seen"""
if hasattr(os, "fork"):
_send_or_sniff(pkt, timeout, flt, os.fork(), True)
else:
from threading import Thread
def run_function(pkt, timeout, flt, pid, thread, results):
_send_or_sniff(pkt, timeout, flt, pid, False, t_other=thread)
results.put(True)
results = Queue.Queue()
t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results))
t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results))
t_parent.start()
t_child.start()
t_parent.join()
t_child.join()
assert results.qsize() >= 2
while not results.empty():
assert results.get()
send_and_sniff(IP(dst="secdev.org")/ICMP())
send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp")
send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP())
############
############
+ ManuFDB tests
= __repr__
if conf.manufdb:
assert conf.manufdb._resolve_MAC("00:00:63") == "HP"
else:
True
############
############
+ Automaton tests
= Simple automaton
~ automaton
class ATMT1(Automaton):
def parse_args(self, init, *args, **kargs):
Automaton.parse_args(self, *args, **kargs)
self.init = init
@ATMT.state(initial=1)
def BEGIN(self):
@ATMT.state()
def MAIN(self, s):
return s
@ATMT.condition(MAIN, prio=-1)
def go_to_END(self, s):
if len(s) > 20:
raise self.END(s).action_parameters(s)
@ATMT.condition(MAIN)
def trA(self, s):
if s.endswith("b"):
raise self.MAIN(s+"a")
@ATMT.condition(MAIN)
def trB(self, s):
if s.endswith("a"):
raise self.MAIN(s*2+"b")
@ATMT.state(final=1)
def END(self, s):
return s
@ATMT.action(go_to_END)
def action_test(self, s):
self.result = s
= Simple automaton Tests
~ automaton
a=ATMT1(init="a", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'aabaaababaaabaaababab' )
a.result
assert( _ == 'aabaaababaaabaaababab' )
a=ATMT1(init="b", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'babababababababababababababab' )
a.result
assert( _ == 'babababababababababababababab' )
= Simple automaton stuck test
~ automaton
try:
ATMT1(init="", ll=lambda: None, recvsock=lambda: None).run()
except Automaton.Stuck:
True
else:
False
= Automaton state overloading
~ automaton
class ATMT2(ATMT1):
@ATMT.state()
def MAIN(self, s):
return "c"+ATMT1.MAIN(self, s).run()
a=ATMT2(init="a", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )
a.result
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )
a=ATMT2(init="b", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'cccccbaccbabaccccbaccbabab')
a.result
assert( _ == 'cccccbaccbabaccccbaccbabab')
= Automaton condition overloading
~ automaton
class ATMT3(ATMT2):
@ATMT.condition(ATMT1.MAIN)
def trA(self, s):
if s.endswith("b"):
raise self.MAIN(s+"da")
a=ATMT3(init="a", debug=2, ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'cccccacabdacccacabdabda')
a.result
assert( _ == 'cccccacabdacccacabdabda')
a=ATMT3(init="b", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )
a.result
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )
= Automaton action overloading
~ automaton
class ATMT4(ATMT3):
@ATMT.action(ATMT1.go_to_END)
def action_test(self, s):
self.result = "e"+s+"e"
a=ATMT4(init="a", ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == 'cccccacabdacccacabdabda')
a.result
assert( _ == 'ecccccacabdacccacabdabdae')
a=ATMT4(init="b", ll=lambda: None, recvsock=lambda: None)
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
a.run()
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )
a.result
assert( _ == 'ecccccbdaccbdabdaccccbdaccbdabdabe' )
= Automaton priorities
~ automaton
class ATMT5(Automaton):
@ATMT.state(initial=1)
def BEGIN(self):
self.res = "J"
@ATMT.condition(BEGIN, prio=1)
def tr1(self):
self.res += "i"
raise self.END()
@ATMT.condition(BEGIN)
def tr2(self):
self.res += "p"
@ATMT.condition(BEGIN, prio=-1)
def tr3(self):
self.res += "u"
@ATMT.action(tr1)
def ac1(self):
self.res += "e"
@ATMT.action(tr1, prio=-1)
def ac2(self):
self.res += "t"
@ATMT.action(tr1, prio=1)
def ac3(self):
self.res += "r"
@ATMT.state(final=1)
def END(self):