Skip to content
Snippets Groups Projects
Commit 10511d08 authored by sebastien mainand's avatar sebastien mainand Committed by Guillaume Valadon
Browse files

modbus: specify PDU function code types + tests (thms.aurel@gmail.com)

modbus: fix len handling for function code 0x2b + tests
parent 66d692d2
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
# scapy.contrib.description = ModBus Protocol
# scapy.contrib.status = loads
# Copyright (C) 2016 Arthur Gervais, Ken LE PRADO, Sébastien Mainand
# Copyright (C) 2017 Arthur Gervais, Ken LE PRADO, Sébastien Mainand, Thomas Aurel
from scapy.packet import *
from scapy.fields import *
......@@ -36,42 +36,6 @@ _modbus_exceptions = {1: "Illegal Function Code",
11: "Gateway Target Device Failed to Respond"}
class ModbusPDU00GenericRequest(Packet):
name = "Generic Request"
fields_desc = [XByteField("funcCode", 0x00),
StrFixedLenField("payload", "", 255)]
def extract_padding(self, s):
return "", None
def mysummary(self):
return self.sprintf("Modbus Request %funcCode%")
class ModbusPDU00GenericResponse(Packet):
name = "Generic Request"
fields_desc = [XByteField("funcCode", 0x00),
StrFixedLenField("payload", "", 255)]
def extract_padding(self, s):
return "", None
def mysummary(self):
return self.sprintf("Modbus Response %funcCode%")
class ModbusPDU00GenericError(Packet):
name = "Generic Exception"
fields_desc = [XByteField("funcCode", 0x80),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
def extract_padding(self, s):
return "", None
def my_summary(self):
return self.sprintf("Modbus Exception %funcCode%")
class ModbusPDU01ReadCoilsRequest(Packet):
name = "Read Coils Request"
fields_desc = [XByteField("funcCode", 0x01),
......@@ -79,7 +43,7 @@ class ModbusPDU01ReadCoilsRequest(Packet):
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU01ReadCoilsResponse(Packet):
......@@ -89,7 +53,7 @@ class ModbusPDU01ReadCoilsResponse(Packet):
FieldListField("coilStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU01ReadCoilsError(Packet):
......@@ -98,7 +62,7 @@ class ModbusPDU01ReadCoilsError(Packet):
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU02ReadDiscreteInputsRequest(Packet):
......@@ -108,7 +72,7 @@ class ModbusPDU02ReadDiscreteInputsRequest(Packet):
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU02ReadDiscreteInputsResponse(Packet):
......@@ -135,7 +99,7 @@ class ModbusPDU03ReadHoldingRegistersRequest(Packet):
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU03ReadHoldingRegistersResponse(Packet):
......@@ -159,7 +123,7 @@ class ModbusPDU04ReadInputRegistersRequest(Packet):
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU04ReadInputRegistersResponse(Packet):
......@@ -203,7 +167,7 @@ class ModbusPDU06WriteSingleRegisterRequest(Packet):
XShortField("registerValue", 0x0000)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU06WriteSingleRegisterResponse(Packet):
......@@ -224,7 +188,7 @@ class ModbusPDU07ReadExceptionStatusRequest(Packet):
fields_desc = [XByteField("funcCode", 0x07)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU07ReadExceptionStatusResponse(Packet):
......@@ -248,7 +212,7 @@ class ModbusPDU0FWriteMultipleCoilsRequest(Packet):
FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU0FWriteMultipleCoilsResponse(Packet):
......@@ -292,7 +256,7 @@ class ModbusPDU11ReportSlaveIdRequest(Packet):
fields_desc = [XByteField("funcCode", 0x11)]
def extract_padding(self, s):
return "", None
return b"", None
class ModbusPDU11ReportSlaveIdResponse(Packet):
......@@ -574,10 +538,165 @@ class ModbusPDU2B0EReadDeviceIdentificationError(Packet):
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
_reserved_funccode_request = {
0x08: '0x08 Unknown Reserved Request',
0x09: '0x09 Unknown Reserved Request',
0x0A: '0x0a Unknown Reserved Request',
0x0D: '0x0d Unknown Reserved Request',
0x0E: '0x0e Unknown Reserved Request',
0x29: '0x29 Unknown Reserved Request',
0x2A: '0x2a Unknown Reserved Request',
0x5A: 'Specific Schneider Electric Request',
0x5B: '0x5b Unknown Reserved Request',
0x7D: '0x7d Unknown Reserved Request',
0x7E: '0x7e Unknown Reserved Request',
0x7F: '0x7f Unknown Reserved Request',
}
_reserved_funccode_response = {
0x08: '0x08 Unknown Reserved Response',
0x09: '0x09 Unknown Reserved Response',
0x0A: '0x0a Unknown Reserved Response',
0x0D: '0x0d Unknown Reserved Response',
0x0E: '0x0e Unknown Reserved Response',
0x29: '0x29 Unknown Reserved Response',
0x2A: '0x2a Unknown Reserved Response',
0x5A: 'Specific Schneider Electric Response',
0x5B: '0x5b Unknown Reserved Response',
0x7D: '0x7d Unknown Reserved Response',
0x7E: '0x7e Unknown Reserved Response',
0x7F: '0x7f Unknown Reserved Response',
}
_reserved_funccode_error = {
0x88: '0x88 Unknown Reserved Error',
0x89: '0x89 Unknown Reserved Error',
0x8A: '0x8a Unknown Reserved Error',
0x8D: '0x8d Unknown Reserved Error',
0x8E: '0x8e Unknown Reserved Error',
0xA9: '0x88 Unknown Reserved Error',
0xAA: '0x88 Unknown Reserved Error',
0xDA: 'Specific Schneider Electric Error',
0xDB: '0xdb Unknown Reserved Error',
0xDC: '0xdc Unknown Reserved Error',
0xFD: '0xfd Unknown Reserved Error',
0xFE: '0xfe Unknown Reserved Error',
0xFF: '0xff Unknown Reserved Error',
}
class ModbusPDUReservedFunctionCodeRequest(Packet):
name = "Reserved Function Code Request"
fields_desc = [
ByteEnumField("funcCode", 0x00, _reserved_funccode_request),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus Reserved Request %funcCode%")
class ModbusPDUReservedFunctionCodeResponse(Packet):
name = "Reserved Function Code Response"
fields_desc = [
ByteEnumField("funcCode", 0x00, _reserved_funccode_response),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus Reserved Response %funcCode%")
class ModbusPDUReservedFunctionCodeError(Packet):
name = "Reserved Function Code Error"
fields_desc = [
ByteEnumField("funcCode", 0x00, _reserved_funccode_error),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus Reserved Error %funcCode%")
_userdefined_funccode_request = {
}
_userdefined_funccode_response = {
}
_userdefined_funccode_error = {
}
class ModbusByteEnumField(EnumField):
__slots__ = "defEnum"
def __init__(self, name, default, enum, defEnum):
EnumField.__init__(self, name, default, enum, "B")
defEnum = self.defEnum = defEnum
def i2repr_one(self, pkt, x):
if self not in conf.noenum and not isinstance(x, VolatileValue) \
and x in self.i2s:
return self.i2s[x]
if self.defEnum:
return self.defEnum
return repr(x)
class ModbusPDUUserDefinedFunctionCodeRequest(Packet):
name = "User-Defined Function Code Request"
fields_desc = [
ModbusByteEnumField(
"funcCode", 0x00, _userdefined_funccode_request,
"Unknown user-defined request function Code"),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus User-Defined Request %funcCode%")
class ModbusPDUUserDefinedFunctionCodeResponse(Packet):
name = "User-Defined Function Code Response"
fields_desc = [
ModbusByteEnumField(
"funcCode", 0x00, _userdefined_funccode_response,
"Unknown user-defined response function Code"),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus User-Defined Response %funcCode%")
class ModbusPDUUserDefinedFunctionCodeError(Packet):
name = "User-Defined Function Code Error"
fields_desc = [
ModbusByteEnumField(
"funcCode", 0x00, _userdefined_funccode_error,
"Unknown user-defined error function Code"),
StrFixedLenField('payload', '', 255), ]
def extract_padding(self, s):
return b"", None
def mysummary(self):
return self.sprintf("Modbus User-Defined Error %funcCode%")
class ModbusObjectId(Packet):
name = "Object"
fields_desc = [ByteEnumField("id", 0x00, _read_device_id_object_id),
BitFieldLenField("length", None, 8, count_of="value"),
BitFieldLenField("length", None, 8, length_of="value"),
StrLenField("value", "", length_from=lambda pkt: pkt.length)]
def guess_payload_class(self, payload):
......@@ -617,7 +736,7 @@ _modbus_error_classes = {
0x96: ModbusPDU16MaskWriteRegisterError,
0x97: ModbusPDU17ReadWriteMultipleRegistersError,
0x98: ModbusPDU18ReadFIFOQueueError,
0xAB: ModbusPDU2B0EReadDeviceIdentificationError
0xAB: ModbusPDU2B0EReadDeviceIdentificationError,
}
_modbus_response_classes = {
0x01: ModbusPDU01ReadCoilsResponse,
......@@ -634,7 +753,7 @@ _modbus_response_classes = {
0x15: ModbusPDU15WriteFileRecordResponse,
0x16: ModbusPDU16MaskWriteRegisterResponse,
0x17: ModbusPDU17ReadWriteMultipleRegistersResponse,
0x18: ModbusPDU18ReadFIFOQueueResponse
0x18: ModbusPDU18ReadFIFOQueueResponse,
}
_mei_types_request = {
0x0E: ModbusPDU2B0EReadDeviceIdentificationRequest,
......@@ -655,9 +774,9 @@ class ModbusADURequest(Packet):
def guess_payload_class(self, payload):
function_code = int(payload[0].encode("hex"), 16)
sub_code = int(payload[1].encode("hex"), 16)
if function_code == 0x2B:
sub_code = int(payload[1].encode("hex"), 16)
try:
return _mei_types_request[sub_code]
except KeyError:
......@@ -666,7 +785,9 @@ class ModbusADURequest(Packet):
return _modbus_request_classes[function_code]
except KeyError:
pass
return ModbusPDU00GenericRequest
if function_code in _reserved_funccode_request.keys():
return ModbusPDUReservedFunctionCodeRequest
return ModbusPDUUserDefinedFunctionCodeRequest
def post_build(self, p, pay):
if self.len is None:
......@@ -684,9 +805,9 @@ class ModbusADUResponse(Packet):
def guess_payload_class(self, payload):
function_code = int(payload[0].encode("hex"), 16)
sub_code = int(payload[1].encode("hex"), 16)
if function_code == 0x2B:
sub_code = int(payload[1].encode("hex"), 16)
try:
return _mei_types_response[sub_code]
except KeyError:
......@@ -699,9 +820,13 @@ class ModbusADUResponse(Packet):
return _modbus_error_classes[function_code]
except KeyError:
pass
if function_code < 0x81:
return ModbusPDU00GenericResponse
return ModbusPDU00GenericError
if function_code in _reserved_funccode_response.keys():
return ModbusPDUReservedFunctionCodeResponse
elif function_code in _reserved_funccode_error.keys():
return ModbusPDUReservedFunctionCodeError
if function_code < 0x80:
return ModbusPDUUserDefinedFunctionCodeResponse
return ModbusPDUUserDefinedFunctionCodeError
def post_build(self, p, pay):
if self.len is None:
......@@ -712,4 +837,3 @@ class ModbusADUResponse(Packet):
bind_layers(TCP, ModbusADURequest, dport=502)
bind_layers(TCP, ModbusADUResponse, sport=502)
......@@ -31,16 +31,36 @@ assert(isinstance(p.payload, ModbusPDU2B0EReadDeviceIdentificationResponse))
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x03\xff\xab\x01')
assert(isinstance(p.payload, ModbusPDU2B0EReadDeviceIdentificationError))
= MBAP Guess Payload (Invalid payload)
p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x03\xff\xff\xff')
assert(isinstance(p.payload, ModbusPDU00GenericRequest))
= MBAP Guess Payload Reserved Function Request (Invalid payload)
p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x02\xff\x5b')
assert(isinstance(p.payload,ModbusPDUReservedFunctionCodeRequest))
= MBAP Guess Payload Reserved Function Response (Invalid payload)
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x7e')
assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeResponse))
= MBAP Guess Payload Reserved Function Error (Invalid payload)
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x8a')
assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeError))
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsResponse
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x04\xff\x80\xff\x00')
assert(isinstance(p.payload, ModbusPDU00GenericResponse))
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsError
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x04\xff\xff\xff\xff')
assert(isinstance(p.payload, ModbusPDU00GenericError))
assert(str(ModbusPDU02ReadDiscreteInputsResponse()), b'\x02\x01\x00')
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsResponse minimal parameters
assert(str(ModbusPDU02ReadDiscreteInputsResponse(inputStatus=[0x02, 0x01])), b'\x02\x02\x02\x01')
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsRequest dissection
p = ModbusPDU02ReadDiscreteInputsResponse(b'\x02\x02\x02\x01')
p.byteCount == 2 and p.inputStatus == [0x02, 0x01]
= ModbusPDU02ReadDiscreteInputsError
str(ModbusPDU02ReadDiscreteInputsError()) == b'\x82\x01'
= MBAP Guess Payload User-Defined Function Request (Invalid payload)
p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x02\xff\x5b')
assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeRequest))
= MBAP Guess Payload User-Defined Function Response (Invalid payload)
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x7e')
assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeResponse))
= MBAP Guess Payload User-Defined Function Error (Invalid payload)
p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x8a')
assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeError))
+ Test layer binding
= Destination port
......@@ -182,9 +202,11 @@ str(ModbusPDU10WriteMultipleRegistersError()) == b'\x90\x01'
# 0x14/944 Read File Record ---------------------------------------------------------
= ModbusPDU14ReadFileRecordRequest len parameters
str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest()/ModbusReadFileSubRequest()) == b'\x14\x0e\x06\x00\x01\x00\x00\x00\x01\x06\x00\x01\x00\x00\x00\x01'
p = str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest()/ModbusReadFileSubRequest())
assert(p == b'\x14\x0e\x06\x00\x01\x00\x00\x00\x01\x06\x00\x01\x00\x00\x00\x01')
= ModbusPDU14ReadFileRecordRequest minimal parameters
str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest(fileNumber=4, recordNumber=1, recordLength=02)/ModbusReadFileSubRequest(fileNumber=3, recordNumber=9, recordLength=2)) == b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02'
p = str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest(fileNumber=4, recordNumber=1, recordLength=02)/ModbusReadFileSubRequest(fileNumber=3, recordNumber=9, recordLength=2))
assert(p == b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02')
= ModbusPDU14ReadFileRecordRequest dissection
p = ModbusPDU14ReadFileRecordRequest(b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02')
assert(isinstance(p.payload, ModbusReadFileSubRequest))
......@@ -270,10 +292,13 @@ str(ModbusPDU18ReadFIFOQueueError()) == b'\x98\x01'
# 0x2b 0xOE Read Device Information -------------------------------------------------
= ModbusPDU2B0EReadDeviceIdentificationRequest
str(ModbusPDU2B0EReadDeviceIdentificationRequest()) ==b'+\x0e\x01\x00'
str(ModbusPDU2B0EReadDeviceIdentificationRequest()) == b'+\x0e\x01\x00'
= ModbusPDU2B0EReadDeviceIdentificationResponse
str(ModbusPDU2B0EReadDeviceIdentificationResponse()) == b'+\x0e\x04\x01\x00\x00\x00'
= ModbusPDU2B0EReadDeviceIdentificationResponse complete response
p = str(ModbusPDU2B0EReadDeviceIdentificationResponse(objCount=2)/ModbusObjectId(id=0, value="Obj1")/ModbusObjectId(id=1, value="Obj2"))
assert(p == b'+\x0e\x04\x01\x00\x00\x02\x00\x04Obj1\x01\x04Obj2')
= ModbusPDU2B0EReadDeviceIdentificationResponse dissection
p = ModbusPDU2B0EReadDeviceIdentificationResponse(b'+\x0e\x01\x83\x00\x00\x03\x00\x08Pymodbus\x01\x02PM\x02\x031.0')
assert(p.payload.payload.payload.id == 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment