diff --git a/scapy/contrib/modbus.py b/scapy/contrib/modbus.py index 55640711894d5113457d8a66358fc47b299c9b92..25bcc82f470c65d7f92de84a3a57463bab4ed630 100644 --- a/scapy/contrib/modbus.py +++ b/scapy/contrib/modbus.py @@ -17,7 +17,7 @@ # scapy.contrib.description = ModBus Protocol # scapy.contrib.status = loads -# Copyright (C) 2016 Arthur Gervais, Ken LE PRADO, Sébastien Mainand +# Copyright (C) 2017 Arthur Gervais, Ken LE PRADO, Sébastien Mainand, Thomas Aurel from scapy.packet import * from scapy.fields import * @@ -36,42 +36,6 @@ _modbus_exceptions = {1: "Illegal Function Code", 11: "Gateway Target Device Failed to Respond"} -class ModbusPDU00GenericRequest(Packet): - name = "Generic Request" - fields_desc = [XByteField("funcCode", 0x00), - StrFixedLenField("payload", "", 255)] - - def extract_padding(self, s): - return "", None - - def mysummary(self): - return self.sprintf("Modbus Request %funcCode%") - - -class ModbusPDU00GenericResponse(Packet): - name = "Generic Request" - fields_desc = [XByteField("funcCode", 0x00), - StrFixedLenField("payload", "", 255)] - - def extract_padding(self, s): - return "", None - - def mysummary(self): - return self.sprintf("Modbus Response %funcCode%") - - -class ModbusPDU00GenericError(Packet): - name = "Generic Exception" - fields_desc = [XByteField("funcCode", 0x80), - ByteEnumField("exceptCode", 1, _modbus_exceptions)] - - def extract_padding(self, s): - return "", None - - def my_summary(self): - return self.sprintf("Modbus Exception %funcCode%") - - class ModbusPDU01ReadCoilsRequest(Packet): name = "Read Coils Request" fields_desc = [XByteField("funcCode", 0x01), @@ -79,7 +43,7 @@ class ModbusPDU01ReadCoilsRequest(Packet): XShortField("quantity", 0x0001)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU01ReadCoilsResponse(Packet): @@ -89,7 +53,7 @@ class ModbusPDU01ReadCoilsResponse(Packet): FieldListField("coilStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU01ReadCoilsError(Packet): @@ -98,7 +62,7 @@ class ModbusPDU01ReadCoilsError(Packet): ByteEnumField("exceptCode", 1, _modbus_exceptions)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU02ReadDiscreteInputsRequest(Packet): @@ -108,7 +72,7 @@ class ModbusPDU02ReadDiscreteInputsRequest(Packet): XShortField("quantity", 0x0001)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU02ReadDiscreteInputsResponse(Packet): @@ -135,7 +99,7 @@ class ModbusPDU03ReadHoldingRegistersRequest(Packet): XShortField("quantity", 0x0001)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU03ReadHoldingRegistersResponse(Packet): @@ -159,7 +123,7 @@ class ModbusPDU04ReadInputRegistersRequest(Packet): XShortField("quantity", 0x0001)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU04ReadInputRegistersResponse(Packet): @@ -203,7 +167,7 @@ class ModbusPDU06WriteSingleRegisterRequest(Packet): XShortField("registerValue", 0x0000)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU06WriteSingleRegisterResponse(Packet): @@ -224,7 +188,7 @@ class ModbusPDU07ReadExceptionStatusRequest(Packet): fields_desc = [XByteField("funcCode", 0x07)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU07ReadExceptionStatusResponse(Packet): @@ -248,7 +212,7 @@ class ModbusPDU0FWriteMultipleCoilsRequest(Packet): FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU0FWriteMultipleCoilsResponse(Packet): @@ -292,7 +256,7 @@ class ModbusPDU11ReportSlaveIdRequest(Packet): fields_desc = [XByteField("funcCode", 0x11)] def extract_padding(self, s): - return "", None + return b"", None class ModbusPDU11ReportSlaveIdResponse(Packet): @@ -574,10 +538,165 @@ class ModbusPDU2B0EReadDeviceIdentificationError(Packet): ByteEnumField("exceptCode", 1, _modbus_exceptions)] +_reserved_funccode_request = { + 0x08: '0x08 Unknown Reserved Request', + 0x09: '0x09 Unknown Reserved Request', + 0x0A: '0x0a Unknown Reserved Request', + 0x0D: '0x0d Unknown Reserved Request', + 0x0E: '0x0e Unknown Reserved Request', + 0x29: '0x29 Unknown Reserved Request', + 0x2A: '0x2a Unknown Reserved Request', + 0x5A: 'Specific Schneider Electric Request', + 0x5B: '0x5b Unknown Reserved Request', + 0x7D: '0x7d Unknown Reserved Request', + 0x7E: '0x7e Unknown Reserved Request', + 0x7F: '0x7f Unknown Reserved Request', +} + +_reserved_funccode_response = { + 0x08: '0x08 Unknown Reserved Response', + 0x09: '0x09 Unknown Reserved Response', + 0x0A: '0x0a Unknown Reserved Response', + 0x0D: '0x0d Unknown Reserved Response', + 0x0E: '0x0e Unknown Reserved Response', + 0x29: '0x29 Unknown Reserved Response', + 0x2A: '0x2a Unknown Reserved Response', + 0x5A: 'Specific Schneider Electric Response', + 0x5B: '0x5b Unknown Reserved Response', + 0x7D: '0x7d Unknown Reserved Response', + 0x7E: '0x7e Unknown Reserved Response', + 0x7F: '0x7f Unknown Reserved Response', +} + +_reserved_funccode_error = { + 0x88: '0x88 Unknown Reserved Error', + 0x89: '0x89 Unknown Reserved Error', + 0x8A: '0x8a Unknown Reserved Error', + 0x8D: '0x8d Unknown Reserved Error', + 0x8E: '0x8e Unknown Reserved Error', + 0xA9: '0x88 Unknown Reserved Error', + 0xAA: '0x88 Unknown Reserved Error', + 0xDA: 'Specific Schneider Electric Error', + 0xDB: '0xdb Unknown Reserved Error', + 0xDC: '0xdc Unknown Reserved Error', + 0xFD: '0xfd Unknown Reserved Error', + 0xFE: '0xfe Unknown Reserved Error', + 0xFF: '0xff Unknown Reserved Error', +} + + +class ModbusPDUReservedFunctionCodeRequest(Packet): + name = "Reserved Function Code Request" + fields_desc = [ + ByteEnumField("funcCode", 0x00, _reserved_funccode_request), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus Reserved Request %funcCode%") + + +class ModbusPDUReservedFunctionCodeResponse(Packet): + name = "Reserved Function Code Response" + fields_desc = [ + ByteEnumField("funcCode", 0x00, _reserved_funccode_response), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus Reserved Response %funcCode%") + + +class ModbusPDUReservedFunctionCodeError(Packet): + name = "Reserved Function Code Error" + fields_desc = [ + ByteEnumField("funcCode", 0x00, _reserved_funccode_error), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus Reserved Error %funcCode%") + + +_userdefined_funccode_request = { +} +_userdefined_funccode_response = { +} +_userdefined_funccode_error = { +} + + +class ModbusByteEnumField(EnumField): + __slots__ = "defEnum" + + def __init__(self, name, default, enum, defEnum): + EnumField.__init__(self, name, default, enum, "B") + defEnum = self.defEnum = defEnum + + def i2repr_one(self, pkt, x): + if self not in conf.noenum and not isinstance(x, VolatileValue) \ + and x in self.i2s: + return self.i2s[x] + if self.defEnum: + return self.defEnum + return repr(x) + + +class ModbusPDUUserDefinedFunctionCodeRequest(Packet): + name = "User-Defined Function Code Request" + fields_desc = [ + ModbusByteEnumField( + "funcCode", 0x00, _userdefined_funccode_request, + "Unknown user-defined request function Code"), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus User-Defined Request %funcCode%") + + +class ModbusPDUUserDefinedFunctionCodeResponse(Packet): + name = "User-Defined Function Code Response" + fields_desc = [ + ModbusByteEnumField( + "funcCode", 0x00, _userdefined_funccode_response, + "Unknown user-defined response function Code"), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus User-Defined Response %funcCode%") + + +class ModbusPDUUserDefinedFunctionCodeError(Packet): + name = "User-Defined Function Code Error" + fields_desc = [ + ModbusByteEnumField( + "funcCode", 0x00, _userdefined_funccode_error, + "Unknown user-defined error function Code"), + StrFixedLenField('payload', '', 255), ] + + def extract_padding(self, s): + return b"", None + + def mysummary(self): + return self.sprintf("Modbus User-Defined Error %funcCode%") + + class ModbusObjectId(Packet): name = "Object" fields_desc = [ByteEnumField("id", 0x00, _read_device_id_object_id), - BitFieldLenField("length", None, 8, count_of="value"), + BitFieldLenField("length", None, 8, length_of="value"), StrLenField("value", "", length_from=lambda pkt: pkt.length)] def guess_payload_class(self, payload): @@ -617,7 +736,7 @@ _modbus_error_classes = { 0x96: ModbusPDU16MaskWriteRegisterError, 0x97: ModbusPDU17ReadWriteMultipleRegistersError, 0x98: ModbusPDU18ReadFIFOQueueError, - 0xAB: ModbusPDU2B0EReadDeviceIdentificationError + 0xAB: ModbusPDU2B0EReadDeviceIdentificationError, } _modbus_response_classes = { 0x01: ModbusPDU01ReadCoilsResponse, @@ -634,7 +753,7 @@ _modbus_response_classes = { 0x15: ModbusPDU15WriteFileRecordResponse, 0x16: ModbusPDU16MaskWriteRegisterResponse, 0x17: ModbusPDU17ReadWriteMultipleRegistersResponse, - 0x18: ModbusPDU18ReadFIFOQueueResponse + 0x18: ModbusPDU18ReadFIFOQueueResponse, } _mei_types_request = { 0x0E: ModbusPDU2B0EReadDeviceIdentificationRequest, @@ -655,9 +774,9 @@ class ModbusADURequest(Packet): def guess_payload_class(self, payload): function_code = int(payload[0].encode("hex"), 16) - sub_code = int(payload[1].encode("hex"), 16) if function_code == 0x2B: + sub_code = int(payload[1].encode("hex"), 16) try: return _mei_types_request[sub_code] except KeyError: @@ -666,7 +785,9 @@ class ModbusADURequest(Packet): return _modbus_request_classes[function_code] except KeyError: pass - return ModbusPDU00GenericRequest + if function_code in _reserved_funccode_request.keys(): + return ModbusPDUReservedFunctionCodeRequest + return ModbusPDUUserDefinedFunctionCodeRequest def post_build(self, p, pay): if self.len is None: @@ -684,9 +805,9 @@ class ModbusADUResponse(Packet): def guess_payload_class(self, payload): function_code = int(payload[0].encode("hex"), 16) - sub_code = int(payload[1].encode("hex"), 16) if function_code == 0x2B: + sub_code = int(payload[1].encode("hex"), 16) try: return _mei_types_response[sub_code] except KeyError: @@ -699,9 +820,13 @@ class ModbusADUResponse(Packet): return _modbus_error_classes[function_code] except KeyError: pass - if function_code < 0x81: - return ModbusPDU00GenericResponse - return ModbusPDU00GenericError + if function_code in _reserved_funccode_response.keys(): + return ModbusPDUReservedFunctionCodeResponse + elif function_code in _reserved_funccode_error.keys(): + return ModbusPDUReservedFunctionCodeError + if function_code < 0x80: + return ModbusPDUUserDefinedFunctionCodeResponse + return ModbusPDUUserDefinedFunctionCodeError def post_build(self, p, pay): if self.len is None: @@ -712,4 +837,3 @@ class ModbusADUResponse(Packet): bind_layers(TCP, ModbusADURequest, dport=502) bind_layers(TCP, ModbusADUResponse, sport=502) - diff --git a/scapy/contrib/modbus.uts b/scapy/contrib/modbus.uts index 210e191b2fb994225bb2a274f23c28542cde7d5f..8adc0d23edb30dbf4d801530fe5cc991a44b69cd 100644 --- a/scapy/contrib/modbus.uts +++ b/scapy/contrib/modbus.uts @@ -31,16 +31,36 @@ assert(isinstance(p.payload, ModbusPDU2B0EReadDeviceIdentificationResponse)) p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x03\xff\xab\x01') assert(isinstance(p.payload, ModbusPDU2B0EReadDeviceIdentificationError)) -= MBAP Guess Payload (Invalid payload) -p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x03\xff\xff\xff') -assert(isinstance(p.payload, ModbusPDU00GenericRequest)) += MBAP Guess Payload Reserved Function Request (Invalid payload) +p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x02\xff\x5b') +assert(isinstance(p.payload,ModbusPDUReservedFunctionCodeRequest)) += MBAP Guess Payload Reserved Function Response (Invalid payload) +p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x7e') +assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeResponse)) += MBAP Guess Payload Reserved Function Error (Invalid payload) +p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x8a') +assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeError)) + = MBAP Guess Payload ModbusPDU02ReadDiscreteInputsResponse -p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x04\xff\x80\xff\x00') -assert(isinstance(p.payload, ModbusPDU00GenericResponse)) -= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsError -p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x04\xff\xff\xff\xff') -assert(isinstance(p.payload, ModbusPDU00GenericError)) +assert(str(ModbusPDU02ReadDiscreteInputsResponse()), b'\x02\x01\x00') += MBAP Guess Payload ModbusPDU02ReadDiscreteInputsResponse minimal parameters +assert(str(ModbusPDU02ReadDiscreteInputsResponse(inputStatus=[0x02, 0x01])), b'\x02\x02\x02\x01') += MBAP Guess Payload ModbusPDU02ReadDiscreteInputsRequest dissection +p = ModbusPDU02ReadDiscreteInputsResponse(b'\x02\x02\x02\x01') +p.byteCount == 2 and p.inputStatus == [0x02, 0x01] + += ModbusPDU02ReadDiscreteInputsError +str(ModbusPDU02ReadDiscreteInputsError()) == b'\x82\x01' += MBAP Guess Payload User-Defined Function Request (Invalid payload) +p = ModbusADURequest(b'\x00\x00\x00\x00\x00\x02\xff\x5b') +assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeRequest)) += MBAP Guess Payload User-Defined Function Response (Invalid payload) +p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x7e') +assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeResponse)) += MBAP Guess Payload User-Defined Function Error (Invalid payload) +p = ModbusADUResponse(b'\x00\x00\x00\x00\x00\x02\xff\x8a') +assert(isinstance(p.payload, ModbusPDUReservedFunctionCodeError)) + Test layer binding = Destination port @@ -182,9 +202,11 @@ str(ModbusPDU10WriteMultipleRegistersError()) == b'\x90\x01' # 0x14/944 Read File Record --------------------------------------------------------- = ModbusPDU14ReadFileRecordRequest len parameters -str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest()/ModbusReadFileSubRequest()) == b'\x14\x0e\x06\x00\x01\x00\x00\x00\x01\x06\x00\x01\x00\x00\x00\x01' +p = str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest()/ModbusReadFileSubRequest()) +assert(p == b'\x14\x0e\x06\x00\x01\x00\x00\x00\x01\x06\x00\x01\x00\x00\x00\x01') = ModbusPDU14ReadFileRecordRequest minimal parameters -str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest(fileNumber=4, recordNumber=1, recordLength=02)/ModbusReadFileSubRequest(fileNumber=3, recordNumber=9, recordLength=2)) == b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02' +p = str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest(fileNumber=4, recordNumber=1, recordLength=02)/ModbusReadFileSubRequest(fileNumber=3, recordNumber=9, recordLength=2)) +assert(p == b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02') = ModbusPDU14ReadFileRecordRequest dissection p = ModbusPDU14ReadFileRecordRequest(b'\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02') assert(isinstance(p.payload, ModbusReadFileSubRequest)) @@ -270,10 +292,13 @@ str(ModbusPDU18ReadFIFOQueueError()) == b'\x98\x01' # 0x2b 0xOE Read Device Information ------------------------------------------------- = ModbusPDU2B0EReadDeviceIdentificationRequest -str(ModbusPDU2B0EReadDeviceIdentificationRequest()) ==b'+\x0e\x01\x00' +str(ModbusPDU2B0EReadDeviceIdentificationRequest()) == b'+\x0e\x01\x00' = ModbusPDU2B0EReadDeviceIdentificationResponse str(ModbusPDU2B0EReadDeviceIdentificationResponse()) == b'+\x0e\x04\x01\x00\x00\x00' += ModbusPDU2B0EReadDeviceIdentificationResponse complete response +p = str(ModbusPDU2B0EReadDeviceIdentificationResponse(objCount=2)/ModbusObjectId(id=0, value="Obj1")/ModbusObjectId(id=1, value="Obj2")) +assert(p == b'+\x0e\x04\x01\x00\x00\x02\x00\x04Obj1\x01\x04Obj2') = ModbusPDU2B0EReadDeviceIdentificationResponse dissection p = ModbusPDU2B0EReadDeviceIdentificationResponse(b'+\x0e\x01\x83\x00\x00\x03\x00\x08Pymodbus\x01\x02PM\x02\x031.0') assert(p.payload.payload.payload.id == 2)