Skip to content
Snippets Groups Projects
Commit 4d7ccd51 authored by Todd Freed's avatar Todd Freed
Browse files

coap: allow CoAP packets to have a payload

parent 0d92fc9f
No related branches found
No related tags found
No related merge requests found
......@@ -157,8 +157,13 @@ class _CoAPOptsField(StrField):
def i2h(self, pkt, x):
return [(coap_options[0][o[0]], o[1]) if o[0] in coap_options[0] else o for o in x]
# consume only the coap layer from the wire string
def getfield(self, pkt, s):
return "", self.m2i(pkt, s)
opts = self.m2i(pkt, s)
used = 0
for o in opts:
used += o[0]
return s[used:], [ (o[1], o[2]) for o in opts ]
def m2i(self, pkt, x):
opts = []
......@@ -166,7 +171,9 @@ class _CoAPOptsField(StrField):
cur_delta = 0
while isinstance(o, _CoAPOpt):
cur_delta += _get_abs_val(o.delta, o.delta_ext)
opts.append((cur_delta, o.opt_val))
# size of this option in bytes
u = 1 + len(o.opt_val) + len(o.delta_ext) + len(o.len_ext)
opts.append((u, cur_delta, o.opt_val))
o = o.payload
return opts
......@@ -189,8 +196,26 @@ class _CoAPOptsField(StrField):
return str(opts)
class _CoAPPaymark(StrField):
def i2h(self, pkt, x):
return x
def getfield(self, pkt, s):
(u, m) = self.m2i(pkt, s)
return s[u:], m
def m2i(self, pkt, x):
if len(x) > 0 and x[0] == '\xff':
return 1, '\xff'
return 0, '';
def i2m(self, pkt, x):
return x
class CoAP(Packet):
__slots__ = ["content_format"]
name = "CoAP"
fields_desc = [BitField("ver", 1, 2),
......@@ -199,9 +224,21 @@ class CoAP(Packet):
ByteEnumField("code", 0, coap_codes),
ShortField("msg_id", 0),
StrLenField("token", "", length_from=lambda pkt: pkt.tkl),
_CoAPOptsField("options", [])
_CoAPOptsField("options", []),
_CoAPPaymark("paymark", "")
]
def getfieldval(self, attr):
v = getattr(self, attr)
if v:
return v
return Packet.getfieldval(self, attr)
def post_dissect(self, pay):
for k in self.options:
if k[0] == "Content-Format":
self.content_format = k[1]
return pay
bind_layers(UDP, CoAP, sport=5683)
bind_layers(UDP, CoAP, dport=5683)
......@@ -6,39 +6,39 @@ from scapy.contrib.coap import *
+ Test CoAP
= CoAP default values
str(CoAP()) == '\x40\x00\x00\x00'
assert(str(CoAP()) == '\x40\x00\x00\x00')
= Token length calculation
p = CoAP(token='foobar')
CoAP(str(p)).tkl == 6
assert(CoAP(str(p)).tkl == 6)
= CON GET dissect
p = CoAP('\x40\x01\xd9\xe1\xbb\x2e\x77\x65\x6c\x6c\x2d\x6b\x6e\x6f\x77\x6e\x04\x63\x6f\x72\x65')
p.code == 1
p.ver == 1
p.tkl == 0
p.tkl == 0
p.msg_id = 55777
p.token == ''
p.type == 0
p.options == [('Uri-Path', '.well-known'), ('Uri-Path', 'core')]
assert(p.code == 1)
assert(p.ver == 1)
assert(p.tkl == 0)
assert(p.tkl == 0)
assert(p.msg_id == 55777)
assert(p.token == '')
assert(p.type == 0)
assert(p.options == [('Uri-Path', '.well-known'), ('Uri-Path', 'core')])
= Extended option delta
str(CoAP(options=[("Uri-Query", "query")])) == '\x40\x00\x00\x00\xd5\x02\x71\x75\x65\x72\x79'
assert(str(CoAP(options=[("Uri-Query", "query")])) == '\x40\x00\x00\x00\xd5\x02\x71\x75\x65\x72\x79')
= Extended option length
str(CoAP(options=[("Location-Path", 'x' * 280)])) == '\x40\x00\x00\x00\x8e\x0b\x00' + '\x78' * 280
assert(str(CoAP(options=[("Location-Path", 'x' * 280)])) == '\x40\x00\x00\x00\x8e\x0b\x00' + '\x78' * 280)
= Options should be ordered by option number
str(CoAP(options=[("Uri-Query", "b"),("Uri-Path","a")])) == '\x40\x00\x00\x00\xb1\x61\x41\x62'
assert(str(CoAP(options=[("Uri-Query", "b"),("Uri-Path","a")])) == '\x40\x00\x00\x00\xb1\x61\x41\x62')
= Options of the same type should not be reordered
str(CoAP(options=[("Uri-Path", "b"),("Uri-Path","a")])) == '\x40\x00\x00\x00\xb1\x62\x01\x61'
assert(str(CoAP(options=[("Uri-Path", "b"),("Uri-Path","a")])) == '\x40\x00\x00\x00\xb1\x62\x01\x61')
+ Test layer binding
= Destination port
p = UDP()/CoAP()
p[UDP].dport == 5683
assert(p[UDP].dport == 5683)
= Source port
s = '\x16\x33\xa0\xa4\x00\x78\xfe\x8b\x60\x45\xd9\xe1\xc1\x28\xff\x3c\x2f\x3e\x3b\x74\x69\x74\x6c\x65\x3d\x22\x47\x65' \
......@@ -46,4 +46,18 @@ s = '\x16\x33\xa0\xa4\x00\x78\xfe\x8b\x60\x45\xd9\xe1\xc1\x28\xff\x3c\x2f\x3e\x3
'\x22\x63\x6c\x6f\x63\x6b\x22\x3b\x72\x74\x3d\x22\x54\x69\x63\x6b\x73\x22\x3b\x74\x69\x74\x6c\x65\x3d\x22\x49\x6e' \
'\x74\x65\x72\x6e\x61\x6c\x20\x43\x6c\x6f\x63\x6b\x22\x3b\x63\x74\x3d\x30\x3b\x6f\x62\x73\x2c\x3c\x2f\x61\x73\x79' \
'\x6e\x63\x3e\x3b\x63\x74\x3d\x30'
CoAP in UDP(s)
assert(CoAP in UDP(s))
= building with a text/plain payload
p = CoAP(ver = 1, type = 0, code = 0x42, msg_id = 0xface, options=[("Content-Format", "\x00")], paymark = "\xff")
p /= Raw("\xde\xad\xbe\xef")
assert(str(p) == '\x40\x42\xfa\xce\xc1\x00\xff\xde\xad\xbe\xef')
= dissection with a text/plain payload
p = CoAP(str(p))
assert(p.ver == 1)
assert(p.type == 0)
assert(p.code == 0x42)
assert(p.msg_id == 0xface)
assert(isinstance(p.payload, Raw))
assert(p.payload.load == '\xde\xad\xbe\xef')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment