Skip to content
Snippets Groups Projects
Commit 6dfbb9a0 authored by Sebastien Mainand's avatar Sebastien Mainand
Browse files

Add support for Modbus TCP protocol

parent 7c485806
No related branches found
No related tags found
No related merge requests found
# coding: utf8
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (C) 2016 Arthur Gervais, Ken LE PRADO, Sébastien Mainand
from scapy.all import *
# TODO: implement serial specific function codes
_modbus_exceptions = {1: "Illegal Function Code",
2: "Illegal Data Address",
3: "Illegal Data Value",
4: "Server Device Failure",
5: "Acknowledge",
6: "Server Device Busy",
8: "Memory Parity Error",
10: "Gateway Path Unavailable",
11: "Gateway Target Device Failed to Respond"}
class ModbusPDU00GenericRequest(Packet):
name = "Generic Request"
fields_desc = [XByteField("funcCode", 0x00),
StrFixedLenField("payload", "", 255)]
def extract_padding(self, s):
return "", None
def mysummary(self):
return self.sprintf("Modbus Request %funcCode%")
class ModbusPDU00GenericResponse(Packet):
name = "Generic Request"
fields_desc = [XByteField("funcCode", 0x00),
StrFixedLenField("payload", "", 255)]
def extract_padding(self, s):
return "", None
def mysummary(self):
return self.sprintf("Modbus Response %funcCode%")
class ModbusPDU00GenericError(Packet):
name = "Generic Exception"
fields_desc = [XByteField("funcCode", 0x80),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
def extract_padding(self, s):
return "", None
def my_summary(self):
return self.sprintf("Modbus Exception %funcCode%")
class ModbusPDU01ReadCoilsRequest(Packet):
name = "Read Coils Request"
fields_desc = [XByteField("funcCode", 0x01),
XShortField("startAddr", 0x0000), # 0x0000 to 0xFFFF
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
class ModbusPDU01ReadCoilsResponse(Packet):
name = "Read Coils Response"
fields_desc = [XByteField("funcCode", 0x01),
BitFieldLenField("byteCount", None, 8, count_of="coilStatus"),
FieldListField("coilStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
def extract_padding(self, s):
return "", None
class ModbusPDU01ReadCoilsError(Packet):
name = "Read Coils Exception"
fields_desc = [XByteField("funcCode", 0x81),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
def extract_padding(self, s):
return "", None
class ModbusPDU02ReadDiscreteInputsRequest(Packet):
name = "Read Discrete Inputs"
fields_desc = [XByteField("funcCode", 0x02),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
class ModbusPDU02ReadDiscreteInputsResponse(Packet):
""" inputStatus: result is represented as bytes, padded with 0 to have a
integer number of bytes. The field does not parse this result and
present the bytes directly
"""
name = "Read Discrete Inputs Response"
fields_desc = [XByteField("funcCode", 0x02),
BitFieldLenField("byteCount", None, 8, count_of="inputStatus"),
FieldListField("inputStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU02ReadDiscreteInputsError(Packet):
name = "Read Discrete Inputs Exception"
fields_desc = [XByteField("funcCode", 0x82),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU03ReadHoldingRegistersRequest(Packet):
name = "Read Holding Registers"
fields_desc = [XByteField("funcCode", 0x03),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
class ModbusPDU03ReadHoldingRegistersResponse(Packet):
name = "Read Holding Registers Response"
fields_desc = [XByteField("funcCode", 0x03),
BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU03ReadHoldingRegistersError(Packet):
name = "Read Holding Registers Exception"
fields_desc = [XByteField("funcCode", 0x83),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU04ReadInputRegistersRequest(Packet):
name = "Read Input Registers"
fields_desc = [XByteField("funcCode", 0x04),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
def extract_padding(self, s):
return "", None
class ModbusPDU04ReadInputRegistersResponse(Packet):
name = "Read Input Registers Response"
fields_desc = [XByteField("funcCode", 0x04),
BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU04ReadInputRegistersError(Packet):
name = "Read Input Registers Exception"
fields_desc = [XByteField("funcCode", 0x84),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU05WriteSingleCoilRequest(Packet):
name = "Write Single Coil"
fields_desc = [XByteField("funcCode", 0x05),
XShortField("outputAddr", 0x0000), # from 0x0000 to 0xFFFF
XShortField("outputValue", 0x0000)] # 0x0000 == Off, 0xFF00 == On
class ModbusPDU05WriteSingleCoilResponse(Packet): # The answer is the same as the request if successful
name = "Write Single Coil"
fields_desc = [XByteField("funcCode", 0x05),
XShortField("outputAddr", 0x0000), # from 0x0000 to 0xFFFF
XShortField("outputValue", 0x0000)] # 0x0000 == Off, 0xFF00 == On
class ModbusPDU05WriteSingleCoilError(Packet):
name = "Write Single Coil Exception"
fields_desc = [XByteField("funcCode", 0x85),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU06WriteSingleRegisterRequest(Packet):
name = "Write Single Register"
fields_desc = [XByteField("funcCode", 0x06),
XShortField("registerAddr", 0x0000),
XShortField("registerValue", 0x0000)]
def extract_padding(self, s):
return "", None
class ModbusPDU06WriteSingleRegisterResponse(Packet):
name = "Write Single Register Response"
fields_desc = [XByteField("funcCode", 0x06),
XShortField("registerAddr", 0x0000),
XShortField("registerValue", 0x0000)]
class ModbusPDU06WriteSingleRegisterError(Packet):
name = "Write Single Register Exception"
fields_desc = [XByteField("funcCode", 0x86),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU07ReadExceptionStatusRequest(Packet):
name = "Read Exception Status"
fields_desc = [XByteField("funcCode", 0x07)]
def extract_padding(self, s):
return "", None
class ModbusPDU07ReadExceptionStatusResponse(Packet):
name = "Read Exception Status Response"
fields_desc = [XByteField("funcCode", 0x07),
XByteField("startingAddr", 0x00)]
class ModbusPDU07ReadExceptionStatusError(Packet):
name = "Read Exception Status Exception"
fields_desc = [XByteField("funcCode", 0x87),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU0FWriteMultipleCoilsRequest(Packet):
name = "Write Multiple Coils"
fields_desc = [XByteField("funcCode", 0x0F),
XShortField("startingAddr", 0x0000),
XShortField("quantityOutput", 0x0001),
BitFieldLenField("byteCount", None, 8, count_of="outputsValue"),
FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
def extract_padding(self, s):
return "", None
class ModbusPDU0FWriteMultipleCoilsResponse(Packet):
name = "Write Multiple Coils Response"
fields_desc = [XByteField("funcCode", 0x0F),
XShortField("startingAddr", 0x0000),
XShortField("quantityOutput", 0x0001)]
class ModbusPDU0FWriteMultipleCoilsError(Packet):
name = "Write Multiple Coils Exception"
fields_desc = [XByteField("funcCode", 0x8F),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU10WriteMultipleRegistersRequest(Packet):
name = "Write Multiple Registers"
fields_desc = [XByteField("funcCode", 0x10),
XShortField("startingAddr", 0x0000),
BitFieldLenField("quantityRegisters", None, 16, count_of="outputsValue",),
BitFieldLenField("byteCount", None, 8, count_of="outputsValue", adjust=lambda pkt, x: x*2),
FieldListField("outputsValue", [0x0000], XShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU10WriteMultipleRegistersResponse(Packet):
name = "Write Multiple Registers Response"
fields_desc = [XByteField("funcCode", 0x10),
XShortField("startingAddr", 0x0000),
XShortField("quantityRegisters", 0x0001)]
class ModbusPDU10WriteMultipleRegistersError(Packet):
name = "Write Multiple Registers Exception"
fields_desc = [XByteField("funcCode", 0x90),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU11ReportSlaveIdRequest(Packet):
name = "Report Slave Id"
fields_desc = [XByteField("funcCode", 0x11)]
def extract_padding(self, s):
return "", None
class ModbusPDU11ReportSlaveIdResponse(Packet):
name = "Report Slave Id Response"
fields_desc = [XByteField("funcCode", 0x11),
BitFieldLenField("byteCount", None, 8, length_of="slaveId"),
ConditionalField(StrLenField("slaveId", "", length_from=lambda pkt: pkt.byteCount),
lambda pkt: pkt.byteCount > 0),
ConditionalField(XByteField("runIdicatorStatus", 0x00), lambda pkt: pkt.byteCount > 0)]
class ModbusPDU11ReportSlaveIdError(Packet):
name = "Report Slave Id Exception"
fields_desc = [XByteField("funcCode", 0x91),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusReadFileSubRequest(Packet):
name = "Sub-request of Read File Record"
fields_desc = [ByteField("refType", 0x06),
ShortField("fileNumber", 0x0001),
ShortField("recordNumber", 0x0000),
ShortField("recordLength", 0x0001)]
def guess_payload_class(self, payload):
return ModbusReadFileSubRequest
class ModbusPDU14ReadFileRecordRequest(Packet):
name = "Read File Record"
fields_desc = [XByteField("funcCode", 0x14),
ByteField("byteCount", None)]
def guess_payload_class(self, payload):
if self.byteCount > 0:
return ModbusReadFileSubRequest
else:
return Packet.guess_payload_class(self, payload)
def post_build(self, p, pay):
if self.byteCount is None:
l = len(pay)
p = p[:1] + struct.pack("!B", l) + p[3:]
return p + pay
class ModbusReadFileSubResponse(Packet):
name = "Sub-response"
fields_desc = [BitFieldLenField("respLength", None, 8, count_of="recData", adjust=lambda pkt, p: p*2+1),
ByteField("refType", 0x06),
FieldListField("recData", [0x0000], XShortField("", 0x0000),
count_from=lambda pkt: (pkt.respLength-1)/2)]
def guess_payload_class(self, payload):
return ModbusReadFileSubResponse
class ModbusPDU14ReadFileRecordResponse(Packet):
name = "Read File Record Response"
fields_desc = [XByteField("funcCode", 0x14),
ByteField("dataLength", None)]
def post_build(self, p, pay):
if self.dataLength is None:
l = len(pay)
p = p[:1] + struct.pack("!B", l) + p[3:]
return p + pay
def guess_payload_class(self, payload):
if self.dataLength > 0:
return ModbusReadFileSubResponse
else:
return Packet.guess_payload_class(self, payload)
class ModbusPDU14ReadFileRecordError(Packet):
name = "Read File Record Exception"
fields_desc = [XByteField("funcCode", 0x94),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
# 0x15 : Write File Record
class ModbusWriteFileSubRequest(Packet):
name = "Sub request of Write File Record"
fields_desc = [ByteField("refType", 0x06),
ShortField("fileNumber", 0x0001),
ShortField("recordNumber", 0x0000),
BitFieldLenField("recordLength", None, 16, length_of="recordData", adjust=lambda pkt, p: p/2),
FieldListField("recordData", [0x0000], ShortField("", 0x0000),
length_from=lambda pkt: pkt.recordLength*2)]
def guess_payload_class(self, payload):
if payload:
return ModbusWriteFileSubRequest
class ModbusPDU15WriteFileRecordRequest(Packet):
name = "Write File Record"
fields_desc = [XByteField("funcCode", 0x15),
ByteField("dataLength", None)]
def post_build(self, p, pay):
if self.dataLength is None:
l = len(pay)
p = p[:1] + struct.pack("!B", l) + p[3:]
return p + pay
def guess_payload_class(self, payload):
if self.dataLength > 0:
return ModbusWriteFileSubRequest
else:
return Packet.guess_payload_class(self, payload)
class ModbusWriteFileSubResponse(ModbusWriteFileSubRequest):
name = "Sub response of Write File Record"
def guess_payload_class(self, payload):
if payload:
return ModbusWriteFileSubResponse
class ModbusPDU15WriteFileRecordResponse(ModbusPDU15WriteFileRecordRequest):
name = "Write File Record Response"
def guess_payload_class(self, payload):
if self.dataLength > 0:
return ModbusWriteFileSubResponse
else:
return Packet.guess_payload_class(self, payload)
class ModbusPDU15WriteFileRecordError(Packet):
name = "Write File Record Exception"
fields_desc = [XByteField("funcCode", 0x95),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU16MaskWriteRegisterRequest(Packet):
# and/or to 0xFFFF/0x0000 so that nothing is changed in memory
name = "Mask Write Register"
fields_desc = [XByteField("funcCode", 0x16),
XShortField("refAddr", 0x0000),
XShortField("andMask", 0xffff),
XShortField("orMask", 0x0000)]
class ModbusPDU16MaskWriteRegisterResponse(Packet):
name = "Mask Write Register Response"
fields_desc = [XByteField("funcCode", 0x16),
XShortField("refAddr", 0x0000),
XShortField("andMask", 0xffff),
XShortField("orMask", 0x0000)]
class ModbusPDU16MaskWriteRegisterError(Packet):
name = "Mask Write Register Exception"
fields_desc = [XByteField("funcCode", 0x96),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU17ReadWriteMultipleRegistersRequest(Packet):
name = "Read Write Multiple Registers"
fields_desc = [XByteField("funcCode", 0x17),
XShortField("readStartingAddr", 0x0000),
XShortField("readQuantityRegisters", 0x0001),
XShortField("writeStartingAddr", 0x0000),
BitFieldLenField("writeQuantityRegisters", None, 16, count_of="writeRegistersValue"),
BitFieldLenField("byteCount", None, 8, count_of="writeRegistersValue", adjust=lambda pkt, x: x*2),
FieldListField("writeRegistersValue", [0x0000], XShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU17ReadWriteMultipleRegistersResponse(Packet):
name = "Read Write Multiple Registers Response"
fields_desc = [XByteField("funcCode", 0x17),
BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU17ReadWriteMultipleRegistersError(Packet):
name = "Read Write Multiple Exception"
fields_desc = [XByteField("funcCode", 0x97),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusPDU18ReadFIFOQueueRequest(Packet):
name = "Read FIFO Queue"
fields_desc = [XByteField("funcCode", 0x18),
XShortField("FIFOPointerAddr", 0x0000)]
class ModbusPDU18ReadFIFOQueueResponse(Packet):
name = "Read FIFO Queue Response"
fields_desc = [XByteField("funcCode", 0x18),
# TODO: ByteCount must includes size of FIFOCount
BitFieldLenField("byteCount", None, 16, count_of="FIFOVal", adjust=lambda pkt, p: p*2+2),
BitFieldLenField("FIFOCount", None, 16, count_of="FIFOVal"),
FieldListField("FIFOVal", [], ShortField("", 0x0000), count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU18ReadFIFOQueueError(Packet):
name = "Read FIFO Queue Exception"
fields_desc = [XByteField("funcCode", 0x98),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
# TODO: not implemented, out of the main specification
# class ModbusPDU2B0DCANOpenGeneralReferenceRequest(Packet):
# name = "CANopen General Reference Request"
# fields_desc = []
#
#
# class ModbusPDU2B0DCANOpenGeneralReferenceResponse(Packet):
# name = "CANopen General Reference Response"
# fields_desc = []
#
#
# class ModbusPDU2B0DCANOpenGeneralReferenceError(Packet):
# name = "CANopen General Reference Error"
# fields_desc = []
# 0x2B/0x0E - Read Device Identification values
_read_device_id_codes = {1: "Basic",
2: "Regular",
3: "Extended",
4: "Specific"}
# 0x00->0x02: mandatory
# 0x03->0x06: optional
# 0x07->0x7F: Reserved (optional)
# 0x80->0xFF: product dependent private objects (optional)
_read_device_id_object_id = {0x00: "VendorName",
0x01: "ProductCode",
0x02: "MajorMinorRevision",
0x03: "VendorUrl",
0x04: "ProductName",
0x05: "ModelName",
0x06: "UserApplicationName"}
_read_device_id_conformity_lvl = {0x01: "Basic Identification (stream only)",
0x02: "Regular Identification (stream only)",
0x03: "Extended Identification (stream only)",
0x81: "Basic Identification (stream and individual access)",
0x82: "Regular Identification (stream and individual access)",
0x83: "Extended Identification (stream and individual access)"}
_read_device_id_more_follow = {0x00: "No",
0x01: "Yes"}
class ModbusPDU2B0EReadDeviceIdentificationRequest(Packet):
name = "Read Device Identification"
fields_desc = [XByteField("funcCode", 0x2B),
XByteField("MEIType", 0x0E),
ByteEnumField("readCode", 1, _read_device_id_codes),
ByteEnumField("objectId", 0x00, _read_device_id_object_id)]
class ModbusPDU2B0EReadDeviceIdentificationResponse(Packet):
name = "Read Device Identification"
fields_desc = [XByteField("funcCode", 0x2B),
XByteField("MEIType", 0x0E),
ByteEnumField("readCode", 4, _read_device_id_codes),
ByteEnumField("conformityLevel", 0x01, _read_device_id_conformity_lvl),
ByteEnumField("more", 0x00, _read_device_id_more_follow),
ByteEnumField("nextObjId", 0x00, _read_device_id_object_id),
ByteField("objCount", 0x00)]
def guess_payload_class(self, payload):
if self.objCount > 0:
return ModbusObjectId
else:
return Packet.guess_payload_class(self, payload)
class ModbusPDU2B0EReadDeviceIdentificationError(Packet):
name = "Read Exception Status Exception"
fields_desc = [XByteField("funcCode", 0xAB),
ByteEnumField("exceptCode", 1, _modbus_exceptions)]
class ModbusObjectId(Packet):
name = "Object"
fields_desc = [ByteEnumField("id", 0x00, _read_device_id_object_id),
BitFieldLenField("length", None, 8, count_of="value"),
StrLenField("value", "", length_from=lambda pkt: pkt.length)]
def guess_payload_class(self, payload):
return ModbusObjectId
_modbus_request_classes = {
0x01: ModbusPDU01ReadCoilsRequest,
0x02: ModbusPDU02ReadDiscreteInputsRequest,
0x03: ModbusPDU03ReadHoldingRegistersRequest,
0x04: ModbusPDU04ReadInputRegistersRequest,
0x05: ModbusPDU05WriteSingleCoilRequest,
0x06: ModbusPDU06WriteSingleRegisterRequest,
0x07: ModbusPDU07ReadExceptionStatusRequest,
0x0F: ModbusPDU0FWriteMultipleCoilsRequest,
0x10: ModbusPDU10WriteMultipleRegistersRequest,
0x11: ModbusPDU11ReportSlaveIdRequest,
0x14: ModbusPDU14ReadFileRecordRequest,
0x15: ModbusPDU15WriteFileRecordRequest,
0x16: ModbusPDU16MaskWriteRegisterRequest,
0x17: ModbusPDU17ReadWriteMultipleRegistersRequest,
0x18: ModbusPDU18ReadFIFOQueueRequest,
}
_modbus_error_classes = {
0x81: ModbusPDU01ReadCoilsError,
0x82: ModbusPDU02ReadDiscreteInputsError,
0x83: ModbusPDU03ReadHoldingRegistersError,
0x84: ModbusPDU04ReadInputRegistersError,
0x85: ModbusPDU05WriteSingleCoilError,
0x86: ModbusPDU06WriteSingleRegisterError,
0x87: ModbusPDU07ReadExceptionStatusError,
0x8F: ModbusPDU0FWriteMultipleCoilsError,
0x90: ModbusPDU10WriteMultipleRegistersError,
0x91: ModbusPDU11ReportSlaveIdError,
0x94: ModbusPDU14ReadFileRecordError,
0x95: ModbusPDU15WriteFileRecordError,
0x96: ModbusPDU16MaskWriteRegisterError,
0x97: ModbusPDU17ReadWriteMultipleRegistersError,
0x98: ModbusPDU18ReadFIFOQueueError,
0xAB: ModbusPDU2B0EReadDeviceIdentificationError
}
_modbus_response_classes = {
0x01: ModbusPDU01ReadCoilsResponse,
0x02: ModbusPDU02ReadDiscreteInputsResponse,
0x03: ModbusPDU03ReadHoldingRegistersResponse,
0x04: ModbusPDU04ReadInputRegistersResponse,
0x05: ModbusPDU05WriteSingleCoilResponse,
0x06: ModbusPDU06WriteSingleRegisterResponse,
0x07: ModbusPDU07ReadExceptionStatusResponse,
0x0F: ModbusPDU0FWriteMultipleCoilsResponse,
0x10: ModbusPDU10WriteMultipleRegistersResponse,
0x11: ModbusPDU11ReportSlaveIdResponse,
0x14: ModbusPDU14ReadFileRecordResponse,
0x15: ModbusPDU15WriteFileRecordResponse,
0x16: ModbusPDU16MaskWriteRegisterResponse,
0x17: ModbusPDU17ReadWriteMultipleRegistersResponse,
0x18: ModbusPDU18ReadFIFOQueueResponse
}
_mei_types_request = {
0x0E: ModbusPDU2B0EReadDeviceIdentificationRequest,
# 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceRequest,
}
_mei_types_response = {
0x0E: ModbusPDU2B0EReadDeviceIdentificationResponse,
# 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceResponse,
}
class ModbusADURequest(Packet):
name = "ModbusADU"
fields_desc = [XShortField("transId", 0x0000), # needs to be unique
XShortField("protoId", 0x0000), # needs to be zero (Modbus)
ShortField("len", None), # is calculated with payload
XByteField("unitId", 0xff)] # 0xFF (recommended as non-significant value) or 0x00
def guess_payload_class(self, payload):
function_code = int(payload[0].encode("hex"), 16)
sub_code = int(payload[1].encode("hex"), 16)
if function_code == 0x2B:
try:
return _mei_types_request[sub_code]
except KeyError:
pass
try:
return _modbus_request_classes[function_code]
except KeyError:
pass
return ModbusPDU00GenericRequest
def post_build(self, p, pay):
if self.len is None:
l = len(pay) + 1 # +len(p)
p = p[:4] + struct.pack("!H", l) + p[6:]
return p + pay
class ModbusADUResponse(Packet):
name = "ModbusADU"
fields_desc = [XShortField("transId", 0x0000), # needs to be unique
XShortField("protoId", 0x0000), # needs to be zero (Modbus)
ShortField("len", None), # is calculated with payload
XByteField("unitId", 0xff)] # 0xFF or 0x00 should be used for Modbus over TCP/IP
def guess_payload_class(self, payload):
function_code = int(payload[0].encode("hex"), 16)
sub_code = int(payload[1].encode("hex"), 16)
if function_code == 0x2B:
try:
return _mei_types_response[sub_code]
except KeyError:
pass
try:
return _modbus_response_classes[function_code]
except KeyError:
pass
try:
return _modbus_error_classes[function_code]
except KeyError:
pass
if function_code < 0x81:
return ModbusPDU00GenericResponse
return ModbusPDU00GenericError
def post_build(self, p, pay):
if self.len is None:
l = len(pay) + 1 # +len(p)
p = p[:4] + struct.pack("!H", l) + p[6:]
return p + pay
bind_layers(TCP, ModbusADURequest, dport=502)
bind_layers(TCP, ModbusADUResponse, sport=502)
% Modbus layer test campaign
+ Syntax check
= Import the modbus layer
from scapy.contrib.modbus import *
+ Test MBAP
= MBAP default values
str(ModbusADURequest()) == '\x00\x00\x00\x00\x00\x01\xff'
= MBAP payload length calculation
str(ModbusADURequest() / '\x00\x01\x02') == '\x00\x00\x00\x00\x00\x04\xff\x00\x01\x02'
= MBAP Guess Payload ModbusPDU01ReadCoilsRequest (simple case)
p = ModbusADURequest('\x00\x00\x00\x00\x00\x06\xff\x01\x00\x00\x00\x01')
p.payload.__class__.__name__ == 'ModbusPDU01ReadCoilsRequest'
= MBAP Guess Payload ModbusPDU01ReadCoilsResponse
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x04\xff\x01\x01\x01')
p.payload.__class__.__name__ == 'ModbusPDU01ReadCoilsResponse'
= MBAP Guess Payload ModbusPDU01ReadCoilsError
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x03\xff\x81\x02')
p.payload.__class__.__name__ == 'ModbusPDU01ReadCoilsError'
= MBAP Guess Payload ModbusPDU2B0EReadDeviceIdentificationRequest (2 level test)
p = ModbusADURequest('\x00\x00\x00\x00\x00\x04\xff+\x0e\x01\x00')
p.payload.__class__.__name__ == 'ModbusPDU2B0EReadDeviceIdentificationRequest'
= MBAP Guess Payload ModbusPDU2B0EReadDeviceIdentificationResponse
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x1b\xff+\x0e\x01\x83\x00\x00\x03\x00\x08Pymodbus\x01\x02PM\x02\x031.0')
p.payload.__class__.__name__ == 'ModbusPDU2B0EReadDeviceIdentificationResponse'
= MBAP Guess Payload ModbusPDU2B0EReadDeviceIdentificationError
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x03\xff\xab\x01')
p.payload.__class__.__name__ == 'ModbusPDU2B0EReadDeviceIdentificationError'
= MBAP Guess Payload (Invalid payload)
p = ModbusADURequest('\x00\x00\x00\x00\x00\x03\xff\xff\xff')
p.payload.__class__.__name__ == 'ModbusPDU00GenericRequest'
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsResponse
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x04\xff\x80\xff\x00')
p.payload.__class__.__name__ == 'ModbusPDU00GenericResponse'
= MBAP Guess Payload ModbusPDU02ReadDiscreteInputsError
p = ModbusADUResponse('\x00\x00\x00\x00\x00\x04\xff\xff\xff\xff')
p.payload.__class__.__name__ == 'ModbusPDU00GenericError'
+ Test layer binding
= Destination port
p = TCP()/ModbusADURequest()
p[TCP].dport == 502
= Source port
p = TCP()/ModbusADUResponse()
p[TCP].sport == 502
+ Test PDU
* Note on tests cases: dissection/minimal parameters will not be done for packets that does not perform calculation
# 0x01/0x81 Read Coils --------------------------------------------------------------
= ModbusPDU01ReadCoilsRequest
str(ModbusPDU01ReadCoilsRequest()) == '\x01\x00\x00\x00\x01'
= ModbusPDU01ReadCoilsRequest minimal parameters
str(ModbusPDU01ReadCoilsRequest(startAddr=16, quantity=2)) == '\x01\x00\x10\x00\x02'
= ModbusPDU01ReadCoilsRequest dissection
p = ModbusPDU01ReadCoilsRequest('\x01\x00\x10\x00\x02')
p.startAddr == 16 and p.quantity == 2
= ModbusPDU01ReadCoilsResponse
str(ModbusPDU01ReadCoilsResponse()) == '\x01\x01\x00'
= ModbusPDU01ReadCoilsResponse minimal parameters
str(ModbusPDU01ReadCoilsResponse(coilStatus=[0x10]*3)) == '\x01\x03\x10\x10\x10'
= ModbusPDU01ReadCoilsResponse dissection
p = ModbusPDU01ReadCoilsResponse('\x01\x03\x10\x10\x10')
p.coilStatus == [16, 16, 16] and p.byteCount == 3
= ModbusPDU01ReadCoilsError
str(ModbusPDU01ReadCoilsError() == '\x81\x01')
= ModbusPDU81ReadCoilsError minimal parameters
str(ModbusPDU01ReadCoilsError(exceptCode=2)) == '\x81\x02'
= ModbusPDU81ReadCoilsError dissection
p = ModbusPDU01ReadCoilsError('\x81\x02')
p.funcCode == 0x81 and p.exceptCode == 2
# 0x02/0x82 Read Discrete Inputs Registers ------------------------------------------
= ModbusPDU02ReadDiscreteInputsRequest
str(ModbusPDU02ReadDiscreteInputsRequest()) == '\x02\x00\x00\x00\x01'
= ModbusPDU02ReadDiscreteInputsRequest minimal parameters
str(ModbusPDU02ReadDiscreteInputsRequest(startAddr=8, quantity=128)) == '\x02\x00\x08\x00\x80'
= ModbusPDU02ReadDiscreteInputsResponse
str(ModbusPDU02ReadDiscreteInputsResponse()) == '\x02\x01\x00'
= ModbusPDU02ReadDiscreteInputsResponse minimal parameters
str(ModbusPDU02ReadDiscreteInputsResponse(inputStatus=[0x02, 0x01])) == '\x02\x02\x02\x01'
= ModbusPDU02ReadDiscreteInputsRequest dissection
p = ModbusPDU02ReadDiscreteInputsResponse('\x02\x02\x02\x01')
p.byteCount == 2 and p.inputStatus == [0x02, 0x01]
= ModbusPDU02ReadDiscreteInputsError
str(ModbusPDU02ReadDiscreteInputsError()) == '\x82\x01'
# 0x03/0x83 Read Holding Registers --------------------------------------------------
= ModbusPDU03ReadHoldingRegistersRequest
str(ModbusPDU03ReadHoldingRegistersRequest()) == '\x03\x00\x00\x00\x01'
= ModbusPDU03ReadHoldingRegistersRequest minimal parameters
str(ModbusPDU03ReadHoldingRegistersRequest(startAddr=2048, quantity=16)) == '\x03\x08\x00\x00\x10'
= ModbusPDU03ReadHoldingRegistersResponse
str(ModbusPDU03ReadHoldingRegistersResponse()) == '\x03\x02\x00\x00'
= ModbusPDU03ReadHoldingRegistersResponse minimal parameters
1==1
= ModbusPDU03ReadHoldingRegistersResponse dissection
p = ModbusPDU03ReadHoldingRegistersResponse('\x03\x06\x02+\x00\x00\x00d')
p.byteCount == 6 and p.registerVal == [555, 0, 100]
= ModbusPDU03ReadHoldingRegistersError
str(ModbusPDU03ReadHoldingRegistersError()) == '\x83\x01'
# 0x04/0x84 Read Input Register -----------------------------------------------------
= ModbusPDU04ReadInputRegistersRequest
str(ModbusPDU04ReadInputRegistersRequest()) == '\x04\x00\x00\x00\x01'
= ModbusPDU04ReadInputRegistersResponse
str(ModbusPDU04ReadInputRegistersResponse()) == '\x04\x02\x00\x00'
= ModbusPDU04ReadInputRegistersResponse minimal parameters
str(ModbusPDU04ReadInputRegistersResponse(registerVal=[0x01, 0x02])) == '\x04\x04\x00\x01\x00\x02'
= ModbusPDU04ReadInputRegistersError
str(ModbusPDU04ReadInputRegistersError()) == '\x84\x01'
# 0x05/0x85 Write Single Coil -------------------------------------------------------
= ModbusPDU05WriteSingleCoilRequest
str(ModbusPDU05WriteSingleCoilRequest()) == '\x05\x00\x00\x00\x00'
= ModbusPDU05WriteSingleCoilResponse
str(ModbusPDU05WriteSingleCoilResponse()) == '\x05\x00\x00\x00\x00'
= ModbusPDU05WriteSingleCoilError
str(ModbusPDU05WriteSingleCoilError()) == '\x85\x01'
# 0x06/0x86 Write Single Register ---------------------------------------------------
= ModbusPDU06WriteSingleRegisterError
str(ModbusPDU06WriteSingleRegisterRequest()) == '\x06\x00\x00\x00\x00'
= ModbusPDU06WriteSingleRegisterResponse
str(ModbusPDU06WriteSingleRegisterResponse()) == '\x06\x00\x00\x00\x00'
= ModbusPDU06WriteSingleRegisterError
str(ModbusPDU06WriteSingleRegisterError()) == '\x86\x01'
# 0x07/0x87 Read Exception Status (serial line only) --------------------------------
# 0x08/0x88 Diagnostics (serial line only) ------------------------------------------
# 0x0b Get Comm Event Counter: serial line only -------------------------------------
# 0x0c Get Comm Event Log: serial line only -----------------------------------------
# 0x0f/0x8f Write Multiple Coils ----------------------------------------------------
= ModbusPDU0FWriteMultipleCoilsRequest
str(ModbusPDU0FWriteMultipleCoilsRequest())
= ModbusPDU0FWriteMultipleCoilsRequest minimal parameters
str(ModbusPDU0FWriteMultipleCoilsRequest(outputsValue=[0x01, 0x01])) == '\x0f\x00\x00\x00\x01\x02\x01\x01'
= ModbusPDU0FWriteMultipleCoilsResponse
str(ModbusPDU0FWriteMultipleCoilsResponse()) == '\x0f\x00\x00\x00\x01'
= ModbusPDU0FWriteMultipleCoilsError
str(ModbusPDU0FWriteMultipleCoilsError()) == '\x8f\x01'
# 0x10/0x90 Write Multiple Registers ----------------------------------------------------
= ModbusPDU10WriteMultipleRegistersRequest
str(ModbusPDU10WriteMultipleRegistersRequest()) == '\x10\x00\x00\x00\x01\x02\x00\x00'
= ModbusPDU10WriteMultipleRegistersRequest minimal parameters
str(ModbusPDU10WriteMultipleRegistersRequest(outputsValue=[0x0001, 0x0002])) == '\x10\x00\x00\x00\x02\x04\x00\x01\x00\x02'
= ModbusPDU10WriteMultipleRegistersResponse
str(ModbusPDU10WriteMultipleRegistersResponse()) == '\x10\x00\x00\x00\x01'
= ModbusPDU10WriteMultipleRegistersError
str(ModbusPDU10WriteMultipleRegistersError()) == '\x90\x01'
# 0x11/91 Report Server ID: serial line only ----------------------------------------
# 0x14/944 Read File Record ---------------------------------------------------------
= ModbusPDU14ReadFileRecordRequest len parameters
str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest()/ModbusReadFileSubRequest()) == '\x14\x0e\x06\x00\x01\x00\x00\x00\x01\x06\x00\x01\x00\x00\x00\x01'
= ModbusPDU14ReadFileRecordRequest minimal parameters
str(ModbusPDU14ReadFileRecordRequest()/ModbusReadFileSubRequest(fileNumber=4, recordNumber=1, recordLength=02)/ModbusReadFileSubRequest(fileNumber=3, recordNumber=9, recordLength=2)) == '\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02'
= ModbusPDU14ReadFileRecordRequest dissection
p = ModbusPDU14ReadFileRecordRequest('\x14\x0e\x06\x00\x04\x00\x01\x00\x02\x06\x00\x03\x00\t\x00\x02')
p.payload.__class__.__name__ == 'ModbusReadFileSubRequest' and p.payload.payload.__class__.__name__ == 'ModbusReadFileSubRequest'
= ModbusPDU14ReadFileRecordResponse minimal parameters
str(ModbusPDU14ReadFileRecordResponse()/ModbusReadFileSubResponse(recData=[0x0dfe, 0x0020])/ModbusReadFileSubResponse(recData=[0x33cd, 0x0040])) == '\x14\x0c\x05\x06\r\xfe\x00 \x05\x063\xcd\x00@'
= ModbusPDU14ReadFileRecordResponse dissection
p = ModbusPDU14ReadFileRecordResponse('\x14\x0c\x05\x06\r\xfe\x00 \x05\x063\xcd\x00@')
p.payload.__class__.__name__ == 'ModbusReadFileSubResponse' and p.payload.payload.__class__.__name__ == 'ModbusReadFileSubResponse'
= ModbusPDU14ReadFileRecordError
str(ModbusPDU14ReadFileRecordError()) == '\x94\x01'
# 0x15/0x95 Write File Record -------------------------------------------------------
= ModbusPDU15WriteFileRecordRequest minimal parameters
str(ModbusPDU15WriteFileRecordRequest()/ModbusWriteFileSubRequest(fileNumber=4, recordNumber=07, recordData=[0x06af, 0x04be, 0x100d])) == '\x15\r\x06\x00\x04\x00\x07\x00\x03\x06\xaf\x04\xbe\x10\r'
= ModbusPDU15WriteFileRecordRequest dissection
p = ModbusPDU15WriteFileRecordRequest('\x15\x0d\x06\x00\x04\x00\x07\x00\x03\x06\xaf\x04\xbe\x10\r')
p.payload.__class__.__name__ == 'ModbusWriteFileSubRequest' and p.payload.recordLength == 3
= ModbusPDU15WriteFileRecordResponse minimal parameters
str(ModbusPDU15WriteFileRecordResponse()/ModbusWriteFileSubResponse(fileNumber=4, recordNumber=07, recordData=[0x06af, 0x04be, 0x100d])) == '\x15\r\x06\x00\x04\x00\x07\x00\x03\x06\xaf\x04\xbe\x10\r'
= ModbusPDU15WriteFileRecordResponse dissection
p = ModbusPDU15WriteFileRecordResponse('\x15\x0d\x06\x00\x04\x00\x07\x00\x03\x06\xaf\x04\xbe\x10\r')
p.payload.__class__.__name__ == 'ModbusWriteFileSubResponse' and p.payload.recordLength == 3
= ModbusPDU15WriteFileRecordError
str(ModbusPDU15WriteFileRecordError()) == '\x95\x01'
# 0x16/0x96 Mask Write Register -----------------------------------------------------
= ModbusPDU16MaskWriteRegisterRequest
str(ModbusPDU16MaskWriteRegisterRequest()) == '\x16\x00\x00\xff\xff\x00\x00'
= ModbusPDU16MaskWriteRegisterResponse
str(ModbusPDU16MaskWriteRegisterResponse()) == '\x16\x00\x00\xff\xff\x00\x00'
= ModbusPDU16MaskWriteRegisterError
str(ModbusPDU16MaskWriteRegisterError()) == '\x96\x01'
# 0x17/0x97 Read/Write Multiple Registers -------------------------------------------
= ModbusPDU17ReadWriteMultipleRegistersRequest
str(ModbusPDU17ReadWriteMultipleRegistersRequest()) == '\x17\x00\x00\x00\x01\x00\x00\x00\x01\x02\x00\x00'
= ModbusPDU17ReadWriteMultipleRegistersRequest minimal parameters
str(ModbusPDU17ReadWriteMultipleRegistersRequest(writeRegistersValue=[0x0001, 0x0002])) == '\x17\x00\x00\x00\x01\x00\x00\x00\x02\x04\x00\x01\x00\x02'
= ModbusPDU17ReadWriteMultipleRegistersRequest dissection
p = ModbusPDU17ReadWriteMultipleRegistersRequest('\x17\x00\x00\x00\x01\x00\x00\x00\x02\x04\x00\x01\x00\x02')
p.byteCount == 4 and p.writeQuantityRegisters == 2
= ModbusPDU17ReadWriteMultipleRegistersResponse
str(ModbusPDU17ReadWriteMultipleRegistersResponse()) == '\x17\x02\x00\x00'
= ModbusPDU17ReadWriteMultipleRegistersResponse minimal parameters
str(ModbusPDU17ReadWriteMultipleRegistersResponse(registerVal=[1,2,3])) == '\x17\x06\x00\x01\x00\x02\x00\x03'
= ModbusPDU17ReadWriteMultipleRegistersResponse dissection
str(ModbusPDU17ReadWriteMultipleRegistersResponse('\x17\x02\x00\x01')) == '\x17\x02\x00\x01'
= ModbusPDU17ReadWriteMultipleRegistersError
str(ModbusPDU17ReadWriteMultipleRegistersError()) == '\x97\x01'
# 0x18/0x88 Read FIFO Queue ---------------------------------------------------------
= ModbusPDU18ReadFIFOQueueRequest
str(ModbusPDU18ReadFIFOQueueRequest()) == '\x18\x00\x00'
= ModbusPDU18ReadFIFOQueueResponse
= ModbusPDU18ReadFIFOQueueResponse
str(ModbusPDU18ReadFIFOQueueResponse()) == '\x18\x00\x02\x00\x00'
= ModbusPDU18ReadFIFOQueueResponse minimal parameters
str(ModbusPDU18ReadFIFOQueueResponse(FIFOVal=[0x0001, 0x0002, 0x0003])) == '\x18\x00\x08\x00\x03\x00\x01\x00\x02\x00\x03'
= ModbusPDU18ReadFIFOQueueResponse dissection
p = ModbusPDU18ReadFIFOQueueResponse('\x18\x00\x08\x00\x03\x00\x01\x00\x02\x00\x03')
p.byteCount == 8 and p.FIFOCount == 3
= ModbusPDU18ReadFIFOQueueError
str(ModbusPDU18ReadFIFOQueueError()) == '\x98\x01'
# 0x2b encapsulated Interface Transport ---------------------------------------------
# 0x2b 0xOD CANopen General Reference (out of the main specification) ---------------
# 0x2b 0xOE Read Device Information -------------------------------------------------
= ModbusPDU2B0EReadDeviceIdentificationRequest
str(ModbusPDU2B0EReadDeviceIdentificationRequest()) =='+\x0e\x01\x00'
= ModbusPDU2B0EReadDeviceIdentificationResponse
str(ModbusPDU2B0EReadDeviceIdentificationResponse()) == '+\x0e\x04\x01\x00\x00\x00'
= ModbusPDU2B0EReadDeviceIdentificationResponse dissection
p = ModbusPDU2B0EReadDeviceIdentificationResponse('+\x0e\x01\x83\x00\x00\x03\x00\x08Pymodbus\x01\x02PM\x02\x031.0')
p.payload.payload.payload.id == 2 and p.payload.payload.id == 1 and p.payload.id == 0
= ModbusPDU2B0EReadDeviceIdentificationError
str(ModbusPDU2B0EReadDeviceIdentificationError()) == '\xab\x01'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment