Skip to content
Snippets Groups Projects
Commit 4f0ebca1 authored by Pierre Lalet's avatar Pierre Lalet Committed by GitHub
Browse files

Merge pull request #663 from gpotter2/patch-file-format

[FF] Fix file format in the scapy project
parents 06e73eee 0ef62bfe
No related branches found
No related tags found
No related merge requests found
scapy/__init__.py export-subst
* text=auto
*.bat text eol=crlf
@echo off
REM Use Python to run the UTscapy script from the current directory, passing all parameters
title UTscapy
python "%~dp0\UTscapy" %*
@echo off
REM Use Python to run the UTscapy script from the current directory, passing all parameters
title UTscapy
python "%~dp0\UTscapy" %*
@echo off
REM Use Python to run the Scapy script from the current directory, passing all parameters
title scapy
python "%~dp0\scapy" %*
@echo off
REM Use Python to run the Scapy script from the current directory, passing all parameters
title scapy
python "%~dp0\scapy" %*
@echo off
set PYTHONPATH=%cd%
python -m scapy.__init__
if errorlevel 1 (
PAUSE
)
@echo off
set PYTHONPATH=%cd%
python -m scapy.__init__
if errorlevel 1 (
PAUSE
)
This diff is collapsed.
# MQTT layer unit tests
# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com>
#
# Type the following command to launch start the tests:
# $ test/run_tests -P "load_contrib('mqtt')" -t scapy/contrib/mqtt.uts
+ Syntax check
= Import the MQTT layer
from scapy.contrib.mqtt import *
+ MQTT protocol test
= MQTTPublish, packet instanciation
p = MQTT()/MQTTPublish(topic='test1',value='test2')
assert(p.type == 3)
assert(p.topic == 'test1')
assert(p.value == 'test2')
assert(p.len == None)
assert(p.length == None)
= Fixed header and MQTTPublish, packet dissection
s = b'0\n\x00\x04testtest'
publish = MQTT(s)
assert(publish.type == 3)
assert(publish.QOS == 0)
assert(publish.DUP == 0)
assert(publish.RETAIN == 0)
assert(publish.len == 10)
assert(publish[MQTTPublish].length == 4)
assert(publish[MQTTPublish].topic == 'test')
assert(publish[MQTTPublish].value == 'test')
= MQTTConnect, packet instanciation
c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid')
assert(c.type == 1)
assert(c.clientId == 'newid')
assert(c.clientIdlen == 5)
= MQTTConnect, packet dissection
s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali'
connect = MQTT(s)
assert(connect.length == 6)
assert(connect.protoname == 'MQIsdp')
assert(connect.protolevel == 3)
assert(connect.usernameflag == 0)
assert(connect.passwordflag == 0)
assert(connect.willretainflag == 0)
assert(connect.willQOSflag == 0)
assert(connect.willflag == 0)
assert(connect.cleansess == 1)
assert(connect.reserved == 0)
assert(connect.klive == 60)
assert(connect.clientIdlen == 17)
assert(connect.clientId == 'mosqpub/1440-kali')
=MQTTConnack, packet instanciation
ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0)
assert(ck.type == 2)
assert(ck.sessPresentFlag == 1)
assert(ck.retcode == 0)
= MQTTConnack, packet dissection
s = b' \x02\x00\x00'
connack = MQTT(s)
assert(connack.sessPresentFlag == 0)
assert(connack.retcode == 0)
= MQTTSubscribe, packet instanciation
sb = MQTT()/MQTTSubscribe(msgid=1,topic='newtopic',QOS=0,length=0)
assert(sb.type == 8)
assert(sb.msgid == 1)
assert(sb.topic == 'newtopic')
assert(sb.length == 0)
assert(sb[MQTTSubscribe].QOS == 0)
= MQTTSubscribe, packet dissection
s = b'\x82\t\x00\x01\x00\x04test\x00'
subscribe = MQTT(s)
assert(subscribe.msgid == 1)
assert(subscribe.length == 4)
assert(subscribe.topic == 'test')
assert(subscribe.QOS == 1)
= MQTTSuback, packet instanciation
sk = MQTT()/MQTTSuback(msgid=1, retcode=0)
assert(sk.type == 9)
assert(sk.msgid == 1)
assert(sk.retcode == 0)
= MQTTSuback, packet dissection
s = b'\x90\x03\x00\x01\x00'
suback = MQTT(s)
assert(suback.msgid == 1)
assert(suback.retcode == 0)
= MQTTPubrec, packet instanciation
pc = MQTT()/MQTTPubrec(msgid=1)
assert(pc.type == 5)
assert(pc.msgid == 1)
= MQTTPubrec packet dissection
s = b'P\x02\x00\x01'
pubrec = MQTT(s)
assert(pubrec.msgid == 1)
# MQTT layer unit tests
# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com>
#
# Type the following command to launch start the tests:
# $ test/run_tests -P "load_contrib('mqtt')" -t scapy/contrib/mqtt.uts
+ Syntax check
= Import the MQTT layer
from scapy.contrib.mqtt import *
+ MQTT protocol test
= MQTTPublish, packet instanciation
p = MQTT()/MQTTPublish(topic='test1',value='test2')
assert(p.type == 3)
assert(p.topic == 'test1')
assert(p.value == 'test2')
assert(p.len == None)
assert(p.length == None)
= Fixed header and MQTTPublish, packet dissection
s = b'0\n\x00\x04testtest'
publish = MQTT(s)
assert(publish.type == 3)
assert(publish.QOS == 0)
assert(publish.DUP == 0)
assert(publish.RETAIN == 0)
assert(publish.len == 10)
assert(publish[MQTTPublish].length == 4)
assert(publish[MQTTPublish].topic == 'test')
assert(publish[MQTTPublish].value == 'test')
= MQTTConnect, packet instanciation
c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid')
assert(c.type == 1)
assert(c.clientId == 'newid')
assert(c.clientIdlen == 5)
= MQTTConnect, packet dissection
s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali'
connect = MQTT(s)
assert(connect.length == 6)
assert(connect.protoname == 'MQIsdp')
assert(connect.protolevel == 3)
assert(connect.usernameflag == 0)
assert(connect.passwordflag == 0)
assert(connect.willretainflag == 0)
assert(connect.willQOSflag == 0)
assert(connect.willflag == 0)
assert(connect.cleansess == 1)
assert(connect.reserved == 0)
assert(connect.klive == 60)
assert(connect.clientIdlen == 17)
assert(connect.clientId == 'mosqpub/1440-kali')
=MQTTConnack, packet instanciation
ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0)
assert(ck.type == 2)
assert(ck.sessPresentFlag == 1)
assert(ck.retcode == 0)
= MQTTConnack, packet dissection
s = b' \x02\x00\x00'
connack = MQTT(s)
assert(connack.sessPresentFlag == 0)
assert(connack.retcode == 0)
= MQTTSubscribe, packet instanciation
sb = MQTT()/MQTTSubscribe(msgid=1,topic='newtopic',QOS=0,length=0)
assert(sb.type == 8)
assert(sb.msgid == 1)
assert(sb.topic == 'newtopic')
assert(sb.length == 0)
assert(sb[MQTTSubscribe].QOS == 0)
= MQTTSubscribe, packet dissection
s = b'\x82\t\x00\x01\x00\x04test\x00'
subscribe = MQTT(s)
assert(subscribe.msgid == 1)
assert(subscribe.length == 4)
assert(subscribe.topic == 'test')
assert(subscribe.QOS == 1)
= MQTTSuback, packet instanciation
sk = MQTT()/MQTTSuback(msgid=1, retcode=0)
assert(sk.type == 9)
assert(sk.msgid == 1)
assert(sk.retcode == 0)
= MQTTSuback, packet dissection
s = b'\x90\x03\x00\x01\x00'
suback = MQTT(s)
assert(suback.msgid == 1)
assert(suback.retcode == 0)
= MQTTPubrec, packet instanciation
pc = MQTT()/MQTTPubrec(msgid=1)
assert(pc.type == 5)
assert(pc.msgid == 1)
= MQTTPubrec packet dissection
s = b'P\x02\x00\x01'
pubrec = MQTT(s)
assert(pubrec.msgid == 1)
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# author: <jellch@harris.com>
# scapy.contrib.description = PPI
# scapy.contrib.status = loads
"""
PPI (Per-Packet Information).
"""
import logging,struct
from scapy.config import conf
from scapy.packet import *
from scapy.fields import *
from scapy.layers.l2 import Ether
from scapy.layers.dot11 import Dot11
# Dictionary to map the TLV type to the class name of a sub-packet
_ppi_types = {}
def addPPIType(id, value):
_ppi_types[id] = value
def getPPIType(id, default="default"):
return _ppi_types.get(id, _ppi_types.get(default, None))
# Default PPI Field Header
class PPIGenericFldHdr(Packet):
name = "PPI Field Header"
fields_desc = [ LEShortField('pfh_type', 0),
FieldLenField('pfh_length', None, length_of="value", fmt='<H', adjust=lambda p,x:x+4),
StrLenField("value", "", length_from=lambda p:p.pfh_length) ]
def extract_padding(self, p):
return "",p
def _PPIGuessPayloadClass(p, **kargs):
""" This function tells the PacketListField how it should extract the
TLVs from the payload. We pass cls only the length string
pfh_len says it needs. If a payload is returned, that means
part of the sting was unused. This converts to a Raw layer, and
the remainder of p is added as Raw's payload. If there is no
payload, the remainder of p is added as out's payload.
"""
if len(p) >= 4:
t,pfh_len = struct.unpack("<HH", p[:4])
# Find out if the value t is in the dict _ppi_types.
# If not, return the default TLV class
cls = getPPIType(t, "default")
pfh_len += 4
out = cls(p[:pfh_len], **kargs)
if (out.payload):
out.payload = conf.raw_layer(out.payload.load)
if (len(p) > pfh_len):
out.payload.payload = conf.padding_layer(p[pfh_len:])
elif (len(p) > pfh_len):
out.payload = conf.padding_layer(p[pfh_len:])
else:
out = conf.raw_layer(p, **kargs)
return out
class PPI(Packet):
name = "PPI Packet Header"
fields_desc = [ ByteField('pph_version', 0),
ByteField('pph_flags', 0),
FieldLenField('pph_len', None, length_of="PPIFieldHeaders", fmt="<H", adjust=lambda p,x:x+8 ),
LEIntField('dlt', None),
PacketListField("PPIFieldHeaders", [], _PPIGuessPayloadClass, length_from=lambda p:p.pph_len-8,) ]
def guess_payload_class(self,payload):
return conf.l2types.get(self.dlt, Packet.guess_payload_class(self, payload))
#Register PPI
addPPIType("default", PPIGenericFldHdr)
conf.l2types.register(192, PPI)
conf.l2types.register_num2layer(192, PPI)
bind_layers(PPI, Dot11, dlt=conf.l2types.get(Dot11))
bind_layers(Dot11, PPI)
bind_layers(PPI, Ether, dlt=conf.l2types.get(Ether))
bind_layers(Dot11, Ether)
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# author: <jellch@harris.com>
# scapy.contrib.description = PPI
# scapy.contrib.status = loads
"""
PPI (Per-Packet Information).
"""
import logging,struct
from scapy.config import conf
from scapy.packet import *
from scapy.fields import *
from scapy.layers.l2 import Ether
from scapy.layers.dot11 import Dot11
# Dictionary to map the TLV type to the class name of a sub-packet
_ppi_types = {}
def addPPIType(id, value):
_ppi_types[id] = value
def getPPIType(id, default="default"):
return _ppi_types.get(id, _ppi_types.get(default, None))
# Default PPI Field Header
class PPIGenericFldHdr(Packet):
name = "PPI Field Header"
fields_desc = [ LEShortField('pfh_type', 0),
FieldLenField('pfh_length', None, length_of="value", fmt='<H', adjust=lambda p,x:x+4),
StrLenField("value", "", length_from=lambda p:p.pfh_length) ]
def extract_padding(self, p):
return "",p
def _PPIGuessPayloadClass(p, **kargs):
""" This function tells the PacketListField how it should extract the
TLVs from the payload. We pass cls only the length string
pfh_len says it needs. If a payload is returned, that means
part of the sting was unused. This converts to a Raw layer, and
the remainder of p is added as Raw's payload. If there is no
payload, the remainder of p is added as out's payload.
"""
if len(p) >= 4:
t,pfh_len = struct.unpack("<HH", p[:4])
# Find out if the value t is in the dict _ppi_types.
# If not, return the default TLV class
cls = getPPIType(t, "default")
pfh_len += 4
out = cls(p[:pfh_len], **kargs)
if (out.payload):
out.payload = conf.raw_layer(out.payload.load)
if (len(p) > pfh_len):
out.payload.payload = conf.padding_layer(p[pfh_len:])
elif (len(p) > pfh_len):
out.payload = conf.padding_layer(p[pfh_len:])
else:
out = conf.raw_layer(p, **kargs)
return out
class PPI(Packet):
name = "PPI Packet Header"
fields_desc = [ ByteField('pph_version', 0),
ByteField('pph_flags', 0),
FieldLenField('pph_len', None, length_of="PPIFieldHeaders", fmt="<H", adjust=lambda p,x:x+8 ),
LEIntField('dlt', None),
PacketListField("PPIFieldHeaders", [], _PPIGuessPayloadClass, length_from=lambda p:p.pph_len-8,) ]
def guess_payload_class(self,payload):
return conf.l2types.get(self.dlt, Packet.guess_payload_class(self, payload))
#Register PPI
addPPIType("default", PPIGenericFldHdr)
conf.l2types.register(192, PPI)
conf.l2types.register_num2layer(192, PPI)
bind_layers(PPI, Dot11, dlt=conf.l2types.get(Dot11))
bind_layers(Dot11, PPI)
bind_layers(PPI, Ether, dlt=conf.l2types.get(Ether))
bind_layers(Dot11, Ether)
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# author: <jellch@harris.com>
# scapy.contrib.description = PPI CACE
# scapy.contrib.status = loads
"""
CACE PPI types
"""
import logging,struct
from scapy.config import conf
from scapy.packet import *
from scapy.fields import *
from scapy.layers.l2 import Ether
from scapy.layers.dot11 import Dot11
from scapy.contrib.ppi import *
PPI_DOT11COMMON = 2
PPI_DOT11NMAC = 3
PPI_DOT11NMACPHY = 4
PPI_SPECTRUMMAP = 5
PPI_PROCESSINFO = 6
PPI_CAPTUREINFO = 7
PPI_AGGREGATION = 8
PPI_DOT3 = 9
# PPI 802.11 Common Field Header Fields
class dBmByteField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "b")
def i2repr(self, pkt, val):
if (val != None):
val = "%4d dBm" % val
return val
class PPITSFTField(LELongField):
def i2h(self, pkt, val):
flags = 0
if (pkt):
flags = pkt.getfieldval("Pkt_Flags")
if not flags:
flags = 0
if (flags & 0x02):
scale = 1e-3
else:
scale = 1e-6
tout = scale * float(val)
return tout
def h2i(self, pkt, val):
scale = 1e6
if pkt:
flags = pkt.getfieldval("Pkt_Flags")
if flags:
if (flags & 0x02):
scale = 1e3
tout = int((scale * val) + 0.5)
return tout
_PPIDot11CommonChFlags = ['','','','','Turbo','CCK','OFDM','2GHz','5GHz',
'PassiveOnly','Dynamic CCK-OFDM','GSFK']
_PPIDot11CommonPktFlags = ['FCS','TSFT_ms','FCS_Invalid','PHY_Error']
# PPI 802.11 Common Field Header
class Dot11Common(Packet):
name = "PPI 802.11-Common"
fields_desc = [ LEShortField('pfh_type',PPI_DOT11COMMON),
LEShortField('pfh_length', 20),
PPITSFTField('TSF_Timer', 0),
FlagsField('Pkt_Flags',0, -16, _PPIDot11CommonPktFlags),
LEShortField('Rate',0),
LEShortField('Ch_Freq',0),
FlagsField('Ch_Flags', 0, -16, _PPIDot11CommonChFlags),
ByteField('FHSS_Hop',0),
ByteField('FHSS_Pat',0),
dBmByteField('Antsignal',-128),
dBmByteField('Antnoise',-128)]
def extract_padding(self, p):
return "",p
#Hopefully other CACE defined types will be added here.
#Add the dot11common layer to the PPI array
addPPIType(PPI_DOT11COMMON, Dot11Common)
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# author: <jellch@harris.com>
# scapy.contrib.description = PPI CACE
# scapy.contrib.status = loads
"""
CACE PPI types
"""
import logging,struct
from scapy.config import conf
from scapy.packet import *
from scapy.fields import *
from scapy.layers.l2 import Ether
from scapy.layers.dot11 import Dot11
from scapy.contrib.ppi import *
PPI_DOT11COMMON = 2
PPI_DOT11NMAC = 3
PPI_DOT11NMACPHY = 4
PPI_SPECTRUMMAP = 5
PPI_PROCESSINFO = 6
PPI_CAPTUREINFO = 7
PPI_AGGREGATION = 8
PPI_DOT3 = 9
# PPI 802.11 Common Field Header Fields
class dBmByteField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "b")
def i2repr(self, pkt, val):
if (val != None):
val = "%4d dBm" % val
return val
class PPITSFTField(LELongField):
def i2h(self, pkt, val):
flags = 0
if (pkt):
flags = pkt.getfieldval("Pkt_Flags")
if not flags:
flags = 0
if (flags & 0x02):
scale = 1e-3
else:
scale = 1e-6
tout = scale * float(val)
return tout
def h2i(self, pkt, val):
scale = 1e6
if pkt:
flags = pkt.getfieldval("Pkt_Flags")
if flags:
if (flags & 0x02):
scale = 1e3
tout = int((scale * val) + 0.5)
return tout
_PPIDot11CommonChFlags = ['','','','','Turbo','CCK','OFDM','2GHz','5GHz',
'PassiveOnly','Dynamic CCK-OFDM','GSFK']
_PPIDot11CommonPktFlags = ['FCS','TSFT_ms','FCS_Invalid','PHY_Error']
# PPI 802.11 Common Field Header
class Dot11Common(Packet):
name = "PPI 802.11-Common"
fields_desc = [ LEShortField('pfh_type',PPI_DOT11COMMON),
LEShortField('pfh_length', 20),
PPITSFTField('TSF_Timer', 0),
FlagsField('Pkt_Flags',0, -16, _PPIDot11CommonPktFlags),
LEShortField('Rate',0),
LEShortField('Ch_Freq',0),
FlagsField('Ch_Flags', 0, -16, _PPIDot11CommonChFlags),
ByteField('FHSS_Hop',0),
ByteField('FHSS_Pat',0),
dBmByteField('Antsignal',-128),
dBmByteField('Antnoise',-128)]
def extract_padding(self, p):
return "",p
#Hopefully other CACE defined types will be added here.
#Add the dot11common layer to the PPI array
addPPIType(PPI_DOT11COMMON, Dot11Common)
This diff is collapsed.
## RSVP layer
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# scapy.contrib.description = RSVP
# scapy.contrib.status = loads
from scapy.packet import *
from scapy.fields import *
from scapy.layers.inet import IP
rsvpmsgtypes = { 0x01 : "Path",
0x02 : "Reservation request",
0x03 : "Path error",
0x04 : "Reservation request error",
0x05 : "Path teardown",
0x06 : "Reservation teardown",
0x07 : "Reservation request acknowledgment"
}
class RSVP(Packet):
name = "RSVP"
fields_desc = [ BitField("Version",1,4),
BitField("Flags",1,4),
ByteEnumField("Class",0x01, rsvpmsgtypes),
XShortField("chksum", None),
ByteField("TTL",1),
XByteField("dataofs", 0),
ShortField("Length",None)]
def post_build(self, p, pay):
p += pay
if self.Length is None:
l = len(p)
p = p[:6]+chr((l>>8)&0xff)+chr(l&0xff)+p[8:]
if self.chksum is None:
ck = checksum(p)
p = p[:2]+chr(ck>>8)+chr(ck&0xff)+p[4:]
return p
rsvptypes = { 0x01 : "Session",
0x03 : "HOP",
0x04 : "INTEGRITY",
0x05 : "TIME_VALUES",
0x06 : "ERROR_SPEC",
0x07 : "SCOPE",
0x08 : "STYLE",
0x09 : "FLOWSPEC",
0x0A : "FILTER_SPEC",
0x0B : "SENDER_TEMPLATE",
0x0C : "SENDER_TSPEC",
0x0D : "ADSPEC",
0x0E : "POLICY_DATA",
0x0F : "RESV_CONFIRM",
0x10 : "RSVP_LABEL",
0x11 : "HOP_COUNT",
0x12 : "STRICT_SOURCE_ROUTE",
0x13 : "LABEL_REQUEST",
0x14 : "EXPLICIT_ROUTE",
0x15 : "ROUTE_RECORD",
0x16 : "HELLO",
0x17 : "MESSAGE_ID",
0x18 : "MESSAGE_ID_ACK",
0x19 : "MESSAGE_ID_LIST",
0x1E : "DIAGNOSTIC",
0x1F : "ROUTE",
0x20 : "DIAG_RESPONSE",
0x21 : "DIAG_SELECT",
0x22 : "RECOVERY_LABEL",
0x23 : "UPSTREAM_LABEL",
0x24 : "LABEL_SET",
0x25 : "PROTECTION",
0x26 : "PRIMARY PATH ROUTE",
0x2A : "DSBM IP ADDRESS",
0x2B : "SBM_PRIORITY",
0x2C : "DSBM TIMER INTERVALS",
0x2D : "SBM_INFO",
0x32 : "S2L_SUB_LSP",
0x3F : "DETOUR",
0x40 : "CHALLENGE",
0x41 : "DIFF-SERV",
0x42 : "CLASSTYPE",
0x43 : "LSP_REQUIRED_ATTRIBUTES",
0x80 : "NODE_CHAR",
0x81 : "SUGGESTED_LABEL",
0x82 : "ACCEPTABLE_LABEL_SET",
0x83 : "RESTART_CA",
0x84 : "SESSION-OF-INTEREST",
0x85 : "LINK_CAPABILITY",
0x86 : "Capability Object",
0xA1 : "RSVP_HOP_L2",
0xA2 : "LAN_NHOP_L2",
0xA3 : "LAN_NHOP_L3",
0xA4 : "LAN_LOOPBACK",
0xA5 : "TCLASS",
0xC0 : "TUNNEL",
0xC1 : "LSP_TUNNEL_INTERFACE_ID",
0xC2 : "USER_ERROR_SPEC",
0xC3 : "NOTIFY_REQUEST",
0xC4 : "ADMIN-STATUS",
0xC5 : "LSP_ATTRIBUTES",
0xC6 : "ALARM_SPEC",
0xC7 : "ASSOCIATION",
0xC8 : "SECONDARY_EXPLICIT_ROUTE",
0xC9 : "SECONDARY_RECORD_ROUTE",
0xCD : "FAST_REROUTE",
0xCF : "SESSION_ATTRIBUTE",
0xE1 : "DCLASS",
0xE2 : "PACKETCABLE EXTENSIONS",
0xE3 : "ATM_SERVICECLASS",
0xE4 : "CALL_OPS (ASON)",
0xE5 : "GENERALIZED_UNI",
0xE6 : "CALL_ID",
0xE7 : "3GPP2_Object",
0xE8 : "EXCLUDE_ROUTE"
}
class RSVP_Object(Packet):
name = "RSVP_Object"
fields_desc = [ ShortField("Length",4),
ByteEnumField("Class",0x01, rsvptypes),
ByteField("C-Type",1)]
def guess_payload_class(self, payload):
if self.Class == 0x03:
return RSVP_HOP
elif self.Class == 0x05:
return RSVP_Time
elif self.Class == 0x0c:
return RSVP_SenderTSPEC
elif self.Class == 0x13:
return RSVP_LabelReq
elif self.Class == 0xCF:
return RSVP_SessionAttrb
else:
return RSVP_Data
class RSVP_Data(Packet):
name = "Data"
fields_desc = [StrLenField("Data","",length_from= lambda pkt:pkt.underlayer.Length - 4)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_HOP(Packet):
name = "HOP"
fields_desc = [ IPField("neighbor","0.0.0.0"),
BitField("inface",1,32)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_Time(Packet):
name = "Time Val"
fields_desc = [ BitField("refresh",1,32)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_SenderTSPEC(Packet):
name = "Sender_TSPEC"
fields_desc = [ ByteField("Msg_Format",0),
ByteField("reserve",0),
ShortField("Data_Length",4),
ByteField("Srv_hdr",1),
ByteField("reserve2",0),
ShortField("Srv_Length",4),
StrLenField("Tokens","",length_from= lambda pkt:pkt.underlayer.Length - 12) ]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_LabelReq(Packet):
name = "Lable Req"
fields_desc = [ ShortField("reserve",1),
ShortField("L3PID",1)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_SessionAttrb(Packet):
name = "Session_Attribute"
fields_desc = [ ByteField("Setup_priority",1),
ByteField("Hold_priority",1),
ByteField("flags",1),
ByteField("Name_length",1),
StrLenField("Name","",length_from= lambda pkt:pkt.underlayer.Length - 8),
]
def default_payload_class(self, payload):
return RSVP_Object
bind_layers( IP, RSVP, { "proto" : 46} )
bind_layers( RSVP, RSVP_Object, {})
## RSVP layer
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# scapy.contrib.description = RSVP
# scapy.contrib.status = loads
from scapy.packet import *
from scapy.fields import *
from scapy.layers.inet import IP
rsvpmsgtypes = { 0x01 : "Path",
0x02 : "Reservation request",
0x03 : "Path error",
0x04 : "Reservation request error",
0x05 : "Path teardown",
0x06 : "Reservation teardown",
0x07 : "Reservation request acknowledgment"
}
class RSVP(Packet):
name = "RSVP"
fields_desc = [ BitField("Version",1,4),
BitField("Flags",1,4),
ByteEnumField("Class",0x01, rsvpmsgtypes),
XShortField("chksum", None),
ByteField("TTL",1),
XByteField("dataofs", 0),
ShortField("Length",None)]
def post_build(self, p, pay):
p += pay
if self.Length is None:
l = len(p)
p = p[:6]+chr((l>>8)&0xff)+chr(l&0xff)+p[8:]
if self.chksum is None:
ck = checksum(p)
p = p[:2]+chr(ck>>8)+chr(ck&0xff)+p[4:]
return p
rsvptypes = { 0x01 : "Session",
0x03 : "HOP",
0x04 : "INTEGRITY",
0x05 : "TIME_VALUES",
0x06 : "ERROR_SPEC",
0x07 : "SCOPE",
0x08 : "STYLE",
0x09 : "FLOWSPEC",
0x0A : "FILTER_SPEC",
0x0B : "SENDER_TEMPLATE",
0x0C : "SENDER_TSPEC",
0x0D : "ADSPEC",
0x0E : "POLICY_DATA",
0x0F : "RESV_CONFIRM",
0x10 : "RSVP_LABEL",
0x11 : "HOP_COUNT",
0x12 : "STRICT_SOURCE_ROUTE",
0x13 : "LABEL_REQUEST",
0x14 : "EXPLICIT_ROUTE",
0x15 : "ROUTE_RECORD",
0x16 : "HELLO",
0x17 : "MESSAGE_ID",
0x18 : "MESSAGE_ID_ACK",
0x19 : "MESSAGE_ID_LIST",
0x1E : "DIAGNOSTIC",
0x1F : "ROUTE",
0x20 : "DIAG_RESPONSE",
0x21 : "DIAG_SELECT",
0x22 : "RECOVERY_LABEL",
0x23 : "UPSTREAM_LABEL",
0x24 : "LABEL_SET",
0x25 : "PROTECTION",
0x26 : "PRIMARY PATH ROUTE",
0x2A : "DSBM IP ADDRESS",
0x2B : "SBM_PRIORITY",
0x2C : "DSBM TIMER INTERVALS",
0x2D : "SBM_INFO",
0x32 : "S2L_SUB_LSP",
0x3F : "DETOUR",
0x40 : "CHALLENGE",
0x41 : "DIFF-SERV",
0x42 : "CLASSTYPE",
0x43 : "LSP_REQUIRED_ATTRIBUTES",
0x80 : "NODE_CHAR",
0x81 : "SUGGESTED_LABEL",
0x82 : "ACCEPTABLE_LABEL_SET",
0x83 : "RESTART_CA",
0x84 : "SESSION-OF-INTEREST",
0x85 : "LINK_CAPABILITY",
0x86 : "Capability Object",
0xA1 : "RSVP_HOP_L2",
0xA2 : "LAN_NHOP_L2",
0xA3 : "LAN_NHOP_L3",
0xA4 : "LAN_LOOPBACK",
0xA5 : "TCLASS",
0xC0 : "TUNNEL",
0xC1 : "LSP_TUNNEL_INTERFACE_ID",
0xC2 : "USER_ERROR_SPEC",
0xC3 : "NOTIFY_REQUEST",
0xC4 : "ADMIN-STATUS",
0xC5 : "LSP_ATTRIBUTES",
0xC6 : "ALARM_SPEC",
0xC7 : "ASSOCIATION",
0xC8 : "SECONDARY_EXPLICIT_ROUTE",
0xC9 : "SECONDARY_RECORD_ROUTE",
0xCD : "FAST_REROUTE",
0xCF : "SESSION_ATTRIBUTE",
0xE1 : "DCLASS",
0xE2 : "PACKETCABLE EXTENSIONS",
0xE3 : "ATM_SERVICECLASS",
0xE4 : "CALL_OPS (ASON)",
0xE5 : "GENERALIZED_UNI",
0xE6 : "CALL_ID",
0xE7 : "3GPP2_Object",
0xE8 : "EXCLUDE_ROUTE"
}
class RSVP_Object(Packet):
name = "RSVP_Object"
fields_desc = [ ShortField("Length",4),
ByteEnumField("Class",0x01, rsvptypes),
ByteField("C-Type",1)]
def guess_payload_class(self, payload):
if self.Class == 0x03:
return RSVP_HOP
elif self.Class == 0x05:
return RSVP_Time
elif self.Class == 0x0c:
return RSVP_SenderTSPEC
elif self.Class == 0x13:
return RSVP_LabelReq
elif self.Class == 0xCF:
return RSVP_SessionAttrb
else:
return RSVP_Data
class RSVP_Data(Packet):
name = "Data"
fields_desc = [StrLenField("Data","",length_from= lambda pkt:pkt.underlayer.Length - 4)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_HOP(Packet):
name = "HOP"
fields_desc = [ IPField("neighbor","0.0.0.0"),
BitField("inface",1,32)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_Time(Packet):
name = "Time Val"
fields_desc = [ BitField("refresh",1,32)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_SenderTSPEC(Packet):
name = "Sender_TSPEC"
fields_desc = [ ByteField("Msg_Format",0),
ByteField("reserve",0),
ShortField("Data_Length",4),
ByteField("Srv_hdr",1),
ByteField("reserve2",0),
ShortField("Srv_Length",4),
StrLenField("Tokens","",length_from= lambda pkt:pkt.underlayer.Length - 12) ]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_LabelReq(Packet):
name = "Lable Req"
fields_desc = [ ShortField("reserve",1),
ShortField("L3PID",1)]
def default_payload_class(self, payload):
return RSVP_Object
class RSVP_SessionAttrb(Packet):
name = "Session_Attribute"
fields_desc = [ ByteField("Setup_priority",1),
ByteField("Hold_priority",1),
ByteField("flags",1),
ByteField("Name_length",1),
StrLenField("Name","",length_from= lambda pkt:pkt.underlayer.Length - 8),
]
def default_payload_class(self, payload):
return RSVP_Object
bind_layers( IP, RSVP, { "proto" : 46} )
bind_layers( RSVP, RSVP_Object, {})
@echo off
title UTscapy - All tests
set MYDIR=%cd%\..
set PYTHONPATH=%MYDIR%
if [%1]==[] (
python "%MYDIR%\scapy\tools\UTscapy.py" -c configs\\windows2.utsc -T bpf.uts -o scapy_regression_test_%date:~6,4%_%date:~3,2%_%date:~0,2%.html
) else (
python "%MYDIR%\scapy\tools\UTscapy.py" %@
)
PAUSE
@echo off
title UTscapy - All tests
set MYDIR=%cd%\..
set PYTHONPATH=%MYDIR%
if [%1]==[] (
python "%MYDIR%\scapy\tools\UTscapy.py" -c configs\\windows2.utsc -T bpf.uts -o scapy_regression_test_%date:~6,4%_%date:~3,2%_%date:~0,2%.html
) else (
python "%MYDIR%\scapy\tools\UTscapy.py" %@
)
PAUSE
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment