Skip to content
Snippets Groups Projects
Commit e9e7a39f authored by Guillaume Valadon's avatar Guillaume Valadon Committed by GitHub
Browse files

Merge pull request #201 from unmole/CoAP

Add support for CoAP
parents 03898572 cb50d731
No related branches found
No related tags found
No related merge requests found
# This file is part of Scapy.
# See http://www.secdev.org/projects/scapy for more information.
#
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2016 Anmol Sarma <me@anmolsarma.in>
"""
RFC 7252 - Constrained Application Protocol (CoAP) layer for Scapy
"""
from scapy.fields import *
from scapy.layers.inet import UDP
from scapy.packet import *
coap_codes = {
0: "Empty",
# Request codes
1: "GET",
2: "POST",
3: "PUT",
4: "DELETE",
# Response codes
65: "2.01 Created",
66: "2.02 Deleted",
67: "2.03 Valid",
68: "2.04 Changed",
69: "2.05 Content",
128: "4.00 Bad Request",
129: "4.01 Unauthorized",
130: "4.02 Bad Option",
131: "4.03 Forbidden",
132: "4.04 Not Found",
133: "4.05 Method Not Allowed",
134: "4.06 Not Acceptable",
140: "4.12 Precondition Failed",
141: "4.13 Request Entity Too Large",
143: "4.15 Unsupported Content-Format",
160: "5.00 Internal Server Error",
161: "5.01 Not Implemented",
162: "5.02 Bad Gateway",
163: "5.03 Service Unavailable",
164: "5.04 Gateway Timeout",
165: "Proxying Not Supported"}
coap_options = ({
1: "If-Match",
3: "Uri-Host",
4: "ETag",
5: "If-None-Match",
7: "Uri-Port",
8: "Location-Path",
11: "Uri-Path",
12: "Content-Format",
14: "Max-Age",
15: "Uri-Query",
17: "Accept",
20: "Location-Query",
35: "Proxy-Uri",
39: "Proxy-Scheme",
60: "Size1"
},
{
"If-Match": 1,
"Uri-Host": 3,
"ETag": 4,
"If-None-Match": 5,
"Uri-Port": 7,
"Location-Path": 8,
"Uri-Path": 11,
"Content-Format": 12,
"Max-Age": 14,
"Uri-Query": 15,
"Accept": 17,
"Location-Query": 20,
"Proxy-Uri": 35,
"Proxy-Scheme": 39,
"Size1": 60
})
def _get_ext_field_size(val):
if val >= 15:
warning("Invalid Option Delta or Length")
if val == 14:
return 2
if val == 13:
return 1
return 0
def _get_delta_ext_size(pkt):
return _get_ext_field_size(pkt.delta)
def _get_len_ext_size(pkt):
return _get_ext_field_size(pkt.len)
def _get_abs_val(val, ext_val):
if val >= 15:
warning("Invalid Option Length or Delta %d" % val)
if val == 14:
return 269 + struct.unpack('H', ext_val)[0]
if val == 13:
return 13 + struct.unpack('B', ext_val)[0]
return val
def _get_opt_val_size(pkt):
return _get_abs_val(pkt.len, pkt.len_ext)
class _CoAPOpt(Packet):
fields_desc = [BitField("delta", 0, 4),
BitField("len", 0, 4),
StrLenField("delta_ext", None, length_from=_get_delta_ext_size),
StrLenField("len_ext", None, length_from=_get_len_ext_size),
StrLenField("opt_val", None, length_from=_get_opt_val_size)]
@staticmethod
def _populate_extended(val):
if val >= 269:
return struct.pack('H', val - 269), 14
if val >= 13:
return struct.pack('B', val - 13), 13
return None, val
def do_build(self):
self.delta_ext, self.delta = self._populate_extended(self.delta)
self.len_ext, self.len = self._populate_extended(len(self.opt_val))
return Packet.do_build(self)
def guess_payload_class(self, payload):
if payload[0] != '\xff':
return _CoAPOpt
else:
return Packet.guess_payload_class(self, payload)
class _CoAPOptsField(StrField):
islist = 1
def i2h(self, pkt, x):
return [(coap_options[0][o[0]], o[1]) if o[0] in coap_options[0] else o for o in x]
def getfield(self, pkt, s):
return "", self.m2i(pkt, s)
def m2i(self, pkt, x):
opts = []
o = _CoAPOpt(x)
cur_delta = 0
while isinstance(o, _CoAPOpt):
cur_delta += _get_abs_val(o.delta, o.delta_ext)
opts.append((cur_delta, o.opt_val))
o = o.payload
return opts
def i2m(self, pkt, x):
if not x:
return ""
opt_lst = []
for o in x:
if isinstance(o[0], str):
opt_lst.append((coap_options[1][o[0]], o[1]))
else:
opt_lst.append(o)
opt_lst.sort()
opts = _CoAPOpt(delta=opt_lst[0][0], opt_val=opt_lst[0][1])
high_opt = opt_lst[0][0]
for o in opt_lst[1:]:
opts = opts / _CoAPOpt(delta=o[0] - high_opt, opt_val=o[1])
high_opt = o[0]
return str(opts)
class CoAP(Packet):
name = "CoAP"
fields_desc = [BitField("ver", 1, 2),
BitEnumField("type", 0, 2, {0: "CON", 1: "NON", 2: "ACK", 3: "RST"}),
BitFieldLenField("tkl", None, 4, length_of='token'),
ByteEnumField("code", 0, coap_codes),
ShortField("msg_id", 0),
StrLenField("token", "", length_from=lambda pkt: pkt.tkl),
_CoAPOptsField("options", [])
]
bind_layers(UDP, CoAP, sport=5683)
bind_layers(UDP, CoAP, dport=5683)
% CoAP layer test campaign
+ Syntax check
= Import the CoAP layer
from scapy.contrib.coap import *
+ Test CoAP
= CoAP default values
str(CoAP()) == '\x40\x00\x00\x00'
= Token length calculation
p = CoAP(token='foobar')
CoAP(str(p)).tkl == 6
= CON GET dissect
p = CoAP('\x40\x01\xd9\xe1\xbb\x2e\x77\x65\x6c\x6c\x2d\x6b\x6e\x6f\x77\x6e\x04\x63\x6f\x72\x65')
p.code == 1
p.ver == 1
p.tkl == 0
p.tkl == 0
p.msg_id = 55777
p.token == ''
p.type == 0
p.options == [('Uri-Path', '.well-known'), ('Uri-Path', 'core')]
= Extended option delta
str(CoAP(options=[("Uri-Query", "query")])) == '\x40\x00\x00\x00\xd5\x02\x71\x75\x65\x72\x79'
= Extended option length
str(CoAP(options=[("Location-Path", 'x' * 280)])) == '\x40\x00\x00\x00\x8e\x0b\x00' + '\x78' * 280
+ Test layer binding
= Destination port
p = UDP()/CoAP()
p[UDP].dport == 5683
= Source port
s = '\x16\x33\xa0\xa4\x00\x78\xfe\x8b\x60\x45\xd9\xe1\xc1\x28\xff\x3c\x2f\x3e\x3b\x74\x69\x74\x6c\x65\x3d\x22\x47\x65' \
'\x6e\x65\x72\x61\x6c\x20\x49\x6e\x66\x6f\x22\x3b\x63\x74\x3d\x30\x2c\x3c\x2f\x74\x69\x6d\x65\x3e\x3b\x69\x66\x3d' \
'\x22\x63\x6c\x6f\x63\x6b\x22\x3b\x72\x74\x3d\x22\x54\x69\x63\x6b\x73\x22\x3b\x74\x69\x74\x6c\x65\x3d\x22\x49\x6e' \
'\x74\x65\x72\x6e\x61\x6c\x20\x43\x6c\x6f\x63\x6b\x22\x3b\x63\x74\x3d\x30\x3b\x6f\x62\x73\x2c\x3c\x2f\x61\x73\x79' \
'\x6e\x63\x3e\x3b\x63\x74\x3d\x30'
CoAP in UDP(s)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment