% Regression tests for Scapy

# More information at http://www.secdev.org/projects/UTscapy/
# $Id: regression.uts,v 1.9 2008/07/29 15:30:29 pbi Exp pbi $

############
############
+ Informations on Scapy

= Get conf
~ conf command
* Dump the current configuration
conf

= List layers
~ conf command
ls()

= List commands
~ conf command
lsc()

= Configuration
~ conf
conf.debug_dissector = True
Phil's avatar
Phil committed


############
############
+ Scapy functions tests

= Interface related functions

get_if_raw_hwaddr(conf.iface)

get_if_raw_addr(conf.iface).encode("hex")

get_if_list()

= Test read_routes6() - default output

routes6 = read_routes6()
# Expected results:
# - one route if there is only the loopback interface
# - three routes if there is a network interface
# NOTE(review): the bare comparisons below are deliberate (not asserts);
# presumably the last expression evaluated is what the test runner checks.

if len(routes6):
    iflist = get_if_list()
    if iflist == [LOOPBACK_NAME]:
        # Loopback only: exactly one IPv6 route is expected.
        len(routes6) == 1
    elif len(iflist) >= 2:
        # At least one real interface besides loopback.
        len(routes6) >= 3
    else:
        # A single interface that is not the loopback is unexpected.
        False
else:
    # IPv6 seems disabled. Force a route to ::1
    conf.route6.routes.append(("::1", 128, "::", LOOPBACK_NAME, ["::1"]))
    True

= Test read_routes6() - check mandatory routes

if len(routes6):
    # The loopback route must always be present.
    assert any(r[0] == "::1" and r[-1] == ["::1"] for r in routes6)
    # `iflist` is a list of interface names: compare its length, not the
    # list itself (a list-to-int comparison is always true on Python 2).
    if len(iflist) >= 2:
        # A link-local /64 prefix route must exist ...
        assert any(r[0] == "fe80::" and r[1] == 64 for r in routes6)
        # ... and so must a /128 link-local address routed via ::1.
        any(in6_islladdr(r[0]) and r[1] == 128 and r[-1] == ["::1"] for r in routes6)
else:
    True
Phil's avatar
Phil committed
############
############
+ Basic tests

* These tests are here mainly to check that nothing has been broken
* and to catch exceptions

= Building some packets packet
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)

= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del(a.ttl)
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert(a.copy().time == a.time)
Phil's avatar
Phil committed
a=3


= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
a.proto
_ == 6


= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
_ in [ '0x800 64 0x07b 4', 'IPv4 64 0x07b 4']


= sprintf() function 
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditional substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
_ == 'flags=S 127.0.0.1'


= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
TCP in x, ICMP in x, IP in x, UDP in x
_ == (True,True,True,False)

= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and 
       x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
       x.getlayer(IP,4) == None and
       x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
    x[IP:4]
except IndexError:
    True
else:
    False

= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
Phil's avatar
Phil committed
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
x==y, x==z, x==t, y==z, y==t, z==t, w==x
_ == (False, False, False, False, False, False, True)

= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back

Phil's avatar
Phil committed

############
############
+ Tests on padding

= Padding assembly
str(Padding("abc"))
assert( _ == "abc" )
str(Padding("abc")/Padding("def"))
assert( _ == "abcdef" )
str(Raw("ABC")/Padding("abc")/Padding("def"))
assert( _ == "ABCabcdef" )
str(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
Phil's avatar
Phil committed
assert( _ == "ABCDEFabcdef" )
Phil's avatar
Phil committed

= Padding and length computation
IP(str(IP()/Padding("abc")))
assert( _.len == 20 and len(_) == 23 )
IP(str(IP()/Raw("ABC")/Padding("abc")))
assert( _.len == 23 and len(_) == 26 )
IP(str(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
assert( _.len == 23 and len(_) == 29 )

= PadField test
~ PadField padding

class TestPad(Packet):
    # Two string fields: "st" is a null-terminated string wrapped in
    # PadField(..., 4) (presumably 4-byte alignment - confirm against the
    # PadField definition), followed by the plain string "id".
    fields_desc = [
        PadField(StrNullField("st", ""), 4),
        StrField("id", ""),
    ]

TestPad() == TestPad(str(TestPad()))
Phil's avatar
Phil committed


############
############
+ Tests on basic fields

#= Field class
#~ core field
#Field("foo", None, fmt="H").i2m(None,0xabcdef)
#assert( _ == "\xcd\xef" )
#Field("foo", None, fmt="<I").i2m(None,0x12cdef)
#assert( _ == "\xef\xcd\x12\x00" )
#Field("foo", None, fmt="B").addfield(None, "FOO", 0x12)
#assert( _ == "FOO\x12" )
#Field("foo", None, fmt="I").getfield(None, "\x12\x34\x56\x78ABCD")
#assert( _ == ("ABCD",0x12345678) )
#
#= ConditionnalField class
#~ core field
#False

= MACField class
~ core field
m = MACField("foo", None)
m.i2m(None, None)
assert( _ == "\x00\x00\x00\x00\x00\x00" )
m.getfield(None, "\xc0\x01\xbe\xef\xba\xbeABCD")
assert( _ == ("ABCD","c0:01:be:ef:ba:be") )
m.addfield(None, "FOO", "c0:01:be:ef:ba:be")
assert( _ == "FOO\xc0\x01\xbe\xef\xba\xbe" )

= IPField class
~ core field
i = IPField("foo", None)
i.i2m(None, "1.2.3.4")
assert( _ == "\x01\x02\x03\x04" )
i.i2m(None, "255.255.255.255")
assert( _ == "\xff\xff\xff\xff" )
i.m2i(None, "\x01\x02\x03\x04")
assert( _ == "1.2.3.4" )
i.getfield(None, "\x01\x02\x03\x04ABCD")
assert( _ == ("ABCD","1.2.3.4") )
i.addfield(None, "FOO", "1.2.3.4")
assert( _ == "FOO\x01\x02\x03\x04" )

#= ByteField
#~ core field
#b = ByteField("foo", None)
#b.i2m("
#b.getfield

############
############
+ Tests on default value changes mechanism
Phil's avatar
Phil committed

= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
    # IP subclass that only changes two default field values; the test
    # that follows expects (version, ttl) == (3, 32).
    ttl = 32
    version = 3

= Test of IPv3 class
a = IPv3()
a.version, a.ttl
assert(_ == (3,32))
str(a)
assert(_ == '5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01')


############
############
+ Tests on ActionField

= Creation of a layer with ActionField
~ field actionfield

class TestAction(Packet):
    # Single-byte packet whose "tst" field is wrapped in an ActionField
    # bound to my_action(). The "Triggering action" test expects that
    # assigning t.tst calls my_action with the new value and priv1=1, priv2=2.
    __slots__ = ["_val", "_fld", "_priv1", "_priv2"]
    name = "TestAction"
    fields_desc = [ ActionField(ByteField("tst", 3), "my_action", priv1=1, priv2=2) ]
    def __init__(self, *args, **kargs):
        # Reset the recorders first so they exist (as None) on a fresh packet.
        self._val, self._fld, self._priv1, self._priv2 = None, None, None, None
        super(TestAction, self).__init__(*args, **kargs)
    def my_action(self, val, fld, priv1, priv2):
        # Record every argument so the test can inspect what was passed.
        # Parenthesised form prints the same text on Python 2 and 3.
        print("Action (%i)!" % val)
        self._val, self._fld, self._priv1, self._priv2 = val, fld, priv1, priv2
Phil's avatar
Phil committed

= Triggering action
~ field actionfield

t = TestAction()
assert(t._val == t._fld == t._priv1 == t._priv2 == None)
t.tst=42
assert(t._priv1 == 1)
assert(t._priv2 == 2)
assert(t._val == 42)


############
############
+ Tests on FieldLenField

= Creation of a layer with FieldLenField
~ field 
class TestFLenF(Packet):
    # "len" is a one-byte field (fmt="B") holding len(str) + 1; on
    # dissection "str" takes len - 1 bytes, e.g. "\x08default".
    fields_desc = [
        FieldLenField("len", None, length_of="str", fmt="B",
                      adjust=lambda pkt, x: x + 1),
        StrLenField("str", "default",
                    length_from=lambda pkt: pkt.len - 1),
    ]

= Assembly of an empty packet
~ field
TestFLenF()
str(_)
_ == "\x08default"

= Assembly of non empty packet
~ field
TestFLenF(str="123")
str(_)
_ == "\x04123"

= Disassembly
~ field
TestFLenF("\x04ABCDEFGHIJKL")
_
_.len == 4 and _.str == "ABC" and Raw in _


= BitFieldLenField test
~ field
class TestBFLenF(Packet):
    # 4-bit "len" (len(str) + 1), then a 12-bit filler defaulting to 0xfff,
    # then "str" itself; the default packet assembles to "\x8f\xffdefault".
    fields_desc = [
        BitFieldLenField("len", None, 4, length_of="str",
                         adjust=lambda pkt, x: x + 1),
        BitField("nothing", 0xfff, 12),
        StrLenField("str", "default",
                    length_from=lambda pkt: pkt.len - 1),
    ]

a=TestBFLenF()
str(a)
assert( _ == "\x8f\xffdefault" )

a.str=""
str(a)
assert( _ == "\x1f\xff" )

TestBFLenF("\x1f\xff@@")
assert( _.len == 1 and _.str == "" and Raw in _ and _[Raw].load == "@@" )

TestBFLenF("\x6f\xffabcdeFGH")
assert( _.len == 6 and _.str == "abcde" and Raw in _ and _[Raw].load == "FGH" )



############
############
+ Tests on FieldListField

= Creation of a layer
~ field
class TestFLF(Packet):
    # One-byte element count ("len") followed by that many IntField
    # elements stored in "lst".
    name = "test"
    fields_desc = [
        FieldLenField("len", None, count_of="lst", fmt="B"),
        FieldListField("lst", None, IntField("elt", 0),
                       count_from=lambda pkt: pkt.len),
    ]

= Assembly of an empty packet
~ field
a = TestFLF()
str(a)

= Assembly of a non-empty packet
~ field
a = TestFLF()
a.lst = [7,65539]
ls(a)
str(a)
import struct
Phil's avatar
Phil committed
_ == struct.pack("!BII", 2,7,65539)

= Disassemble
~ field
import struct
Phil's avatar
Phil committed
TestFLF("\x00\x11\x12")
assert(_.len == 0 and Raw in _ and _[Raw].load == "\x11\x12")
TestFLF(struct.pack("!BIII",3,1234,2345,12345678))
assert(_.len == 3 and _.lst == [1234,2345,12345678])

= Manipulate
~ field
a = TestFLF(lst=[4])
str(a)
assert(_ == "\x01\x00\x00\x00\x04")
a.lst.append(1234)
TestFLF(str(a))
a.show2()
a.len=7
str(a)
assert(_ == "\x07\x00\x00\x00\x04\x00\x00\x04\xd2")
a.len=2
a.lst=[1,2,3,4,5]
TestFLF(str(a))
assert(Raw in _ and _[Raw].load == '\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00\x05') 


############
############
+ PacketListField 

= Create a layer
~ field lengthfield
class TestPLF(Packet):
    # "len" counts the packets held in "plist"; each list entry is
    # dissected as an IP packet. An empty instance assembles to "\x00\x00".
    name = "test"
    fields_desc = [
        FieldLenField("len", None, count_of="plist"),
        PacketListField("plist", None, IP,
                        count_from=lambda pkt: pkt.len),
    ]

= Test the PacketListField assembly
~ field lengthfield
x=TestPLF()
str(x)
_ == "\x00\x00"

= Test the PacketListField assembly 2
~ field lengthfield
x=TestPLF()
x.plist=[IP()/TCP(), IP()/UDP()]
str(x)
_.startswith('\x00\x02E')

= Test disassembly
~ field lengthfield
x=TestPLF(plist=[IP()/TCP(seq=1234567), IP()/UDP()])
TestPLF(str(x))
_.show()
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 1234567

= Nested PacketListField
~ field lengthfield
y=IP()/TCP(seq=111111)/TestPLF(plist=[IP()/TCP(seq=222222),IP()/UDP()])
TestPLF(plist=[y,IP()/TCP(seq=333333)])
_.show()
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 111111 and _[TCP:2].seq==222222 and _[TCP:3].seq == 333333

############
############
+ PacketListField tests

= Create a layer
~ field lengthfield
class TestPLF(Packet):
    # Count-prefixed list of IP packets: "len" holds the number of entries
    # in "plist" and drives how many are read back on dissection.
    name = "test"
    fields_desc = [
        FieldLenField("len", None, count_of="plist"),
        PacketListField("plist", None, IP,
                        count_from=lambda pkt: pkt.len),
    ]

= Test the PacketListField assembly
~ field lengthfield
x=TestPLF()
str(x)
_ == "\x00\x00"

= Test the PacketListField assembly 2
~ field lengthfield
x=TestPLF()
x.plist=[IP()/TCP(), IP()/UDP()]
str(x)
_.startswith('\x00\x02E')

= Test disassembly
~ field lengthfield
x=TestPLF(plist=[IP()/TCP(seq=1234567), IP()/UDP()])
TestPLF(str(x))
_.show()
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 1234567

= Nested PacketListField
~ field lengthfield
y=IP()/TCP(seq=111111)/TestPLF(plist=[IP()/TCP(seq=222222),IP()/UDP()])
TestPLF(plist=[y,IP()/TCP(seq=333333)])
_.show()
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 111111 and _[TCP:2].seq==222222 and _[TCP:3].seq == 333333

= Complex packet
~ field lengthfield ccc
class TestPkt(Packet):
    # Fixed three-byte element: a byte "f1" (default 65) and a short "f2"
    # (default 0x4244).
    fields_desc = [
        ByteField("f1", 65),
        ShortField("f2", 0x4244),
    ]
    def extract_padding(self, p):
        # Keep no payload for this layer: return an empty payload and hand
        # all remaining bytes back as the second element.
        payload = ""
        return payload, p

class TestPLF2(Packet):
    # List of TestPkt elements described by two deliberately odd length
    # fields: len1 = element count + 2, len2 = (byte length + 1) // 2.
    # On dissection the list length is (len2 * 2) // 3 * 3, i.e. rounded
    # down to a multiple of 3 (the TestPkt size shown by its two fields).
    # Explicit floor division (//) keeps the results integral even where
    # "/" means true division; on Python 2 ints it behaves exactly like "/".
    fields_desc = [
        FieldLenField("len1", None, count_of="plist", fmt="H",
                      adjust=lambda pkt, x: x + 2),
        FieldLenField("len2", None, length_of="plist", fmt="I",
                      adjust=lambda pkt, x: (x + 1) // 2),
        PacketListField("plist", None, TestPkt,
                        length_from=lambda x: (x.len2 * 2) // 3 * 3),
    ]

a=TestPLF2()
str(a)
assert( _ == "\x00\x02\x00\x00\x00\x00" )

a.plist=[TestPkt(),TestPkt(f1=100)] 
str(a)
assert(_ == '\x00\x04\x00\x00\x00\x03ABDdBD')

a /= "123456"
b = TestPLF2(str(a))
b.show()
assert(b.len1 == 4 and b.len2 == 3)
assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244)
assert(b[TestPkt:2].f1 == 100)
assert(Raw in b and b[Raw].load == "123456")

a.plist.append(TestPkt(f1=200))
b = TestPLF2(str(a))
b.show()
assert(b.len1 == 5 and b.len2 == 5)
assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244)
assert(b[TestPkt:2].f1 == 100)
assert(b[TestPkt:3].f1 == 200)
assert(b.getlayer(TestPkt,4) is None)
assert(Raw in b and b[Raw].load == "123456")
hexdiff(a,b)
assert( str(a) == str(b) )




############
############
+ ISAKMP transforms test

= ISAKMP creation
~ IP UDP ISAKMP 
p=IP(src='192.168.8.14',dst='10.0.0.1')/UDP()/ISAKMP()/ISAKMP_payload_SA(prop=ISAKMP_payload_Proposal(trans=ISAKMP_payload_Transform(transforms=[('Encryption', 'AES-CBC'), ('Hash', 'MD5'), ('Authentication', 'PSK'), ('GroupDesc', '1536MODPgr'), ('KeyLength', 256), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])/ISAKMP_payload_Transform(res2=12345,transforms=[('Encryption', '3DES-CBC'), ('Hash', 'SHA'), ('Authentication', 'PSK'), ('GroupDesc', '1024MODPgr'), ('LifeType', 'Seconds'), ('LifeDuration', 86400L)])))
p.show()
p


= ISAKMP manipulation
~ ISAKMP
p[ISAKMP_payload_Transform:2]
_.res2 == 12345

= ISAKMP assembly
~ ISAKMP 
hexdump(p)
str(p) == "E\x00\x00\x96\x00\x01\x00\x00@\x11\xa7\x9f\xc0\xa8\x08\x0e\n\x00\x00\x01\x01\xf4\x01\xf4\x00\x82\xbf\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00z\x00\x00\x00^\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00R\x01\x01\x00\x00\x03\x00\x00'\x00\x01\x00\x00\x80\x01\x00\x07\x80\x02\x00\x01\x80\x03\x00\x01\x80\x04\x00\x05\x80\x0e\x01\x00\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80\x00\x00\x00#\x00\x0109\x80\x01\x00\x05\x80\x02\x00\x02\x80\x03\x00\x01\x80\x04\x00\x02\x80\x0b\x00\x01\x00\x0c\x00\x03\x01Q\x80"


= ISAKMP disassembly
~ ISAKMP
q=IP(str(p))
q.show()
q[ISAKMP_payload_Transform:2]
_.res2 == 12345


############
############
+ TFTP tests

= TFTP Options
x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")])
assert( str(x) == 'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00' )
y=IP(str(x))
y[TFTP_Option].oname
y[TFTP_Option:2].oname
assert(len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == "blksize")


############
############
+ Dot11 tests


= WEP tests
~ wifi wep Dot11 LLC SNAP IP TCP
conf.wepkey = "ABCDEFGH"
str(Dot11WEP()/LLC()/SNAP()/IP()/TCP(seq=12345678))
assert(_ == '\x00\x00\x00\x00\x1e\xafK5G\x94\xd4m\x81\xdav\xd4,c\xf1\xfe{\xfc\xba\xd6;T\x93\xd0\t\xdb\xfc\xa5\xb9\x85\xce\x05b\x1cC\x10\xd7p\xde22&\xf0\xbcUS\x99\x83Z\\D\xa6')
Dot11WEP(_)
assert(TCP in _ and _[TCP].seq == 12345678)


############
############
+ SNMP tests

= SNMP assembling
~ SNMP ASN1
str(SNMP())
assert(_ == '0\x18\x02\x01\x01\x04\x06public\xa0\x0b\x02\x01\x00\x02\x01\x00\x02\x01\x000\x00')
SNMP(version="v2c", community="ABC", PDU=SNMPbulk(id=4,varbindlist=[SNMPvarbind(oid="1.2.3.4",value=ASN1_INTEGER(7)),SNMPvarbind(oid="4.3.2.1.2.3",value=ASN1_IA5_STRING("testing123"))]))
str(_)
assert(_ == '05\x02\x01\x01\x04\x03ABC\xa5+\x02\x01\x04\x02\x01\x00\x02\x01\x000 0\x08\x06\x03*\x03\x04\x02\x01\x070\x14\x06\x06\x81#\x02\x01\x02\x03\x16\ntesting123')

= SNMP disassembling
~ SNMP ASN1
x=SNMP('0y\x02\x01\x00\x04\x06public\xa2l\x02\x01)\x02\x01\x00\x02\x01\x000a0!\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb78\x04\x0b172.31.19.20#\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x07\n\x86\xde\xb76\x04\r255.255.255.00\x17\x06\x12+\x06\x01\x04\x01\x81}\x08@\x04\x02\x01\x05\n\x86\xde\xb9`\x02\x01\x01')
x.show()
assert(x.community=="public" and x.version == 0)
assert(x.PDU.id == 41 and len(x.PDU.varbindlist) == 3)
assert(x.PDU.varbindlist[0].oid == "1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].value == "172.31.19.2")
assert(x.PDU.varbindlist[2].oid == "1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
assert(x.PDU.varbindlist[2].value == 1)


############
############
+ Network tests

* Those tests need network access

= Sending and receiving an ICMP
~ netaccess IP ICMP
x=sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
x
x is not None and ICMP in x and x[ICMP].type == 0

= DNS request
~ netaccess IP UDP DNS
* A possible cause of failure could be that the open DNS (resolver1.opendns.com)
* is not reachable or down.
dns_ans = sr1(IP(dst="resolver1.opendns.com")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.com")),timeout=5)
DNS in dns_ans


############
############
+ More complex tests

= Implicit logic
~ IP TCP
a=IP(ttl=(5,10))/TCP(dport=[80,443])
[p for p in a]
len(_) == 12


############
############
+ Real usages

= Port scan
~ netaccess IP TCP
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]),timeout=2)
# Each entry handed to the callback is a (sent, received) pair. Index it
# rather than using a tuple parameter, which only Python 2 accepts.
ans.make_table(lambda sr_pair: (sr_pair[0].dst,
                                sr_pair[0].dport,
                                sr_pair[1].sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))

= Traceroute function
~ netaccess
* Let's test traceroute
traceroute("www.slashdot.org")
ans,unans=_

= Result manipulation
~ netaccess
ans.nsummary()
s,r=ans[0]
s.show()
s.show(2)

= DNS packet manipulation
Phil's avatar
Phil committed
dns_ans.show()
dns_ans.show2()
dns_ans[DNS].an.show()
dns_ans2 = IP(str(dns_ans))
DNS in dns_ans2
assert(str(dns_ans2) == str(dns_ans))
dns_ans2.qd.qname = "www.secdev.org."
* We need to recalculate these values
del(dns_ans2[IP].len)
del(dns_ans2[IP].chksum)
del(dns_ans2[UDP].len)
del(dns_ans2[UDP].chksum)
assert("\x03www\x06secdev\x03org\x00" in str(dns_ans2))
assert(DNS in IP(str(dns_ans2)))
Phil's avatar
Phil committed

= Arping
~ netaccess
* This test assumes the local network is a /24. This is bad.
conf.route.route("0.0.0.0")[2]
arping(_+"/24")


############
############
+ Automaton tests

= Simple automaton
~ automaton
class ATMT1(Automaton):
    # String-rewriting automaton used by the tests that follow: starting
    # from `init`, MAIN appends "a" after a trailing "b" (trA) and doubles
    # the string plus "b" after a trailing "a" (trB), until the string is
    # longer than 20 characters and go_to_END fires.
    def parse_args(self, init, *args, **kargs):
        # Consume the extra `init` argument; everything else goes to the base class.
        Automaton.parse_args(self, *args, **kargs)
        self.init = init
    @ATMT.state(initial=1)
    def BEGIN(self):
        raise self.MAIN(self.init)
    @ATMT.state()
    def MAIN(self, s):
        return s
    # prio=-1: the tests expect this check to win over trA/trB once len(s) > 20.
    @ATMT.condition(MAIN, prio=-1)
    def go_to_END(self, s):
        if len(s) > 20:
            raise self.END(s).action_parameters(s)
    @ATMT.condition(MAIN)
    def trA(self, s):
        if s.endswith("b"):
            raise self.MAIN(s+"a")
    @ATMT.condition(MAIN)
    def trB(self, s):
        if s.endswith("a"):
            raise self.MAIN(s*2+"b")
    @ATMT.state(final=1)
    def END(self, s):
        return s
    # Action attached to go_to_END: stores the final string in self.result.
    @ATMT.action(go_to_END)
    def action_test(self, s):
        self.result = s
    
= Simple automaton Tests
~ automaton

a=ATMT1(init="a", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'aabaaababaaabaaababab' )
a.result
assert( _ == 'aabaaababaaabaaababab' )
a=ATMT1(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'babababababababababababababab' )
a.result
assert( _ == 'babababababababababababababab' )

= Simple automaton stuck test
~ automaton

try:    
    ATMT1(init="", ll=lambda: None, recvsock=lambda: None).run()
Phil's avatar
Phil committed
except Automaton.Stuck:
    True
else:
    False


= Automaton state overloading
~ automaton
class ATMT2(ATMT1):
    # Overrides the MAIN state: runs ATMT1's MAIN on the same input and
    # puts a "c" in front of whatever it returns.
    @ATMT.state()
    def MAIN(self, s):
        inherited = ATMT1.MAIN(self, s).run()
        return "c" + inherited

a=ATMT2(init="a", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )


a.result
assert( _ == 'ccccccacabacccacababacccccacabacccacababab' )
a=ATMT2(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccbaccbabaccccbaccbabab')
a.result
assert( _ == 'cccccbaccbabaccccbaccbabab')


= Automaton condition overloading
~ automaton
class ATMT3(ATMT2):
    # Overrides the trA condition: after a trailing "b" it appends "da"
    # instead of the "a" appended by ATMT1.trA.
    @ATMT.condition(ATMT1.MAIN)
    def trA(self, s):
        if not s.endswith("b"):
            return
        raise self.MAIN(s+"da")


a=ATMT3(init="a", debug=2, ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccacabdacccacabdabda')
a.result
assert( _ == 'cccccacabdacccacabdabda')
a=ATMT3(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )

a.result
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )


= Automaton action overloading
~ automaton
class ATMT4(ATMT3):
    # Overrides the action bound to go_to_END: the stored result is the
    # final string wrapped in a leading and a trailing "e".
    @ATMT.action(ATMT1.go_to_END)
    def action_test(self, s):
        wrapped = "e" + s
        self.result = wrapped + "e"

a=ATMT4(init="a", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccacabdacccacabdabda')
a.result
assert( _ == 'ecccccacabdacccacabdabdae')
a=ATMT4(init="b", ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'cccccbdaccbdabdaccccbdaccbdabdab' )
a.result
assert( _ == 'ecccccbdaccbdabdaccccbdaccbdabdabe' )


= Automaton priorities
~ automaton
class ATMT5(Automaton):
    # Priority test: every condition and action appends one letter to
    # self.res. The test that runs this automaton expects "Jupiter", i.e.
    # conditions run as tr3 (prio -1), tr2 (default), tr1 (prio 1), and the
    # actions of tr1 as ac2 (prio -1), ac1 (default), ac3 (prio 1).
    @ATMT.state(initial=1)
    def BEGIN(self):
        self.res = "J"
    # Only tr1 raises a new state; tr2 and tr3 just leave their letter.
    @ATMT.condition(BEGIN, prio=1)
    def tr1(self):
        self.res += "i"
        raise self.END()
    @ATMT.condition(BEGIN)
    def tr2(self):
        self.res += "p"
    @ATMT.condition(BEGIN, prio=-1)
    def tr3(self):
        self.res += "u"
    
    # Three actions bound to the same condition, differing only in prio.
    @ATMT.action(tr1)
    def ac1(self):
        self.res += "e"
    @ATMT.action(tr1, prio=-1)
    def ac2(self):
        self.res += "t"
    @ATMT.action(tr1, prio=1)
    def ac3(self):
        self.res += "r"
   
    # Final state: returns the accumulated string.
    @ATMT.state(final=1)
    def END(self):
        return self.res

a=ATMT5(ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'Jupiter' )

= Automaton test same action for many conditions
~ automaton
class ATMT6(Automaton):
    # Checks that one action (add_r) can be bound to several conditions
    # with a different priority for each. States and actions append letters
    # to self.res; the test that runs this automaton expects "Mercury".
    @ATMT.state(initial=1)
    def BEGIN(self):
        self.res="M"
    @ATMT.condition(BEGIN)
    def tr1(self):
        raise self.MIDDLE()
    @ATMT.action(tr1) # default prio=0
    def add_e(self):
        self.res += "e"
    @ATMT.action(tr1, prio=2)
    def add_c(self):
        self.res += "c"
    @ATMT.state()
    def MIDDLE(self):
        self.res += "u"
    @ATMT.condition(MIDDLE)
    def tr2(self):
        raise self.END()
    @ATMT.action(tr2, prio=2)
    def add_y(self):
        self.res += "y"
    # Bound twice: prio 1 on tr1 (between add_e and add_c) and the default
    # prio on tr2 (before add_y).
    @ATMT.action(tr1, prio=1)
    @ATMT.action(tr2)
    def add_r(self):
        self.res += "r"
    @ATMT.state(final=1)
    def END(self):
        return self.res

a=ATMT6(ll=lambda: None, recvsock=lambda: None)
Phil's avatar
Phil committed
a.run()
assert( _ == 'Mercury' )
a.restart()
a.run()
assert( _ == 'Mercury' )

= Automaton test io event
~ automaton

class ATMT7(Automaton):
    # I/O event test on a channel named "tst". Each ioevent appends what it
    # receives to self.res; the test that drives this automaton sends "at",
    # reads back the "ur" emitted by NEXT_STATE, sends it in again and
    # expects the final result "Saturn".
    @ATMT.state(initial=1)
    def BEGIN(self):
        self.res = "S"
    @ATMT.ioevent(BEGIN, name="tst")
    def tr1(self, fd):
        self.res += fd.recv()
        raise self.NEXT_STATE()
    @ATMT.state()
    def NEXT_STATE(self):
        # Writes on the automaton-side handle (self.oi) of the "tst" channel.
        self.oi.tst.send("ur")
    @ATMT.ioevent(NEXT_STATE, name="tst")
    def tr2(self, fd):
        self.res += fd.recv()
        raise self.END()
    @ATMT.state(final=1)
    def END(self):
        self.res += "n"
        return self.res

a=ATMT7(ll=lambda: None, recvsock=lambda: None)
a.run(wait=False)
a.io.tst.send("at")
a.io.tst.recv()
a.io.tst.send(_)
a.run()
assert( _ == "Saturn" )

a.restart()
a.run(wait=False)
a.io.tst.send("at")
a.io.tst.recv()
a.io.tst.send(_)
a.run()
assert( _ == "Saturn" )

= Automaton test io event from external fd
~ automaton
class ATMT8(Automaton):
    # I/O event test on an externally supplied descriptor named "extfd"
    # (the driving test passes the read end of an os.pipe()). Each ioevent
    # reads two characters and appends them to self.res; the expected
    # final result is "Uranus".
    @ATMT.state(initial=1)
    def BEGIN(self):
        self.res = "U"
    @ATMT.ioevent(BEGIN, name="extfd")
    def tr1(self, fd):
        self.res += fd.read(2)
        raise self.NEXT_STATE()
    # Intermediate state with no behaviour of its own; it only waits for
    # the second ioevent.
    @ATMT.state()
    def NEXT_STATE(self):
        pass
    @ATMT.ioevent(NEXT_STATE, name="extfd")
    def tr2(self, fd):
        self.res += fd.read(2)
        raise self.END()
    @ATMT.state(final=1)
    def END(self):
        self.res += "s"
        return self.res

r,w = os.pipe()

a=ATMT8(external_fd={"extfd":r}, ll=lambda: None, recvsock=lambda: None)
a.run(wait=False)
os.write(w,"ra")
os.write(w,"nu")
a.run()
assert( _ == "Uranus" )

a.restart()
a.run(wait=False)
os.write(w,"ra")
os.write(w,"nu")
a.run()
assert( _ == "Uranus" )

= Automaton test interception_points, and restart
~ automaton
class ATMT9(Automaton):
    # Loopback test: BEGIN sends Raw("ENU"), the "loop" ioevent appends the
    # received packet's load to self.res, and END appends "s"; the driving
    # test expects "VENUs" (or "Venus" once the packet has been intercepted
    # and lower-cased).
    def my_send(self, x):
        # Writes the packet onto the "loop" channel.
        # NOTE(review): presumably self.send() ends up calling this hook - confirm.
        self.io.loop.send(x)
    @ATMT.state(initial=1)
    def BEGIN(self):
        self.res = "V"
        self.send(Raw("ENU"))
    @ATMT.ioevent(BEGIN, name="loop")
    def received_sth(self, fd):
        self.res += fd.recv().load
        raise self.END()
    @ATMT.state(final=1)
    def END(self):
        self.res += "s"
        return self.res

a=ATMT9(debug=5, ll=lambda: None, recvsock=lambda: None)
a.run()
assert( _ == "VENUs" )

a.restart()
a.run()
assert( _ == "VENUs" )

a.restart()
a.BEGIN.intercepts()
# Keep running until the automaton finishes; every intercepted packet is
# replaced by a lower-cased copy and injected back without waiting.
while True:
    try:
        x = a.run()
    except Automaton.InterceptionPoint as p:
        # "as" binding works on Python 2.6+ and Python 3 alike.
        a.accept_packet(Raw(p.packet.load.lower()), wait=False)
    else:
        break

x
assert( _ == "Venus" )