Skip to content
Snippets Groups Projects
regression.uts 389 KiB
Newer Older
Phil's avatar
Phil committed
% Regression tests for Scapy

# More informations at http://www.secdev.org/projects/UTscapy/

############
############
+ Informations on Scapy

= Get conf
~ conf command
* Dump the current configuration
conf

gpotter2's avatar
gpotter2 committed
IP().src

Phil's avatar
Phil committed
= List layers
~ conf command
ls()

= List commands
~ conf command
lsc()

= List contribs
import mock
result_list_contrib = ""
def test_list_contrib():
    def write(s):
        global result_list_contrib
        result_list_contrib += s
    mock_stdout = mock.Mock()
    mock_stdout.write = write
    bck_stdout = sys.stdout
    sys.stdout = mock_stdout
    list_contrib()
    sys.stdout = bck_stdout
    assert("http2               : HTTP/2 (RFC 7540, RFC 7541)              status=loads" in result_list_contrib)
    assert(result_list_contrib.split('\n') > 40)

test_list_contrib()

Phil's avatar
Phil committed
= Configuration
~ conf
conf.debug_dissector = True
Phil's avatar
Phil committed


############
############
+ Scapy functions tests

= Interface related functions

get_if_raw_hwaddr(conf.iface)

get_if_raw_addr(conf.iface).encode("hex")

def get_dummy_interface():
    """Returns a dummy network interface"""
    if WINDOWS:
        data = {}
        data["name"] = "dummy0"
        data["description"] = "Does not exist"
        data["win_index"] = -1
        data["guid"] = "{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"
        data["invalid"] = True
        return NetworkInterface(data)
    else:
        return "dummy0"

get_if_raw_addr(get_dummy_interface())
get_if_list()

get_if_raw_addr6(conf.iface6)

= Test read_routes6() - default output

routes6 = read_routes6()
if WINDOWS:
    route_add_loopback(routes6, True)

# Expected results:
# - one route if there is only the loopback interface
# - three routes if there is a network interface

if len(routes6):
    iflist = get_if_list()
    if WINDOWS:
        route_add_loopback(ipv6=True, iflist=iflist)
    if iflist == [LOOPBACK_NAME]:
gpotter2's avatar
gpotter2 committed
        len(routes6) == 1
    elif len(iflist) >= 2:
gpotter2's avatar
gpotter2 committed
        len(routes6) >= 3
gpotter2's avatar
gpotter2 committed
        False
    # IPv6 seems disabled. Force a route to ::1
    conf.route6.routes.append(("::1", 128, "::", LOOPBACK_NAME, ["::1"]))
    True

= Test read_routes6() - check mandatory routes

if len(routes6):
    assert(len(filter(lambda r: r[0] == "::1" and r[-1] == ["::1"], routes6)) >= 1)
    if iflist >= 2:
gpotter2's avatar
gpotter2 committed
        assert(len(filter(lambda r: r[0] == "fe80::" and r[1] == 64, routes6)) >= 1)
        len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128 and r[-1] == ["::1"], routes6)) >= 1
= Test ifchange()
conf.route6.ifchange(LOOPBACK_NAME, "::1/128")
True

gpotter2's avatar
gpotter2 committed
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise

if WINDOWS:
    route_add_loopback()

IP().src
assert _ == "127.0.0.1"

gpotter2's avatar
gpotter2 committed
############
############
+ Main.py tests

= Prepare emulator
import mock
from mock import Mock

if WINDOWS:
    # This fix when the windows doesn't have a size (run with a windows server)
    mock.patch("pyreadline.console.console.Console.size").return_value = (300, 300)

@mock.patch("scapy.config.conf.readfunc")
def emulate_main_input(data, mock_readfunc):
    global index
    index = 0 # reset var
    def readlineScapy(*args, **kargs):
        global index
        if len(data) == index:
            r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)"
        else:
            r_data = data[index]
        index +=1
        print r_data
        return r_data
    mock_readfunc.side_effect = readlineScapy
    def reduce_mock(self):
        return (Mock, ())
    mock_readfunc.__reduce__ = reduce_mock
    sys.argv = ['']
    def console_exit(code):
        raise SystemExit(code)
    exit_code = -1
    try:
        interact(mydict={"exit": console_exit})
    except SystemExit as e:
        exit_code = str(e)
        pass
    assert exit_code == "0"

= Test basic save_session, load_session and update_session

data = ["init_session(\"scapySession1\")",\
"test_value = \"8.8.8.8\"",\
"save_session()",\
"del test_value",\
"load_session()",\
"update_session()",\
"assert test_value == \"8.8.8.8\""]

emulate_main_input(data)

= Test save_session, load_session and update_session with fname

data = ["init_session(\"scapySession2\")",\
"test_value = 7",\
"save_session(fname=\"scapySaveSession.dat\")",\
"del test_value",\
"load_session(fname=\"scapySaveSession.dat\")",\
"update_session(fname=\"scapySaveSession.dat\")",\
"assert test_value == 7"]

emulate_main_input(data)
gpotter2's avatar
gpotter2 committed
= Test pickling with packets

data = ["init_session(\"scapySession1\")",\
"test_value = IP(src=\"127.0.0.1\", dst=\"8.8.8.8\")",\
"test_value2 = ICMPv6EchoReply(data=\"testData@%!\")",\
"save_session()",\
"del test_value",\
"load_session()",\
"assert test_value.src == \"127.0.0.1\"",\
"assert test_value.dst == \"8.8.8.8\"",\
"assert test_value2.data == \"testData@%!\""]

emulate_main_input(data)

= Clean up session files

try:
    os.remove("scapySaveSession.dat")
except OSError:
    pass

= Test temporary file creation
tmpfile = get_temp_file(autoext=".ut")
if WINDOWS:
    assert("scapy" in tmpfile and tmpfile.startswith('C:\\Users\\appveyor\\AppData\\Local\\Temp'))
else:
    import platform
    IS_PYPY = platform.python_implementation() == "PyPy"
    assert("scapy" in tmpfile and (IS_PYPY == True or "/tmp/" in tmpfile))
assert(conf.temp_files[0].endswith(".ut"))
assert(conf.temp_files.pop())
assert(len(conf.temp_files) == 0)
= Test sane function
sane("A\x00\xFFB") == "A..B"
= Test lhex function
assert(lhex(42) == "0x2a")
assert(lhex((28,07)) == "(0x1c, 0x7)")
assert(lhex([28,07]) == "[0x1c, 0x7]")

= Test linehexdump function
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert(linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == "FFFFFFFFFFFF0001020304059000 ..............")
conf.color_theme = conf_color_theme
= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"
= Test repr_hex function
repr_hex("scapy") == "7363617079"

= Test hexstr function
gpotter2's avatar
gpotter2 committed
hexstr(b"A\x00\xFFB") == "41 00 ff 42  A..B"
= Test fletcher16 functions
assert(fletcher16_checksum(b"\x28\x07") == 22319)
assert(fletcher16_checkbytes(b"\x28\x07", 1) == "\xaf(")
= Test hexdiff function
~ not_pypy
import mock
result_hexdiff = ""
def test_hexdiff():
    def write(s):
        global result_hexdiff
        result_hexdiff += s
    conf_color_theme = conf.color_theme
    conf.color_theme = BlackAndWhite()
    mock_stdout = mock.Mock()
    mock_stdout.write = write
    bck_stdout = sys.stdout
    sys.stdout = mock_stdout
    hexdiff("abcde", "abCde")
    sys.stdout = bck_stdout
    conf.interactive = True
    conf.color_theme = conf_color_theme
    expected  = "0000        61 62 63 64 65                                     abcde\n"
    expected += "     0000   61 62 43 64 65                                     abCde\n"
    assert(result_hexdiff == expected)

test_hexdiff()

= Test fletcher16_* functions
assert(fletcher16_checksum("\x28\x07") == 22319)
assert(fletcher16_checkbytes("ABCDEF", 2) == "\x89\x67")

= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) == "\xd2\x12\xe4\x5b"

= Test export_object and import_object functions
import mock
result_export_object = ""
def test_export_import_object():
    def write(s):
        global result_export_object
        result_export_object += s
    mock_stdout = mock.Mock()
    mock_stdout.write = write
    bck_stdout = sys.stdout
    sys.stdout = mock_stdout
    export_object(2807)
    sys.stdout = bck_stdout
    assert(result_export_object.endswith("eNprYPL9zqUHAAdrAf8=\n\n"))
    assert(import_object(result_export_object) == 2807)

test_export_import_object()

= Test tex_escape function
tex_escape("$#_") == "\\$\\#\\_"

f = colgen(range(3))
assert(len([f.next() for i in range(2)]) == 2)
= Test incremental_label function
f = incremental_label()
assert([f.next() for i in range(2)] == ["tag00000", "tag00001"])
import random
random.seed(0x2807)
assert(corrupt_bytes("ABCDE") == "ABCDW")
assert(sane(corrupt_bytes("ABCDE", n=3)) == "A.8D4")
assert(corrupt_bits("ABCDE") == "EBCDE")
assert(sane(corrupt_bits("ABCDE", n=3)) == "AF.EE")

= Test save_object and load_object functions
import tempfile
fd, fname = tempfile.mkstemp()
save_object(fname, 2807)
assert(load_object(fname) == 2807)

= Test whois function
if not WINDOWS:
    result = whois("193.0.6.139")
    assert("inetnum" in result and "Amsterdam" in result)

= Test manuf DB methods
~ manufdb
assert(MANUFDB._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03")
assert(MANUFDB._get_short_manuf("00:00:0F:01:02:03") == "Next")
assert(in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next"))

= Test utility functions - network related
~ netaccess

atol("www.secdev.org") == 3642339845

= Test autorun functions

ret = autorun_get_text_interactive_session("IP().src")
assert(ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1'))

ret = autorun_get_html_interactive_session("IP().src")
assert(ret == ("<span class=prompt>&gt;&gt;&gt; </span>IP().src\n'127.0.0.1'\n", '127.0.0.1'))

ret = autorun_get_latex_interactive_session("IP().src")
assert(ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1'))

= Test config file functions

saved_conf_verb = conf.verb
fd, fname = tempfile.mkstemp()
os.write(fd, "conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname)
assert(conf.verb == 42)
conf.verb = saved_conf_verb


Phil's avatar
Phil committed
############
############
+ Basic tests

* Those test are here mainly to check nothing has been broken
* and to catch Exceptions

= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()                                                                                                                             
assert(ret[0] == "@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00")
assert(len(ret[1]) == 2)

assert(p[ICMP].firstlayer() == p)

assert(p.command() == "IP()/ICMP()")

p.decode_payload_as(UDP)
assert(p.sport == 2048 and p.dport == 63487)

= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert(repr(p) == "<IP  frag=0 ttl=64 proto=icmp |<ICMP  |>>")
p.hide_defaults()
assert(repr(p) == "<IP  frag=0 proto=icmp |<ICMP  |>>")
conf.color_theme = conf_color_theme

= split_layers
p = IP()/ICMP()
s = str(p)
split_layers(IP, ICMP, proto=1)
assert(Raw in IP(s))
bind_layers(IP, ICMP, frag=0, proto=1)

= fuzz
~ not_pypy
random.seed(0x2807)
str(fuzz(IP()/ICMP())) == '\xe5S\x00\x1c\x9dC \x007\x01(H\x7f\x00\x00\x01\x7f\x00\x00\x01*\xdb\xf7,9\x8e\xa4i'

= Building some packets
Phil's avatar
Phil committed
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
IP().summary()
Phil's avatar
Phil committed

= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del(a.ttl)
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert(a.copy().time == a.time)
Phil's avatar
Phil committed
a=3


= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
a.proto
_ == 6


= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
_ in [ '0x800 64 0x07b 4', 'IPv4 64 0x07b 4']


= sprintf() function 
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditionnal substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
_ == 'flags=S 127.0.0.1'


= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
TCP in x, ICMP in x, IP in x, UDP in x
_ == (True,True,True,False)

= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and 
       x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
       x.getlayer(IP,4) == None and
       x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
    x[IP:4]
except IndexError:
    True
else:
    False

= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
Phil's avatar
Phil committed
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
x==y, x==z, x==t, y==z, y==t, z==t, w==x
_ == (False, False, False, False, False, False, True)

= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
Phil's avatar
Phil committed

############
############
+ Tests on padding

= Padding assembly
str(Padding("abc"))
assert( _ == "abc" )
str(Padding("abc")/Padding("def"))
assert( _ == "abcdef" )
str(Raw("ABC")/Padding("abc")/Padding("def"))
assert( _ == "ABCabcdef" )
str(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
Phil's avatar
Phil committed
assert( _ == "ABCDEFabcdef" )
Phil's avatar
Phil committed

= Padding and length computation
IP(str(IP()/Padding("abc")))
assert( _.len == 20 and len(_) == 23 )
IP(str(IP()/Raw("ABC")/Padding("abc")))
assert( _.len == 23 and len(_) == 26 )
IP(str(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
assert( _.len == 23 and len(_) == 29 )

= PadField test
~ PadField padding

class TestPad(Packet):
    fields_desc = [ PadField(StrNullField("st", ""),4), StrField("id", "")]

TestPad() == TestPad(str(TestPad()))
Phil's avatar
Phil committed


############
############
+ Tests on default value changes mechanism
Phil's avatar
Phil committed

= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
    version = 3
    ttl = 32

= Test of IPv3 class
a = IPv3()
a.version, a.ttl
assert(_ == (3,32))
str(a)
gpotter2's avatar
gpotter2 committed
assert(_ == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01')
Phil's avatar
Phil committed


############
############
+ ISAKMP transforms test

= ISAKMP creation
~ IP UDP ISAKMP 
p=IP(src='192.168.8.14',dst='10.0.0.1')/UDP()/ISAKMP()/ISAKMP_payload_SA(prop=ISAKMP_payload_Proposal(trans=ISAKMP_payload_Transform(transforms=[('Encryption', 'AES-CBC'), ('Hash', 'MD5'), ('Authentication', 'PSK'), ('GroupDesc', '1536MODPgr'), ('KeyLength', 256), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])/ISAKMP_payload_Transform(res2=12345,transforms=[('Encryption', '3DES-CBC'), ('Hash', 'SHA'), ('Authentication', 'PSK'), ('GroupDesc', '1024MODPgr'), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])))
p.show()
p


= ISAKMP manipulation
~ ISAKMP
p[ISAKMP_payload_Transform:2]
_.res2 == 12345

= ISAKMP assembly
~ ISAKMP 
hexdump(p)
gpotter2's avatar
gpotter2 committed
str(p) == b"E\x00\x00\x96\x00\x01\x00\x00@\x11\xa7\x9f\xc0\xa8\x08\x0e\n\x00\x00\x01\x01\xf4\x01\xf4\x00\x82\xbf\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00z\x00\x00\x00^\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00R\x01\x01\x00\x00\x03\x00\x00'\x00\x01\x00\x00\x80\x01\x00\x07\x80\x02\x00\x01\x80\x03\x00\x01\x80\x04\x00\x05\x80\x0e\x01\x00\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80\x00\x00\x00#\x00\x0109\x80\x01\x00\x05\x80\x02\x00\x02\x80\x03\x00\x01\x80\x04\x00\x02\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80"
Phil's avatar
Phil committed


= ISAKMP disassembly
~ ISAKMP
q=IP(str(p))
q.show()
q[ISAKMP_payload_Transform:2]
_.res2 == 12345


############
############
+ TFTP tests

= TFTP Options
x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")])
gpotter2's avatar
gpotter2 committed
assert( str(x) == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00' )
Phil's avatar
Phil committed
y=IP(str(x))
y[TFTP_Option].oname
y[TFTP_Option:2].oname
assert(len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == "blksize")


############
############
+ Dot11 tests


= WEP tests
~ wifi crypto Dot11 LLC SNAP IP TCP
conf.wepkey = "Fobar"
Phil's avatar
Phil committed
str(Dot11WEP()/LLC()/SNAP()/IP()/TCP(seq=12345678))
gpotter2's avatar
gpotter2 committed
assert(_ == b'\x00\x00\x00\x00\xe3OjYLw\xc3x_%\xd0\xcf\xdeu-\xc3pH#\x1eK\xae\xf5\xde\xe7\xb8\x1d,\xa1\xfe\xe83\xca\xe1\xfe\xbd\xfe\xec\x00)T`\xde.\x93Td\x95C\x0f\x07\xdd')
Phil's avatar
Phil committed
Dot11WEP(_)
assert(TCP in _ and _[TCP].seq == 12345678)


############
############
+ SNMP tests

= SNMP assembling
~ SNMP ASN1
str(SNMP())
gpotter2's avatar
gpotter2 committed
assert(_ == b'0\x18\x02\x01\x01\x04\x06public\xa0\x0b\x02\x01\x00\x02\x01\x00\x02\x01\x000\x00')
Phil's avatar
Phil committed
SNMP(version="v2c", community="ABC", PDU=SNMPbulk(id=4,varbindlist=[SNMPvarbind(oid="1.2.3.4",value=ASN1_INTEGER(7)),SNMPvarbind(oid="4.3.2.1.2.3",value=ASN1_IA5_STRING("testing123"))]))
str(_)
gpotter2's avatar
gpotter2 committed
assert(_ == b'05\x02\x01\x01\x04\x03ABC\xa5+\x02\x01\x04\x02\x01\x00\x02\x01\x000 0\x08\x06\x03*\x03\x04\x02\x01\x070\x14\x06\x06\x81#\x02\x01\x02\x03\x16\ntesting123')
Phil's avatar
Phil committed

= SNMP disassembling
~ SNMP ASN1
gpotter2's avatar
gpotter2 committed
x=SNMP(b'0y\x02\x01\x00\x04\x06public\xa2l\x02\x01)\x02\x01\x00\x02\x01\x000a0!\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb78\x04\x0b172.31.19.20#\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb76\x04\r255.255.255.00\x17\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x05\n\x86\xde\xb9`\x02\x01\x01')
Phil's avatar
Phil committed
x.show()
assert(x.community=="public" and x.version == 0)
assert(x.PDU.id == 41 and len(x.PDU.varbindlist) == 3)
assert(x.PDU.varbindlist[0].oid == "1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].value == "172.31.19.2")
assert(x.PDU.varbindlist[2].oid == "1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
assert(x.PDU.varbindlist[2].value == 1)


############
############
+ Network tests

* Those tests need network access

= Sending and receiving an ICMP
~ netaccess IP ICMP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
conf.debug_dissector = old_debug_dissector
Phil's avatar
Phil committed
x
Pierre LALET's avatar
Pierre LALET committed
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
Phil's avatar
Phil committed
x is not None and ICMP in x and x[ICMP].type == 0

= DNS request
~ netaccess IP UDP DNS
* A possible cause of failure could be that the open DNS (resolver1.opendns.com)
* is not reachable or down.
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
Phil's avatar
Phil committed
dns_ans = sr1(IP(dst="resolver1.opendns.com")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.com")),timeout=5)
conf.debug_dissector = old_debug_dissector
Phil's avatar
Phil committed
DNS in dns_ans

Pierre LALET's avatar
Pierre LALET committed
= Whois request
~ netaccess IP
* This test retries on failure because it often fails
import time
import socket
success = False
for i in xrange(5):
    try:
        IP(src="8.8.8.8").whois()
Pierre LALET's avatar
Pierre LALET committed
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert success
Guillaume Valadon's avatar
Guillaume Valadon committed
= AS resolvers
~ netaccess IP
* This test retries on failure because it often fails

ret = list()
success = False
for i in xrange(5):
    try:
        ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
gpotter2's avatar
gpotter2 committed
    except RuntimeError:
        time.sleep(2)
    else:
        success = True
        break
Guillaume Valadon's avatar
Guillaume Valadon committed

assert (len(ret) == 2)

gpotter2's avatar
gpotter2 committed
all(x[1] == "AS15169" for x in ret)
= AS resolver - IPv6
~ netaccess IP
* This test retries on failure because it often fails

ret = list()
success = False
as_resolver6 = AS_resolver6()
for i in xrange(5):
    try:
        ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert (len(ret) == 2)
assert all(x[1] == 15169 for x in ret)
success = False
for i in xrange(5):
    try:
        ret = AS_resolver_riswhois().resolve("8.8.8.8")
    except socket.error:
        time.sleep(2)
    else:
        success = True
        break

assert (len(ret) == 1)
assert all(x[1] == "AS15169" for x in ret)

# This test is too buggy
#success = False
#for i in xrange(5):
#    try:
#        ret = AS_resolver_cymru().resolve("8.8.8.8")
#    except socket.error:
#        time.sleep(2)
#    else:
#        success = True
#        break
#
#assert (len(ret) == 1)
#
#all(x[1] == "AS15169" for x in ret)
Phil's avatar
Phil committed

############
############
+ More complex tests

= Implicit logic
~ IP TCP
a=IP(ttl=(5,10))/TCP(dport=[80,443])
Phil's avatar
Phil committed
[p for p in a]
len(_) == 12


############
############
+ Real usages

= Port scan
~ netaccess IP TCP
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
Phil's avatar
Phil committed
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]),timeout=2)
conf.debug_dissector = old_debug_dissector
Phil's avatar
Phil committed
ans.make_table(lambda (s,r): (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))

= Traceroute function
~ netaccess
* Let's test traceroute
traceroute("www.slashdot.org")
ans,unans=_

= Result manipulation
~ netaccess
ans.nsummary()
s,r=ans[0]
s.show()
s.show(2)

= DNS packet manipulation
Phil's avatar
Phil committed
dns_ans.show()
dns_ans.show2()
dns_ans[DNS].an.show()
dns_ans2 = IP(str(dns_ans))
DNS in dns_ans2
assert(str(dns_ans2) == str(dns_ans))
dns_ans2.qd.qname = "www.secdev.org."
* We need to recalculate these values
del(dns_ans2[IP].len)
del(dns_ans2[IP].chksum)
del(dns_ans2[UDP].len)
del(dns_ans2[UDP].chksum)
gpotter2's avatar
gpotter2 committed
assert(b"\x03www\x06secdev\x03org\x00" in str(dns_ans2))
assert(DNS in IP(str(dns_ans2)))
Phil's avatar
Phil committed

= Arping
~ netaccess
* This test assumes the local network is a /24. This is bad.
conf.route.route("0.0.0.0")[2]
arping(_+"/24")

= send() and sniff()
import time
import os
def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None):
    assert pid != -1
    if pid == 0:
        time.sleep(1)
        (sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt)
        if fork:
            os._exit(0)
        else:
            return
    else:
        spkt = str(pkt)
        # We do not want to crash when a packet cannot be parsed
        old_debug_dissector = conf.debug_dissector
        conf.debug_dissector = False
        pkts = sniff(
            timeout=timeout, filter=flt,
            stop_filter=lambda p: pkt.__class__ in p and str(p[pkt.__class__]) == spkt
        )
        conf.debug_dissector = old_debug_dissector
        if fork:
            os.waitpid(pid, 0)
        else:
            t_other.join()
    assert str(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)

def send_and_sniff(pkt, timeout=2, flt=None):
    """Send a packet, sniff, and check the packet has been seen"""
    if hasattr(os, "fork"):
        _send_or_sniff(pkt, timeout, flt, os.fork(), True)
    else:
        from threading import Thread
        def run_function(pkt, timeout, flt, pid, thread, results):
            _send_or_sniff(pkt, timeout, flt, pid, False, t_other=thread)
            results.put(True)
        results = Queue.Queue()
        t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results))
        t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results))
        t_parent.start()
        t_child.start()
        t_parent.join()
        t_child.join()
        assert results.qsize() >= 2
        while not results.empty():
            assert results.get()

send_and_sniff(IP(dst="secdev.org")/ICMP())
send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp")
send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP())

############
############
+ ManuFDB tests

= __repr__

if conf.manufdb:
    len(conf.manufdb)
else:
    True

= check _resolve_MAC

if conf.manufdb:
    assert conf.manufdb._resolve_MAC("00:00:63") == "HP"
else:
    True
Phil's avatar
Phil committed

############
############
+ Automaton tests

= Simple automaton
~ automaton
class ATMT1(Automaton):
    def parse_args(self, init, *args, **kargs):
        Automaton.parse_args(self, *args, **kargs)
        self.init = init
    @ATMT.state(initial=1)
    def BEGIN(self):
gpotter2's avatar
gpotter2 committed
        raise self.MAIN(self.init)
Phil's avatar
Phil committed
    @ATMT.state()
    def MAIN(self, s):
        return s
    @ATMT.condition(MAIN, prio=-1)
    def go_to_END(self, s):
        if len(s) > 20:
            raise self.END(s).action_parameters(s)
    @ATMT.condition(MAIN)
    def trA(self, s):
        if s.endswith("b"):
            raise self.MAIN(s+"a")
    @ATMT.condition(MAIN)
    def trB(self, s):
        if s.endswith("a"):
            raise self.MAIN(s*2+"b")
    @ATMT.state(final=1)
    def END(self, s):
        return s
    @ATMT.action(go_to_END)
    def action_test(self, s):
        self.result = s
    
= Simple automaton Tests
~ automaton

a=ATMT1(init="a", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'aabaaababaaabaaababab' )
a.result
assert( _ == 'aabaaababaaabaaababab' )
a=ATMT1(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'babababababababababababababab' )
a.result
assert( _ == 'babababababababababababababab' )

= Simple automaton stuck test
~ automaton

try:    
    ATMT1(init="", ll=lambda: None, recvsock=lambda: None).run()
Phil's avatar
Phil committed
except Automaton.Stuck:
    True
else:
    False


= Automaton state overloading
~ automaton
class ATMT2(ATMT1):
    @ATMT.state()
    def MAIN(self, s):
        return "c"+ATMT1.MAIN(self, s).run()

a=ATMT2(init="a", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )


a.result
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )
a=ATMT2(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccbaccbabaccccbaccbabab')
a.result
assert( _ == 'cccccbaccbabaccccbaccbabab')


= Automaton condition overloading
~ automaton
class ATMT3(ATMT2):
    @ATMT.condition(ATMT1.MAIN)
    def trA(self, s):
        if s.endswith("b"):
            raise self.MAIN(s+"da")