Skip to content
Snippets Groups Projects
Commit 3e99428d authored by Pierre Lalet's avatar Pierre Lalet Committed by GitHub
Browse files

Merge pull request #793 from gpotter2/patch-igmp

[coverage] Full rework of IGMP(v1/2/3) + tests
parents 0f262efd 967a1c5f
No related branches found
No related tags found
No related merge requests found
......@@ -17,172 +17,136 @@
# scapy.contrib.description = IGMP/IGMPv2
# scapy.contrib.status = loads
# TODO: scapy 2 has function getmacbyip, maybe it can replace igmpize
# at least from the MAC layer
from __future__ import print_function
from scapy.packet import *
from scapy.fields import *
from scapy.layers.inet import *
from scapy.layers.l2 import DestMACField, getmacbyip
from scapy.error import warning
#--------------------------------------------------------------------------
def isValidMCAddr(ip):
"""convert dotted quad string to long and check the first octet"""
FirstOct=atol(ip)>>24 & 0xFF
return (FirstOct >= 224) and (FirstOct <= 239)
#--------------------------------------------------------------------------
"""convert dotted quad string to long and check the first octet"""
FirstOct=atol(ip)>>24 & 0xFF
return (FirstOct >= 224) and (FirstOct <= 239)
class IGMP(Packet):
"""IGMP Message Class for v1 and v2.
"""IGMP Message Class for v1 and v2.
This class is derived from class Packet. You need to "igmpize"
the IP and Ethernet layers before a full packet is sent.
This class is derived from class Packet. You need call "igmpize()"
so the packet is transformed according the RFC when sent.
a=Ether(src="00:01:02:03:04:05")
b=IP(src="1.2.3.4")
c=IGMP(type=0x12, gaddr="224.2.3.4")
c.igmpize(b, a)
print "Joining IP " + c.gaddr + " MAC " + a.dst
x = a/b/c
x[IGMP].igmpize()
sendp(a/b/c, iface="en0")
Parameters:
type IGMP type field, 0x11, 0x12, 0x16 or 0x17
mrtime Maximum Response time (zero for v1)
mrcode Maximum Response time (zero for v1)
gaddr Multicast Group Address 224.x.x.x/4
See RFC2236, Section 2. Introduction for definitions of proper
IGMPv2 message format http://www.faqs.org/rfcs/rfc2236.html
"""
name = "IGMP"
name = "IGMP"
igmptypes = { 0x11 : "Group Membership Query",
0x12 : "Version 1 - Membership Report",
0x16 : "Version 2 - Membership Report",
0x17 : "Leave Group"}
igmptypes = { 0x11 : "Group Membership Query",
0x12 : "Version 1 - Membership Report",
0x16 : "Version 2 - Membership Report",
0x17 : "Leave Group"}
fields_desc = [ ByteEnumField("type", 0x11, igmptypes),
ByteField("mrtime",20),
fields_desc = [ ByteEnumField("type", 0x11, igmptypes),
ByteField("mrcode", 20),
XShortField("chksum", None),
IPField("gaddr", "0.0.0.0")]
#--------------------------------------------------------------------------
def post_build(self, p, pay):
"""Called implicitly before a packet is sent to compute and place IGMP checksum.
Parameters:
self The instantiation of an IGMP class
p The IGMP message in hex in network byte order
pay Additional payload for the IGMP message
"""
p += pay
if self.chksum is None:
ck = checksum(p)
p = p[:2]+chr(ck>>8)+chr(ck&0xff)+p[4:]
return p
#--------------------------------------------------------------------------
def mysummary(self):
"""Display a summary of the IGMP object."""
if isinstance(self.underlayer, IP):
return self.underlayer.sprintf("IGMP: %IP.src% > %IP.dst% %IGMP.type% %IGMP.gaddr%")
else:
return self.sprintf("IGMP %IGMP.type% %IGMP.gaddr%")
#--------------------------------------------------------------------------
def igmpize(self, ip=None, ether=None):
"""Called to explicitly fixup associated IP and Ethernet headers
Parameters:
self The instantiation of an IGMP class.
ip The instantiation of the associated IP class.
ether The instantiation of the associated Ethernet.
Returns:
True The tuple ether/ip/self passed all check and represents
a proper IGMP packet.
False One of more validation checks failed and no fields
were adjusted.
The function will examine the IGMP message to assure proper format.
Corrections will be attempted if possible. The IP header is then properly
adjusted to ensure correct formatting and assignment. The Ethernet header
is then adjusted to the proper IGMP packet format.
"""
# The rules are:
# 1. the Max Response time is meaningful only in Membership Queries and should be zero
# otherwise (RFC 2236, section 2.2)
if (self.type != 0x11): #rule 1
self.mrtime = 0
if (self.adjust_ip(ip) == True):
if (self.adjust_ether(ip, ether) == True): return True
return False
#--------------------------------------------------------------------------
def adjust_ether (self, ip=None, ether=None):
"""Called to explicitly fixup an associated Ethernet header
The function adjusts the ethernet header destination MAC address based on
the destination IP address.
"""
# The rules are:
# 1. send to the group mac address address corresponding to the IP.dst
if ip != None and ip.haslayer(IP) and ether != None and ether.haslayer(Ether):
iplong = atol(ip.dst)
ether.dst = "01:00:5e:%02x:%02x:%02x" % ( (iplong>>16)&0x7F, (iplong>>8)&0xFF, (iplong)&0xFF )
# print "igmpize ip " + ip.dst + " as mac " + ether.dst
return True
else:
return False
#--------------------------------------------------------------------------
def adjust_ip (self, ip=None):
"""Called to explicitly fixup an associated IP header
The function adjusts the IP header based on conformance rules
and the group address encoded in the IGMP message.
The rules are:
1. Send General Group Query to 224.0.0.1 (all systems)
2. Send Leave Group to 224.0.0.2 (all routers)
3a.Otherwise send the packet to the group address
3b.Send reports/joins to the group address
4. ttl = 1 (RFC 2236, section 2)
5. send the packet with the router alert IP option (RFC 2236, section 2)
"""
if ip != None and ip.haslayer(IP):
if (self.type == 0x11):
if (self.gaddr == "0.0.0.0"):
ip.dst = "224.0.0.1" # IP rule 1
retCode = True
elif isValidMCAddr(self.gaddr):
ip.dst = self.gaddr # IP rule 3a
retCode = True
IPField("gaddr", "0.0.0.0")]
def post_build(self, p, pay):
"""Called implicitly before a packet is sent to compute and place IGMP checksum.
Parameters:
self The instantiation of an IGMP class
p The IGMP message in hex in network byte order
pay Additional payload for the IGMP message
"""
p += pay
if self.chksum is None:
ck = checksum(p)
p = p[:2]+chr(ck>>8)+chr(ck&0xff)+p[4:]
return p
@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _pkt and len(_pkt) >= 4:
if ord(_pkt[0]) in [0x22, 0x30, 0x31, 0x32]:
return IGMPv3
if ord(_pkt[0]) == 0x11 and len(_pkt) >= 12:
return IGMPv3
return IGMP
def igmpize(self):
"""Called to explicitly fixup the packet according to the IGMP RFC
The rules are:
General:
1. the Max Response time is meaningful only in Membership Queries and should be zero
IP:
1. Send General Group Query to 224.0.0.1 (all systems)
2. Send Leave Group to 224.0.0.2 (all routers)
3a.Otherwise send the packet to the group address
3b.Send reports/joins to the group address
4. ttl = 1 (RFC 2236, section 2)
5. send the packet with the router alert IP option (RFC 2236, section 2)
Ether:
1. Recalculate destination
Returns:
True The tuple ether/ip/self passed all check and represents
a proper IGMP packet.
False One of more validation checks failed and no fields
were adjusted.
The function will examine the IGMP message to assure proper format.
Corrections will be attempted if possible. The IP header is then properly
adjusted to ensure correct formatting and assignment. The Ethernet header
is then adjusted to the proper IGMP packet format.
"""
gaddr = self.gaddr if self.gaddr else "0.0.0.0"
underlayer = self.underlayer
if not self.type in [0x11, 0x30]: # General Rule 1
self.mrcode = 0
if isinstance(underlayer, IP):
if (self.type == 0x11):
if (gaddr == "0.0.0.0"):
underlayer.dst = "224.0.0.1" # IP rule 1
elif isValidMCAddr(gaddr):
underlayer.dst = gaddr # IP rule 3a
else:
warning("Invalid IGMP Group Address detected !")
return False
elif ((self.type == 0x17) and isValidMCAddr(gaddr)):
underlayer.dst = "224.0.0.2" # IP rule 2
elif ((self.type == 0x12) or (self.type == 0x16)) and (isValidMCAddr(gaddr)):
underlayer.dst = gaddr # IP rule 3b
else:
warning("Invalid IGMP Type detected !")
return False
if not any(isinstance(x, IPOption_Router_Alert) for x in underlayer.options):
underlayer.options.append(IPOption_Router_Alert())
_root = self.firstlayer()
if _root.haslayer(Ether):
# Force recalculate Ether dst
_root[Ether].dst = getmacbyip(underlayer.dst) # Ether rule 1
return True
def mysummary(self):
"""Display a summary of the IGMP object."""
if isinstance(self.underlayer, IP):
return self.underlayer.sprintf("IGMP: %IP.src% > %IP.dst% %IGMP.type% %IGMP.gaddr%")
else:
print("Warning: Using invalid Group Address")
retCode = False
elif ((self.type == 0x17) and isValidMCAddr(self.gaddr)):
ip.dst = "224.0.0.2" # IP rule 2
retCode = True
elif ((self.type == 0x12) or (self.type == 0x16)) and (isValidMCAddr(self.gaddr)):
ip.dst = self.gaddr # IP rule 3b
retCode = True
else:
print("Warning: Using invalid IGMP Type")
retCode = False
else:
print("Warning: No IGMP Group Address set")
retCode = False
if retCode == True:
ip.ttl=1 # IP Rule 4
ip.options=[IPOption_Router_Alert()] # IP rule 5
return retCode
bind_layers( IP, IGMP, frag=0, proto=2)
return self.sprintf("IGMP %IGMP.type% %IGMP.gaddr%")
bind_layers( IP, IGMP, frag=0,
proto=2,
ttl=1)
############
% IGMP tests
############
+ Basic IGMP tests
= Build IGMP - Basic
a=Ether(src="00:01:02:03:04:05")
b=IP(src="1.2.3.4")
c=IGMP(gaddr="0.0.0.0")
x = a/b/c
x[IGMP].igmpize()
assert x.mrcode == 20
assert x[IP].dst == "224.0.0.1"
= Build IGMP - Custom membership
a=Ether(src="00:01:02:03:04:05")
b=IP(src="1.2.3.4")
c=IGMP(gaddr="224.0.1.2")
x = a/b/c
x[IGMP].igmpize()
assert x.mrcode == 20
assert x[IP].dst == "224.0.1.2"
= Build IGMP - LG
a=Ether(src="00:01:02:03:04:05")
b=IP(src="1.2.3.4")
c=IGMP(type=0x17, gaddr="224.2.3.4")
x = a/b/c
x[IGMP].igmpize()
assert x.dst == "01:00:5e:00:00:02"
assert x.mrcode == 0
assert x[IP].dst == "224.0.0.2"
= Change IGMP params
x = Ether(src="00:01:02:03:04:05")/IP()/IGMP()
x[IGMP].igmpize()
assert x.mrcode == 20
assert x[IP].dst == "224.0.0.1"
x = Ether(src="00:01:02:03:04:05")/IP()/IGMP(gaddr="224.2.3.4", type=0x12)
x.mrcode = 1
x[IGMP].igmpize()
x = Ether(str(x))
assert x.mrcode == 0
x.gaddr = "224.3.2.4"
x[IGMP].igmpize()
assert x.dst == "01:00:5e:03:02:04"
= Test mysummary
x = Ether(src="00:01:02:03:04:05")/IP(src="192.168.0.1")/IGMP(gaddr="224.0.0.2", type=0x17)
x[IGMP].igmpize()
assert x[IGMP].mysummary() == "IGMP: 192.168.0.1 > 224.0.0.2 Leave Group 224.0.0.2"
assert IGMP().mysummary() == "IGMP Group Membership Query 0.0.0.0"
= IGMP - misc
~ netaccess
x = Ether(src="00:01:02:03:04:05")/IP(dst="192.168.0.1")/IGMP(gaddr="www.google.fr", type=0x11)
x = Ether(str(x))
assert not x[IGMP].igmpize()
assert x[IP].dst == "192.168.0.1"
x = Ether(src="00:01:02:03:04:05")/IP(dst="192.168.0.1")/IGMP(gaddr="124.0.2.1", type=0x00)
assert not x[IGMP].igmpize()
assert x[IP].dst == "192.168.0.1"
\ No newline at end of file
......@@ -21,7 +21,7 @@ from __future__ import print_function
from scapy.packet import *
from scapy.fields import *
from scapy.layers.inet import *
from scapy.contrib.igmp import isValidMCAddr
from scapy.contrib.igmp import IGMP
""" Based on the following references
http://www.iana.org/assignments/igmp-type-numbers
......@@ -29,10 +29,6 @@ from scapy.contrib.igmp import isValidMCAddr
"""
# TODO: Merge IGMPv3 packet Bindlayers correct for
# membership source/Group records
# ConditionalField parameters for IGMPv3 commented out
#
# See RFC3376, Section 4. Message Formats for definitions of proper IGMPv3 message format
# http://www.faqs.org/rfcs/rfc3376.html
#
......@@ -40,245 +36,133 @@ from scapy.contrib.igmp import isValidMCAddr
# http://www.faqs.org/rfcs/rfc4286.html
#
#import sys, socket, struct, time
print("IGMPv3 is still under development - Nov 2010")
class IGMPv3(IGMP):
"""IGMP Message Class for v3.
This class is derived from class Packet.
The fields defined below are a
direct interpretation of the v3 Membership Query Message.
Fields 'type' through 'qqic' are directly assignable.
For 'numsrc', do not assign a value.
Instead add to the 'srcaddrs' list to auto-set 'numsrc'. To
assign values to 'srcaddrs', use the following methods:
c = IGMPv3()
c.srcaddrs = ['1.2.3.4', '5.6.7.8']
c.srcaddrs += ['192.168.10.24']
At this point, 'c.numsrc' is three (3)
class IGMPv3gr(Packet):
"""IGMP Group Record for IGMPv3 Membership Report
'chksum' is automagically calculated before the packet is sent.
This class is derived from class Packet and should be concatenated to an
instantiation of class IGMPv3. Within the IGMPv3 instantiation, the numgrp
element will need to be manipulated to indicate the proper number of
group records.
"""
name = "IGMPv3gr"
igmpv3grtypes = { 1 : "Mode Is Include",
2 : "Mode Is Exclude",
3 : "Change To Include Mode",
4 : "Change To Exclude Mode",
5 : "Allow New Sources",
6 : "Block Old Sources"}
'mrcode' is also the Advertisement Interval field
fields_desc = [ ByteEnumField("rtype", 1, igmpv3grtypes),
ByteField("auxdlen",0),
FieldLenField("numsrc", None, count_of="srcaddrs"),
IPField("maddr", "0.0.0.0"),
FieldListField("srcaddrs", [], IPField("sa", "0.0.0.0"), "numsrc") ]
#show_indent=0
#--------------------------------------------------------------------------
def post_build(self, p, pay):
"""Called implicitly before a packet is sent.
"""
p += pay
if self.auxdlen != 0:
print("NOTICE: A properly formatted and complaint V3 Group Record should have an Auxiliary Data length of zero (0).")
print(" Subsequent Group Records are lost!")
return p
#--------------------------------------------------------------------------
def mysummary(self):
"""Display a summary of the IGMPv3 group record."""
return self.sprintf("IGMPv3 Group Record %IGMPv3gr.type% %IGMPv3gr.maddr%")
class IGMPv3(Packet):
"""IGMP Message Class for v3.
This class is derived from class Packet.
The fields defined below are a
direct interpretation of the v3 Membership Query Message.
Fields 'type' through 'qqic' are directly assignable.
For 'numsrc', do not assign a value.
Instead add to the 'srcaddrs' list to auto-set 'numsrc'. To
assign values to 'srcaddrs', use the following methods:
c = IGMPv3()
c.srcaddrs = ['1.2.3.4', '5.6.7.8']
c.srcaddrs += ['192.168.10.24']
At this point, 'c.numsrc' is three (3)
'chksum' is automagically calculated before the packet is sent.
'mrcode' is also the Advertisement Interval field
"""
name = "IGMPv3"
igmpv3types = { 0x11 : "Membership Query",
0x22 : "Version 3 Membership Report",
0x30 : "Multicast Router Advertisement",
0x31 : "Multicast Router Solicitation",
0x32 : "Multicast Router Termination"}
fields_desc = [ ByteEnumField("type", 0x11, igmpv3types),
ByteField("mrcode",0),
XShortField("chksum", None),
IPField("gaddr", "0.0.0.0")
]
# use float_encode()
# if type = 0x11 (Membership Query), the next field is group address
# ConditionalField(IPField("gaddr", "0.0.0.0"), "type", lambda x:x==0x11),
# else if type = 0x22 (Membership Report), the next fields are
# reserved and number of group records
#ConditionalField(ShortField("rsvd2", 0), "type", lambda x:x==0x22),
#ConditionalField(ShortField("numgrp", 0), "type", lambda x:x==0x22),
# FieldLenField("numgrp", None, "grprecs")]
# else if type = 0x30 (Multicast Router Advertisement), the next fields are
# query interval and robustness
#ConditionalField(ShortField("qryIntvl", 0), "type", lambda x:x==0x30),
#ConditionalField(ShortField("robust", 0), "type", lambda x:x==0x30),
# The following are only present for membership queries
# ConditionalField(BitField("resv", 0, 4), "type", lambda x:x==0x11),
# ConditionalField(BitField("s", 0, 1), "type", lambda x:x==0x11),
# ConditionalField(BitField("qrv", 0, 3), "type", lambda x:x==0x11),
# ConditionalField(ByteField("qqic",0), "type", lambda x:x==0x11),
# ConditionalField(FieldLenField("numsrc", None, "srcaddrs"), "type", lambda x:x==0x11),
# ConditionalField(FieldListField("srcaddrs", None, IPField("sa", "0.0.0.0"), "numsrc"), "type", lambda x:x==0x11),
#--------------------------------------------------------------------------
def float_encode(self, value):
"""Convert the integer value to its IGMPv3 encoded time value if needed.
If value < 128, return the value specified. If >= 128, encode as a floating
point value. Value can be 0 - 31744.
"""
if value < 128:
code = value
elif value > 31743:
code = 255
else:
exp=0
value>>=3
while(value>31):
exp+=1
value>>=1
exp<<=4
code = 0x80 | exp | (value & 0x0F)
return code
#--------------------------------------------------------------------------
def post_build(self, p, pay):
"""Called implicitly before a packet is sent to compute and place IGMPv3 checksum.
Parameters:
self The instantiation of an IGMPv3 class
p The IGMPv3 message in hex in network byte order
pay Additional payload for the IGMPv3 message
"""
p += pay
if self.type in [0, 0x31, 0x32, 0x22]: # for these, field is reserved (0)
p = p[:1]+chr(0)+p[2:]
if self.chksum is None:
ck = checksum(p)
p = p[:2]+chr(ck>>8)+chr(ck&0xff)+p[4:]
return p
#--------------------------------------------------------------------------
def mysummary(self):
"""Display a summary of the IGMPv3 object."""
if isinstance(self.underlayer, IP):
return self.underlayer.sprintf("IGMPv3: %IP.src% > %IP.dst% %IGMPv3.type% %IGMPv3.gaddr%")
else:
return self.sprintf("IGMPv3 %IGMPv3.type% %IGMPv3.gaddr%")
#--------------------------------------------------------------------------
def igmpize(self, ip=None, ether=None):
"""Called to explicitly fixup associated IP and Ethernet headers
Parameters:
self The instantiation of an IGMP class.
ip The instantiation of the associated IP class.
ether The instantiation of the associated Ethernet.
Returns:
True The tuple ether/ip/self passed all check and represents
a proper IGMP packet.
False One of more validation checks failed and no fields
were adjusted.
The function will examine the IGMP message to assure proper format.
Corrections will be attempted if possible. The IP header is then properly
adjusted to ensure correct formatting and assignment. The Ethernet header
is then adjusted to the proper IGMP packet format.
"""
# The rules are:
# 1. ttl = 1 (RFC 2236, section 2)
# igmp_binds = [ (IP, IGMP, { "proto": 2 , "ttl": 1 }),
# 2. tos = 0xC0 (RFC 3376, section 4)
# (IP, IGMPv3, { "proto": 2 , "ttl": 1, "tos":0xc0 }),
# (IGMPv3, IGMPv3gr, { }) ]
# The rules are:
# 1. the Max Response time is meaningful only in Membership Queries and should be zero
# otherwise (RFC 2236, section 2.2)
if (self.type != 0x11): #rule 1
self.mrtime = 0
if (self.adjust_ip(ip) == True):
if (self.adjust_ether(ip, ether) == True): return True
return False
#--------------------------------------------------------------------------
def adjust_ether (self, ip=None, ether=None):
"""Called to explicitly fixup an associated Ethernet header
The function adjusts the ethernet header destination MAC address based on
the destination IP address.
"""
# The rules are:
# 1. send to the group mac address address corresponding to the IP.dst
if ip != None and ip.haslayer(IP) and ether != None and ether.haslayer(Ether):
iplong = atol(ip.dst)
ether.dst = "01:00:5e:%02x:%02x:%02x" % ( (iplong>>16)&0x7F, (iplong>>8)&0xFF, (iplong)&0xFF )
# print "igmpize ip " + ip.dst + " as mac " + ether.dst
return True
else:
return False
#--------------------------------------------------------------------------
def adjust_ip (self, ip=None):
"""Called to explicitly fixup an associated IP header
The function adjusts the IP header based on conformance rules
and the group address encoded in the IGMP message.
The rules are:
1. Send General Group Query to 224.0.0.1 (all systems)
2. Send Leave Group to 224.0.0.2 (all routers)
3a.Otherwise send the packet to the group address
3b.Send reports/joins to the group address
4. ttl = 1 (RFC 2236, section 2)
5. send the packet with the router alert IP option (RFC 2236, section 2)
"""
if ip != None and ip.haslayer(IP):
if (self.type == 0x11):
if (self.gaddr == "0.0.0.0"):
ip.dst = "224.0.0.1" # IP rule 1
retCode = True
elif isValidMCAddr(self.gaddr):
ip.dst = self.gaddr # IP rule 3a
retCode = True
name = "IGMPv3"
igmpv3types = { 0x11 : "Membership Query",
0x22 : "Version 3 Membership Report",
0x30 : "Multicast Router Advertisement",
0x31 : "Multicast Router Solicitation",
0x32 : "Multicast Router Termination"}
fields_desc = [ ByteEnumField("type", 0x11, igmpv3types),
ByteField("mrcode", 20),
XShortField("chksum", None)]
def float_encode(self, value):
"""Convert the integer value to its IGMPv3 encoded time value if needed.
If value < 128, return the value specified. If >= 128, encode as a floating
point value. Value can be 0 - 31744.
"""
if value < 128:
code = value
elif value > 31743:
code = 255
else:
exp=0
value>>=3
while(value>31):
exp+=1
value>>=1
exp<<=4
code = 0x80 | exp | (value & 0x0F)
return code
def mysummary(self):
"""Display a summary of the IGMPv3 object."""
if isinstance(self.underlayer, IP):
return self.underlayer.sprintf("IGMPv3: %IP.src% > %IP.dst% %IGMPv3.type%")
else:
print("Warning: Using invalid Group Address")
retCode = False
elif ((self.type == 0x17) and isValidMCAddr(self.gaddr)):
ip.dst = "224.0.0.2" # IP rule 2
retCode = True
elif ((self.type == 0x12) or (self.type == 0x16)) and (isValidMCAddr(self.gaddr)):
ip.dst = self.gaddr # IP rule 3b
retCode = True
else:
print("Warning: Using invalid IGMP Type")
retCode = False
else:
print("Warning: No IGMP Group Address set")
retCode = False
if retCode == True:
ip.ttl=1 # IP Rule 4
ip.options=[IPOption_Router_Alert()] # IP rule 5
return retCode
return self.sprintf("IGMPv3 %IGMPv3.type%")
@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _pkt and len(_pkt) >= 4:
if ord(_pkt[0]) in [0x12, 0x16, 0x17]:
return IGMP
elif ord(_pkt[0]) == 0x11 and len(_pkt) < 12:
return IGMP
return IGMPv3
class IGMPv3mq(Packet):
"""IGMPv3 Membership Query.
Payload of IGMPv3 when type=0x11"""
name = "IGMPv3gr"
fields_desc = [ IPField("gaddr", "0.0.0.0"),
BitField("resv", 0, 4),
BitField("s", 0, 1),
BitField("qrv", 0, 3),
ByteField("qqic",0),
FieldLenField("numsrc", None, count_of="srcaddrs"),
FieldListField("srcaddrs", None, IPField("sa", "0.0.0.0"), count_from=lambda x: x.numsrc)]
class IGMPv3gr(Packet):
"""IGMP Group Record for IGMPv3 Membership Report
bind_layers(IP, IGMPv3, frag=0, proto=2, ttl=1, tos=0xc0)
bind_layers(IGMPv3, IGMPv3gr)
bind_layers(IGMPv3gr, IGMPv3gr)
This class is derived from class Packet and should be added in the records
of an instantiation of class IGMPv3mr.
"""
name = "IGMPv3gr"
igmpv3grtypes = { 1 : "Mode Is Include",
2 : "Mode Is Exclude",
3 : "Change To Include Mode",
4 : "Change To Exclude Mode",
5 : "Allow New Sources",
6 : "Block Old Sources"}
fields_desc = [ ByteEnumField("rtype", 1, igmpv3grtypes),
ByteField("auxdlen",0),
FieldLenField("numsrc", None, count_of="srcaddrs"),
IPField("maddr", "0.0.0.0"),
FieldListField("srcaddrs", [], IPField("sa", "0.0.0.0"), count_from=lambda x: x.numsrc) ]
def mysummary(self):
"""Display a summary of the IGMPv3 group record."""
return self.sprintf("IGMPv3 Group Record %IGMPv3gr.type% %IGMPv3gr.maddr%")
def default_payload_class(self, payload):
return conf.padding_layer
class IGMPv3mr(Packet):
"""IGMP Membership Report extension for IGMPv3.
Payload of IGMPv3 when type=0x22"""
name = "IGMPv3mr"
fields_desc = [ ByteField("res2", 0),
FieldLenField("numgrp", None, count_of="records"),
PacketListField("records", [], IGMPv3gr, count_from=lambda x: x.numgrp)]
class IGMPv3mra(Packet):
"""IGMP Multicas Router Advertisement extension for IGMPv3.
Payload of IGMPv3 when type=0x30"""
name = "IGMPv3mra"
fields_desc = [ ShortField("qryIntvl", 0),
ShortField("robust", 0)]
bind_layers(IP, IGMPv3, frag=0,
proto=2,
ttl=1,
tos=0xc0)
bind_layers(IGMPv3, IGMPv3mq, type=0x11)
bind_layers(IGMPv3, IGMPv3mr, type=0x22)
bind_layers(IGMPv3, IGMPv3mra, type=0x30)
##############
% IGMPv3 tests
##############
+ Basic IGMPv3 tests
= Build IGMPv3 - Basic
a=Ether(src="00:01:02:03:04:05")
b=IP(src="1.2.3.4")
c=IGMPv3()/IGMPv3mq()
x = a/b/c
x[IGMPv3].igmpize()
assert x.mrcode == 20
assert x[IP].dst == "224.0.0.1"
assert isinstance(IGMP(str(x[IGMPv3])), IGMPv3)
= Dissect IGMPv3 - IGMPv3mq
x = Ether(b'\x14\x0cv\x8f\xfe(\x00\x01\x02\x03\x04\x05\x08\x00F\xc0\x00$\x00\x01\x00\x00\x01\x02\xe4h\xc0\xa8\x00\x01\x7f\x00\x00\x01\x94\x04\x00\x00\x11\x14\x0e\xe9\xe0\x00\x00\x02\x00\x00\x00\x00')
assert IGMPv3 in x
assert IGMPv3mq in x
assert x[IGMPv3mq].gaddr == "224.0.0.2"
assert x.summary() == "Ether / IP / IGMPv3: 192.168.0.1 > 127.0.0.1 Membership Query / IGMPv3mq"
assert isinstance(IGMP(str(x[IGMPv3])), IGMPv3)
= Dissect IGMPv3 - IGMPv3mr
x = Ether(b'\x14\x0cv\x8f\xfe(\x00\x01\x02\x03\x04\x05\x08\x00F\xc0\x00;\x00\x01\x00\x00\x01\x02\xe4Q\xc0\xa8\x00\x01\x7f\x00\x00\x01\x94\x04\x00\x00"\x14w\xcc\x00\x00\x02\x01\x00\x00\x02{{{{\xc0\xa8\x00\x01\xc0\xa8\x84\xf7\x01\x00\x00\x01\x04\x04\x04\x04\x0c\x0c\x0c\x0c')
assert IGMPv3 in x
assert IGMPv3mr in x
assert len(x[IGMPv3mr].records) == 2
assert x[IGMPv3mr].records[0].srcaddrs == ["192.168.0.1", "192.168.132.247"]
assert x[IGMPv3mr].records[1].maddr == "4.4.4.4"
assert isinstance(IGMP(str(x[IGMPv3])), IGMPv3)
= Dissect IGMPv3 - IGMPv3mra
x = Ether(b'\x14\x0cv\x8f\xfe(\x00\x01\x02\x03\x04\x05\x08\x00F\xc0\x00 \x00\x01\x00\x00\x01\x02\xe4l\xc0\xa8\x00\x01\x7f\x00\x00\x01\x94\x04\x00\x000\x14\xcf\xe6\x00\x03\x00\x02')
assert IGMPv3 in x
assert IGMPv3mra in x
assert x[IGMPv3mra].qryIntvl == 3
assert x[IGMPv3mra].robust == 2
assert isinstance(IGMP(str(x[IGMPv3])), IGMPv3)
= IGMP vs IVMPv3 tests
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment