Skip to content
Snippets Groups Projects
regression.uts 405 KiB
Newer Older
Phil's avatar
Phil committed
% Regression tests for Scapy

# More information at http://www.secdev.org/projects/UTscapy/

############
############
+ Informations on Scapy

= Get conf
~ conf command
* Dump the current configuration
conf

gpotter2's avatar
gpotter2 committed
IP().src

Phil's avatar
Phil committed
= List layers
~ conf command
ls()

= List commands
~ conf command
lsc()

= List contribs
def test_list_contrib():
    """Check that list_contrib() prints the http2 entry and more than 40 lines."""
    with ContextManagerCaptureOutput() as captured:
        list_contrib()
        contrib_listing = captured.get_output()
    http2_entry = "http2               : HTTP/2 (RFC 7540, RFC 7541)              status=loads"
    assert http2_entry in contrib_listing
    line_count = len(contrib_listing.split('\n'))
    assert line_count > 40
Phil's avatar
Phil committed
= Configuration
~ conf
conf.debug_dissector = True
Phil's avatar
Phil committed


############
############
+ Scapy functions tests

= Interface related functions

get_if_raw_hwaddr(conf.iface)

bytes_hex(get_if_raw_addr(conf.iface))
def get_dummy_interface():
    """Returns a dummy network interface"""
    if not WINDOWS:
        return "dummy0"
    # On Windows the interface is a NetworkInterface built from a dict
    return NetworkInterface({
        "name": "dummy0",
        "description": "Does not exist",
        "win_index": -1,
        "guid": "{1XX00000-X000-0X0X-X00X-00XXXX000XXX}",
        "invalid": True,
    })

get_if_raw_addr(get_dummy_interface())
get_if_list()

get_if_raw_addr6(conf.iface6)

= Test read_routes6() - default output

routes6 = read_routes6()
if WINDOWS:
    route_add_loopback(routes6, True)

# Expected results:
# - one route if there is only the loopback interface
# - three routes if there is a network interface

if routes6:
    iflist = get_if_list()
    if WINDOWS:
        route_add_loopback(ipv6=True, iflist=iflist)
    if iflist == [LOOPBACK_NAME]:
        # Only the loopback interface: exactly one route is expected
        len(routes6) == 1
    elif len(iflist) >= 2:
        # At least one real interface: three routes or more are expected
        len(routes6) >= 3
    else:
        False
else:
    # IPv6 seems disabled. Force a route to ::1
    conf.route6.routes.append(("::1", 128, "::", LOOPBACK_NAME, ["::1"]))
    True

= Test read_routes6() - check mandatory routes

if len(routes6):
    assert(len([r for r in routes6 if r[0] == "::1" and r[-1] == ["::1"]]) >= 1)
    if len(iflist) >= 2:
        assert(len([r for r in routes6 if r[0] == "fe80::" and r[1] == 64]) >= 1)
        len([r for r in routes6 if in6_islladdr(r[0]) and r[1] == 128 and r[-1] == ["::1"]]) >= 1
= Test ifchange()
conf.route6.ifchange(LOOPBACK_NAME, "::1/128")
True

gpotter2's avatar
gpotter2 committed
############
############
+ Main.py tests

= Pickle and unpickle a packet
import scapy.modules.six as six
a = IP(dst="192.168.0.1")/UDP()
b = six.moves.cPickle.dumps(a)
c = six.moves.cPickle.loads(b)
assert c[IP].dst == "192.168.0.1"
assert str(c) == str(a)
from scapy.main import _usage
try:
    _usage()
    assert False
except SystemExit:
    assert True

= Session test

# This is automatic when using the console
def get_var(var):
    """Return the value stored under `var` in the builtins "scapy_session" dict."""
    session = six.moves.builtins.__dict__["scapy_session"]
    return session[var]

def set_var(var, value):
    """Store `value` under `var` in the builtins "scapy_session" dict."""
    session = six.moves.builtins.__dict__["scapy_session"]
    session[var] = value

def del_var(var):
    """Remove the entry `var` from the builtins "scapy_session" dict."""
    session = six.moves.builtins.__dict__["scapy_session"]
    del session[var]

init_session(None, {"init_value": 123})
set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8"
save_session()
del_var("test_value")
load_session()
update_session()
assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8"
assert get_var("init_value") == 123
 
= Session test with fname
init_session("scapySession2")
set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1")
save_session(fname="scapySession1.dat")
del_var("test_value")
gpotter2's avatar
gpotter2 committed

set_var("z", True) #z = True
load_session(fname="scapySession1.dat")
# "z" was set after the session was saved, so the freshly loaded session
# must not contain it: the dict lookup in get_var() is expected to raise
# KeyError. The failure assertion sits in the `else` clause so that it
# cannot be swallowed by the exception handler.
try:
    get_var("z")
except KeyError:
    pass
else:
    assert False
gpotter2's avatar
gpotter2 committed

set_var("z", False) #z = False
update_session(fname="scapySession1.dat")
assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1"
assert not get_var("z")
gpotter2's avatar
gpotter2 committed

= Clear session files
gpotter2's avatar
gpotter2 committed

os.remove("scapySession1.dat")
gpotter2's avatar
gpotter2 committed

= Test temporary file creation
tmpfile = get_temp_file(autoext=".ut")
# NOTE(review): the branch condition was lost from this span; WINDOWS is
# inferred from the Windows-style path checked in the first assertion.
if WINDOWS:
    assert("scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp'))
else:
    import platform
    # The "/tmp/" location check is skipped on PyPy and on DARWIN
    BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN
    assert("scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile))
assert(conf.temp_files[0].endswith(".ut"))
scapy_delete_temp_files()
assert(len(conf.temp_files) == 0)
= Emulate interact()

import mock, sys
try:
    import IPython
except:
    code_interact_import = "scapy.main.code.interact"
else:
    code_interact_import = "scapy.main.IPython.terminal.embed.InteractiveShellEmbed"

@mock.patch(code_interact_import)
def interact_emulator(code_int):
    """Run interact() while the shell entry point named by code_interact_import is patched.
    Returns True when interact() completes; any exception propagates to the caller."""
    # `code_int` is the mock injected by mock.patch; it is intentionally
    # unused because the patch itself is what neutralises the real shell.
    interact(argv=["-s scapy1"], mybanner="What a test")
    return True

assert interact_emulator()
sys.ps1 = ">>> "

= Test sane function
sane("A\x00\xFFB") == "A..B"
= Test lhex function
assert(lhex(42) == "0x2a")
assert(lhex((28,7)) == "(0x1c, 0x7)")
assert(lhex([28,7]) == "[0x1c, 0x7]")
= Test linehexdump function
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == "FFFFFFFFFFFF0001020304059000 ..............")
conf.color_theme = conf_color_theme
= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"
= Test repr_hex function
repr_hex("scapy") == "7363617079"

= Test hexstr function
gpotter2's avatar
gpotter2 committed
hexstr(b"A\x00\xFFB") == "41 00 ff 42  A..B"
= Test fletcher16 functions
assert(fletcher16_checksum(b"\x28\x07") == 22319)
assert(fletcher16_checkbytes(b"\x28\x07", 1) == "\xaf(")
= Test hexdiff function
~ not_pypy
def test_hexdiff():
    """Check the text printed by hexdiff() for two 5-character strings differing in one character."""
    conf_color_theme = conf.color_theme
    conf.color_theme = BlackAndWhite()
    try:
        with ContextManagerCaptureOutput() as cmco:
            hexdiff("abcde", "abCde")
            result_hexdiff = cmco.get_output()
    finally:
        # Always restore the global theme, even if hexdiff() raises, so a
        # failure here cannot leak the black-and-white theme to later tests.
        conf.color_theme = conf_color_theme
    conf.interactive = True
    expected  = "0000        61 62 63 64 65                                     abcde\n"
    expected += "     0000   61 62 43 64 65                                     abCde\n"
    assert(result_hexdiff == expected)

test_hexdiff()

= Test mysummary functions - Ether

Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
assert _.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop
                         for loop in ['0x9000', 'LOOP']]
assert(fletcher16_checksum(b"\x28\x07") == 22319)
assert(fletcher16_checkbytes(b"ABCDEF", 2) == "\x89\x67")

= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12']

= Test export_object and import_object functions
import mock
def test_export_import_object():
    """Round-trip the integer 2807 through export_object() and import_object()."""
    with ContextManagerCaptureOutput() as cmco:
        export_object(2807)
        # export_object() prints its result, so it is read back from the capture
        result_export_object = cmco.get_output(eval_bytes=True)
    assert(result_export_object.startswith("eNprYPL9zqUHAAdrAf8=\n"))
    assert(import_object(result_export_object) == 2807)

test_export_import_object()

= Test tex_escape function
tex_escape("$#_") == "\\$\\#\\_"

f = colgen(range(3))
assert(len([next(f) for i in range(2)]) == 2)
= Test incremental_label function
f = incremental_label()
assert([next(f) for i in range(2)] == ["tag00000", "tag00001"])
import random
random.seed(0x2807)
assert(corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"])
assert(sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"])
assert(corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"])
assert(sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"])

= Test save_object and load_object functions
import tempfile
fd, fname = tempfile.mkstemp()
save_object(fname, 2807)
assert(load_object(fname) == 2807)

= Test whois function
if not WINDOWS:
    result = whois("193.0.6.139")
    assert(b"inetnum" in result and b"Amsterdam" in result)
assert(conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03")
assert(conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next")
assert(in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next"))

= Test utility functions - network related
~ netaccess

atol("www.secdev.org") == 3642339845

= Test autorun functions

ret = autorun_get_text_interactive_session("IP().src")
assert(ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1'))

ret = autorun_get_html_interactive_session("IP().src")
assert(ret == ("<span class=prompt>&gt;&gt;&gt; </span>IP().src\n'127.0.0.1'\n", '127.0.0.1'))

ret = autorun_get_latex_interactive_session("IP().src")
assert(ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1'))

= Test utility TEX functions

assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}"

a = colgen(1, 2, 3)
assert next(a) == (1, 2, 2)
assert next(a) == (1, 3, 3)
assert next(a) == (2, 2, 1)
assert next(a) == (2, 3, 2)
assert next(a) == (2, 1, 3)
assert next(a) == (3, 3, 1)
assert next(a) == (3, 1, 2)
assert next(a) == (3, 2, 3)
= Test config file functions

saved_conf_verb = conf.verb
fd, fname = tempfile.mkstemp()
os.write(fd, b"conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname, globals(), locals())
assert(conf.verb == 42)
conf.verb = saved_conf_verb

gpotter2's avatar
gpotter2 committed
= Test CacheInstance repr

conf.netcache
Phil's avatar
Phil committed
############
############
+ Basic tests

* These tests are here mainly to check that nothing has been broken
* and to catch Exceptions

= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()                                                                                                                             
assert(ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00")
assert(len(ret[1]) == 2)

assert(p[ICMP].firstlayer() == p)

assert(p.command() == "IP()/ICMP()")

p.decode_payload_as(UDP)
assert(p.sport == 2048 and p.dport == 63487)

= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert(repr(p) == "<IP  frag=0 ttl=64 proto=icmp |<ICMP  |>>")
p.hide_defaults()
assert(repr(p) == "<IP  frag=0 proto=icmp |<ICMP  |>>")
conf.color_theme = conf_color_theme

= split_layers
p = IP()/ICMP()
s = raw(p)
split_layers(IP, ICMP, proto=1)
assert(Raw in IP(s))
bind_layers(IP, ICMP, frag=0, proto=1)

= fuzz
~ not_pypy
random.seed(0x2807)
raw(fuzz(IP()/ICMP())) in [
    b'u\x14\x00\x1c\xc2\xf6\x80\x00\xde\x01k\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01y\xc9>\xa6\x84\xd8\xc2\xb7',
    b'E\xa7\x00\x1c\xb0c\xc0\x00\xf6\x01U\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01\xfex\xb3\x92B<\x0b\xb8',
]
= Building some packets
Phil's avatar
Phil committed
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
IP().summary()
Phil's avatar
Phil committed

= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del(a.ttl)
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert(a.copy().time == a.time)
Phil's avatar
Phil committed
a=3


= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
a.proto
_ == 6


= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
_ in [ '0x800 64 0x07b 4', 'IPv4 64 0x07b 4']


= sprintf() function 
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditional substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
_ == 'flags=S 127.0.0.1'


= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
TCP in x, ICMP in x, IP in x, UDP in x
_ == (True,True,True,False)

= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and 
       x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
       x.getlayer(IP,4) == None and
       x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
    x[IP:4]
except IndexError:
    True
else:
    False

= getlayer with a filter
~ getlayer IP
pkt = IP() / IP(ttl=3) / IP()
assert pkt[IP::{"ttl":3}].ttl == 3
assert pkt.getlayer(IP, ttl=3).ttl == 3

= specific haslayer and getlayer implementations for NTP
~ haslayer getlayer NTP
pkt = IP() / UDP() / NTPHeader()
assert NTP in pkt
assert pkt.haslayer(NTP)
assert isinstance(pkt[NTP], NTPHeader)
assert isinstance(pkt.getlayer(NTP), NTPHeader)

= specific haslayer and getlayer implementations for EAP
~ haslayer getlayer EAP
pkt = Ether() / EAPOL() / EAP_MD5()
assert EAP in pkt
assert pkt.haslayer(EAP)
assert isinstance(pkt[EAP], EAP_MD5)
assert isinstance(pkt.getlayer(EAP), EAP_MD5)

= specific haslayer and getlayer implementations for RadiusAttribute
~ haslayer getlayer RadiusAttribute
pkt = RadiusAttr_EAP_Message()
assert RadiusAttribute in pkt
assert pkt.haslayer(RadiusAttribute)
assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message)
assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message)


Phil's avatar
Phil committed
= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
Phil's avatar
Phil committed
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
x==y, x==z, x==t, y==z, y==t, z==t, w==x
_ == (False, False, False, False, False, False, True)

= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back

a1, a2 = Net("www.google.com"), Net("www.secdev.org")
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
# Not available yet because of IPv6
# a1, a2 = Net6("www.google.com"), Net6("www.secdev.org")
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
Phil's avatar
Phil committed

############
############
+ Tests on padding

= Padding assembly
raw(Padding("abc"))
assert( _ == b"abc" )
raw(Padding("abc")/Padding("def"))
assert( _ == b"abcdef" )
raw(Raw("ABC")/Padding("abc")/Padding("def"))
assert( _ == b"ABCabcdef" )
raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
assert( _ == b"ABCDEFabcdef" )
Phil's avatar
Phil committed

= Padding and length computation
IP(raw(IP()/Padding("abc")))
Phil's avatar
Phil committed
assert( _.len == 20 and len(_) == 23 )
IP(raw(IP()/Raw("ABC")/Padding("abc")))
Phil's avatar
Phil committed
assert( _.len == 23 and len(_) == 26 )
IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
Phil's avatar
Phil committed
assert( _.len == 23 and len(_) == 29 )

= PadField test
~ PadField padding

class TestPad(Packet):
    # Minimal packet used to exercise PadField: a StrNullField "st" wrapped
    # in PadField(..., 4), followed by a plain StrField "id".
    fields_desc = [ PadField(StrNullField("st", b""),4), StrField("id", b"")]
TestPad() == TestPad(raw(TestPad()))
Phil's avatar
Phil committed


############
############
+ Tests on default value changes mechanism
Phil's avatar
Phil committed

= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
    # IP subclass that only overrides two class-level defaults; the
    # following test checks that instances report version 3 and ttl 32.
    version = 3
    ttl = 32

= Test of IPv3 class
a = IPv3()
a.version, a.ttl
assert(_ == (3,32))
gpotter2's avatar
gpotter2 committed
assert(_ == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01')
Phil's avatar
Phil committed


############
############
+ ISAKMP transforms test

= ISAKMP creation
~ IP UDP ISAKMP 
p=IP(src='192.168.8.14',dst='10.0.0.1')/UDP()/ISAKMP()/ISAKMP_payload_SA(prop=ISAKMP_payload_Proposal(trans=ISAKMP_payload_Transform(transforms=[('Encryption', 'AES-CBC'), ('Hash', 'MD5'), ('Authentication', 'PSK'), ('GroupDesc', '1536MODPgr'), ('KeyLength', 256), ('LifeType', 'Seconds'), ('LifeDuration', 86400)])/ISAKMP_payload_Transform(res2=12345,transforms=[('Encryption', '3DES-CBC'), ('Hash', 'SHA'), ('Authentication', 'PSK'), ('GroupDesc', '1024MODPgr'), ('LifeType', 'Seconds'), ('LifeDuration', 86400)])))
Phil's avatar
Phil committed
p.show()
p


= ISAKMP manipulation
~ ISAKMP
p[ISAKMP_payload_Transform:2]
_.res2 == 12345

= ISAKMP assembly
~ ISAKMP 
hexdump(p)
raw(p) == b"E\x00\x00\x96\x00\x01\x00\x00@\x11\xa7\x9f\xc0\xa8\x08\x0e\n\x00\x00\x01\x01\xf4\x01\xf4\x00\x82\xbf\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00z\x00\x00\x00^\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00R\x01\x01\x00\x00\x03\x00\x00'\x00\x01\x00\x00\x80\x01\x00\x07\x80\x02\x00\x01\x80\x03\x00\x01\x80\x04\x00\x05\x80\x0e\x01\x00\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80\x00\x00\x00#\x00\x0109\x80\x01\x00\x05\x80\x02\x00\x02\x80\x03\x00\x01\x80\x04\x00\x02\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80"
Phil's avatar
Phil committed


= ISAKMP disassembly
~ ISAKMP
q=IP(raw(p))
Phil's avatar
Phil committed
q.show()
q[ISAKMP_payload_Transform:2]
_.res2 == 12345


############
############
+ TFTP tests

= TFTP Options
x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")])
assert( raw(x) == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00' )
y=IP(raw(x))
Phil's avatar
Phil committed
y[TFTP_Option].oname
y[TFTP_Option:2].oname
assert(len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == b"blksize")
Phil's avatar
Phil committed


############
############
+ Dot11 tests


= WEP tests
~ wifi crypto Dot11 LLC SNAP IP TCP
conf.wepkey = "Fobar"
raw(Dot11WEP()/LLC()/SNAP()/IP()/TCP(seq=12345678))
gpotter2's avatar
gpotter2 committed
assert(_ == b'\x00\x00\x00\x00\xe3OjYLw\xc3x_%\xd0\xcf\xdeu-\xc3pH#\x1eK\xae\xf5\xde\xe7\xb8\x1d,\xa1\xfe\xe83\xca\xe1\xfe\xbd\xfe\xec\x00)T`\xde.\x93Td\x95C\x0f\x07\xdd')
Phil's avatar
Phil committed
Dot11WEP(_)
assert(TCP in _ and _[TCP].seq == 12345678)

= RadioTap Big-Small endian dissection
data = b'\x00\x00\x1a\x00/H\x00\x00\xe1\xd3\xcb\x05\x00\x00\x00\x00@0x\x14@\x01\xac\x00\x00\x00'
r = RadioTap(data)
r.show()
assert r.present == 18479

Phil's avatar
Phil committed

############
############
+ SNMP tests

= SNMP assembling
~ SNMP ASN1
raw(SNMP())
gpotter2's avatar
gpotter2 committed
assert(_ == b'0\x18\x02\x01\x01\x04\x06public\xa0\x0b\x02\x01\x00\x02\x01\x00\x02\x01\x000\x00')
Phil's avatar
Phil committed
SNMP(version="v2c", community="ABC", PDU=SNMPbulk(id=4,varbindlist=[SNMPvarbind(oid="1.2.3.4",value=ASN1_INTEGER(7)),SNMPvarbind(oid="4.3.2.1.2.3",value=ASN1_IA5_STRING("testing123"))]))
gpotter2's avatar
gpotter2 committed
assert(_ == b'05\x02\x01\x01\x04\x03ABC\xa5+\x02\x01\x04\x02\x01\x00\x02\x01\x000 0\x08\x06\x03*\x03\x04\x02\x01\x070\x14\x06\x06\x81#\x02\x01\x02\x03\x16\ntesting123')
Phil's avatar
Phil committed

= SNMP disassembling
~ SNMP ASN1
gpotter2's avatar
gpotter2 committed
x=SNMP(b'0y\x02\x01\x00\x04\x06public\xa2l\x02\x01)\x02\x01\x00\x02\x01\x000a0!\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb78\x04\x0b172.31.19.20#\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb76\x04\r255.255.255.00\x17\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x05\n\x86\xde\xb9`\x02\x01\x01')
Phil's avatar
Phil committed
x.show()
assert(x.community==b"public" and x.version == 0)
Phil's avatar
Phil committed
assert(x.PDU.id == 41 and len(x.PDU.varbindlist) == 3)
assert(x.PDU.varbindlist[0].oid == "1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].value == b"172.31.19.2")
assert(x.PDU.varbindlist[2].oid == "1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
Phil's avatar
Phil committed
assert(x.PDU.varbindlist[2].value == 1)


############
############
+ Network tests

* Those tests need network access

= Sending and receiving an ICMP
~ netaccess IP ICMP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
conf.debug_dissector = old_debug_dissector
Phil's avatar
Phil committed
x
Pierre LALET's avatar
Pierre LALET committed
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
Phil's avatar
Phil committed
x is not None and ICMP in x and x[ICMP].type == 0

gpotter2's avatar
gpotter2 committed
= Sending and receiving an ICMP with flooding methods
~ netaccess IP ICMP
# Flooding methods do not support a timeout, so the test runs in a thread that is joined with a time limit
def _test_flood():
    """Send an IP/ICMP packet with sr1flood() and validate the answer."""
    old_debug_dissector = conf.debug_dissector
    conf.debug_dissector = False
    try:
        x = sr1flood(IP(dst="www.google.com")/ICMP())
    finally:
        # Restore the dissector setting even if sr1flood() raises
        conf.debug_dissector = old_debug_dissector
    # Validate the answer before indexing into it; inside a function a bare
    # expression is discarded, so this has to be an assert to have any effect.
    assert x is not None and ICMP in x and x[ICMP].type == 0
    assert x[IP].ottl() in [32, 64, 128, 255]
    assert 0 <= x[IP].hops() <= 126

t = Thread(target=_test_flood)
t.start()
t.join(3)
assert not t.is_alive()

= Sending and receiving an ICMPv6EchoRequest
~ netaccess ipv6
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3)
conf.debug_dissector = old_debug_dissector
x
assert x[IPv6].ottl() in [32, 64, 128, 255]
assert 0 <= x[IPv6].hops() <= 126
x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129
Phil's avatar
Phil committed
= DNS request
~ netaccess IP UDP DNS
* A possible cause of failure could be that the open DNS (resolver1.opendns.com)
* is not reachable or down.
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
Phil's avatar
Phil committed
dns_ans = sr1(IP(dst="resolver1.opendns.com")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.com")),timeout=5)
conf.debug_dissector = old_debug_dissector
Phil's avatar
Phil committed
DNS in dns_ans

Pierre LALET's avatar
Pierre LALET committed
= Whois request
~ netaccess IP
* This test retries on failure because it often fails
import time
import socket
success = False
for i in six.moves.range(5):
    try:
        IP(src="8.8.8.8").whois()
Pierre LALET's avatar
Pierre LALET committed
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert success
Guillaume Valadon's avatar
Guillaume Valadon committed
= AS resolvers
~ netaccess IP
* This test retries on failure because it often fails

ret = list()
success = False
for i in six.moves.range(5):
    try:
        ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
gpotter2's avatar
gpotter2 committed
    except RuntimeError:
        time.sleep(2)
    else:
        success = True
        break
Guillaume Valadon's avatar
Guillaume Valadon committed

assert (len(ret) == 2)
gpotter2's avatar
gpotter2 committed
all(x[1] == "AS15169" for x in ret)
ret = list()
for i in six.moves.range(5):
    try:
        ret = AS_resolver_riswhois().resolve("8.8.8.8")
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert (len(ret) == 1)
assert all(x[1] == "AS15169" for x in ret)

# This test is too buggy, and is simulated below
#ret = list()
#success = False
#for i in six.moves.range(5):
#    try:
#        ret = AS_resolver_cymru().resolve("8.8.8.8")
#    except socket.error:
#        time.sleep(2)
#    else:
#        success = True
#        break
#
#assert (len(ret) == 1)
#
#all(x[1] == "AS15169" for x in ret)
cymru_bulk_data = """
Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000]
24776   | 217.25.178.5     | INFOCLIP-AS, FR
36459   | 192.30.253.112   | GITHUB - GitHub, Inc., US
26496   | 68.178.213.61    | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US
"""
tmp = AS_resolver_cymru().parse(cymru_bulk_data)
assert(len(tmp) == 3)
assert([l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496'])

= AS resolver - IPv6
~ netaccess IP
* This test retries on failure because it often fails

ret = list()
success = False
as_resolver6 = AS_resolver6()
for i in six.moves.range(5):
    try:
        ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert (len(ret) == 2)
assert all(x[1] == 15169 for x in ret)

Phil's avatar
Phil committed

############
############
+ More complex tests

= Implicit logic
~ IP TCP
a=IP(ttl=(5,10))/TCP(dport=[80,443])
Phil's avatar
Phil committed
[p for p in a]
len(_) == 12


############
############
+ Real usages

= Port scan
~ netaccess IP TCP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
Phil's avatar
Phil committed
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]),timeout=2)
conf.debug_dissector = old_debug_dissector
ans.make_table(lambda s_r: (s_r[0].dst, s_r[0].dport, s_r[1].sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))
Phil's avatar
Phil committed

= Traceroute function
~ netaccess
* Let's test traceroute
traceroute("www.slashdot.org")
ans,unans=_

= Result manipulation
~ netaccess
ans.nsummary()
s,r=ans[0]
s.show()
s.show(2)

= DNS packet manipulation
Phil's avatar
Phil committed
dns_ans.show()
dns_ans.show2()
dns_ans[DNS].an.show()
dns_ans2 = IP(raw(dns_ans))
assert(raw(dns_ans2) == raw(dns_ans))
dns_ans2.qd.qname = "www.secdev.org."
* We need to recalculate these values
del(dns_ans2[IP].len)
del(dns_ans2[IP].chksum)
del(dns_ans2[UDP].len)
del(dns_ans2[UDP].chksum)
assert(b"\x03www\x06secdev\x03org\x00" in raw(dns_ans2))
assert(DNS in IP(raw(dns_ans2)))
Phil's avatar
Phil committed

= Arping
~ netaccess
* This test assumes the local network is a /24. This is bad.
conf.route.route("0.0.0.0")[2]
arping(_+"/24")

= send() and sniff()
import time
import os
# The module is named "Queue" on Python 2 and "queue" on Python 3.
# Only an import failure should trigger the fallback.
try:
    from Queue import Queue as queue
except ImportError:
    from queue import Queue as queue

gpotter2's avatar
gpotter2 committed
def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None, opened_socket=None):
    """Send `pkt` when pid == 0, otherwise sniff and assert that `pkt` was seen.
    `fork` tells whether the two sides are processes (pid comes from os.fork())
    or threads (`t_other` is then the sender thread, joined by the sniffer).
    `timeout`, `flt` and `opened_socket` are forwarded to sniff()."""
    assert pid != -1
    if pid == 0:
        # Sender side: wait a little before emitting, then leave
        time.sleep(1)
        (sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt)
        if fork:
            os._exit(0)
        else:
            return
    else:
        spkt = raw(pkt)
        # We do not want to crash when a packet cannot be parsed
        old_debug_dissector = conf.debug_dissector
        conf.debug_dissector = False
        try:
            pkts = sniff(
                timeout=timeout, filter=flt, opened_socket=opened_socket,
                stop_filter=lambda p: pkt.__class__ in p and raw(p[pkt.__class__]) == spkt
            )
        finally:
            conf.debug_dissector = old_debug_dissector
        # Wait for the sender side to finish before checking the capture
        if fork:
            os.waitpid(pid, 0)
        else:
            t_other.join()
    assert raw(pkt) in (raw(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)
gpotter2's avatar
gpotter2 committed
def send_and_sniff(pkt, timeout=2, flt=None, opened_socket=None):
    """Send a packet, sniff, and check the packet has been seen.
    Uses os.fork() when available, otherwise a sender and a sniffer thread.
    `timeout`, `flt` and `opened_socket` are handed to the sniffing side."""
    if hasattr(os, "fork"):
        # Forward opened_socket so the caller's socket is also used on this path
        _send_or_sniff(pkt, timeout, flt, os.fork(), True, opened_socket=opened_socket)
    else:
        from threading import Thread
        def run_function(pkt, timeout, flt, pid, thread, results, opened_socket):
            _send_or_sniff(pkt, timeout, flt, pid, False, t_other=thread, opened_socket=opened_socket)
            # Reached only if _send_or_sniff() did not raise: report success
            results.put(True)
        results = queue()
        # pid 0 makes the thread the sender, pid 1 makes it the sniffer
        t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results, None))
        t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results, opened_socket))
        t_parent.start()
        t_child.start()
        t_parent.join()
        t_child.join()
        # Both threads must have reported success
        assert results.qsize() >= 2
        while not results.empty():
            assert results.get()

send_and_sniff(IP(dst="secdev.org")/ICMP())
send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp")
send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP())

gpotter2's avatar
gpotter2 committed
a = L2ListenTcpdump()
icmp_r = IP(b'E\x00\x00\x1c\x00\x01\x00\x00@\x01p\xc0\x7f\x00\x00\x01\xd9\x19\xb2\x05\x08\x00\xf7\xff\x00\x00\x00\x00')
send_and_sniff(icmp_r, timeout=10, opened_socket=a)
a.close()

############
############
+ ManuFDB tests

= __repr__

if conf.manufdb:
    len(conf.manufdb)
else:
    True