Skip to content
Snippets Groups Projects
Commit c73cdbc5 authored by Thomas Faivre's avatar Thomas Faivre Committed by Guillaume Valadon
Browse files

layers/ipsec: fix AH dissection


The ICV size is defined by the authentication algorithm in the SA.
it can be deduced from the payloadlen field but the padding len is
unknown aswell so there is no way to find both without the SA.

Assume everything in payloadlen is the ICV until the verification called
from the SA. Fill padding when possible.

Signed-off-by: default avatarThomas Faivre <thomas.faivre@6wind.com>
parent 127f583f
No related branches found
No related tags found
No related merge requests found
......@@ -47,8 +47,8 @@ import struct
from scapy.config import conf, crypto_validator
from scapy.data import IP_PROTOS
from scapy.error import log_loading
from scapy.fields import (ByteEnumField, ByteField, StrField, XIntField,
IntField, ShortField, PacketField)
from scapy.fields import (ByteEnumField, ByteField, StrField, StrLenField,
XIntField, IntField, ShortField, PacketField)
from scapy.packet import Packet, bind_layers, Raw
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import (IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt,
......@@ -65,14 +65,26 @@ class AH(Packet):
name = 'AH'
def __get_icv_len(self):
"""
Compute the size of the ICV based on the payloadlen field.
Padding size is included as it can only be known from the authentication
algorithm provided by the Security Association.
"""
# payloadlen = length of AH in 32-bit words (4-byte units), minus "2"
# payloadlen = 3 32-bit word fixed fields + ICV + padding - 2
# ICV = (payloadlen + 2 - 3 - padding) in 32-bit words
return (self.payloadlen - 1) * 4
fields_desc = [
ByteEnumField('nh', None, IP_PROTOS),
ByteField('payloadlen', None),
ShortField('reserved', None),
XIntField('spi', 0x0),
IntField('seq', 0),
StrField('icv', None),
StrField('padding', None),
StrLenField('icv', None, length_from=__get_icv_len),
# Padding len can only be known with the SecurityAssociation.auth_algo
StrLenField('padding', None, length_from=lambda x: 0),
]
overload_fields = {
......@@ -85,6 +97,8 @@ class AH(Packet):
bind_layers(IP, AH, proto=socket.IPPROTO_AH)
bind_layers(IPv6, AH, nh=socket.IPPROTO_AH)
bind_layers(AH, IP, nh=socket.IPPROTO_IP)
bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6)
#------------------------------------------------------------------------------
class ESP(Packet):
......@@ -527,7 +541,11 @@ class AuthAlgo(object):
clone.data = clone.data[:len(clone.data) - self.icv_size]
elif pkt.haslayer(AH):
pkt_icv = pkt[AH].icv[:self.icv_size]
if len(pkt[AH].icv) != self.icv_size:
# Fill padding since we know the actual icv_size
pkt[AH].padding = pkt[AH].icv[self.icv_size:]
pkt[AH].icv = pkt[AH].icv[:self.icv_size]
pkt_icv = pkt[AH].icv
clone = zero_mutable_fields(pkt.copy(), sending=False)
mac.update(str(clone))
......
......@@ -3090,6 +3090,72 @@ assert(e[AH].spi == sa.spi)
* simulate the alteration of the packet before verification
e[TCP].dport = 46
* integrity verification should fail
try:
d = sa.decrypt(e)
assert(False)
except IPSecIntegrityError, err:
err
#######################################
= IPv6 / AH - Transport - SHA2-256-128
p = IPv6(src='11::22', dst='22::11')
p /= TCP(sport=45012, dport=80)
p /= Raw('testdata')
p = IPv6(str(p))
p
sa = SecurityAssociation(AH, spi=0x222,
auth_algo='SHA2-256-128', auth_key='secret key')
e = sa.encrypt(p)
e
assert(isinstance(e, IPv6))
assert(e.src == '11::22' and e.dst == '22::11')
* the encrypted packet should have an AH layer
assert(e.nh == socket.IPPROTO_AH)
assert(e.haslayer(AH))
assert(e.haslayer(TCP))
assert(e[AH].spi == sa.spi)
* alter mutable fields in the packet
e.hlim = 2
* integrity verification should pass
d = sa.decrypt(e)
d
* after decryption the original packet payload should be unaltered
assert(d[TCP] == p[TCP])
#######################################
= IPv6 / AH - Transport - SHA2-256-128 - altered packet
p = IPv6(src='11::22', dst='22::11')
p /= TCP(sport=45012, dport=80)
p /= Raw('testdata')
p = IPv6(str(p))
p
sa = SecurityAssociation(AH, spi=0x222,
auth_algo='SHA2-256-128', auth_key='secret key')
e = sa.encrypt(p)
e
assert(isinstance(e, IPv6))
assert(e.src == '11::22' and e.dst == '22::11')
* the encrypted packet should have an AH layer
assert(e.nh == socket.IPPROTO_AH)
assert(e.haslayer(AH))
assert(e.haslayer(TCP))
assert(e[AH].spi == sa.spi)
* simulate the alteration of the packet before verification
e[TCP].dport = 46
* integrity verification should fail
try:
d = sa.decrypt(e)
......@@ -3158,6 +3224,74 @@ assert(e[AH].spi == sa.spi)
* simulate the alteration of the packet before verification
e.src = 'cc::ee'
* integrity verification should fail
try:
d = sa.decrypt(e)
assert(False)
except IPSecIntegrityError, err:
err
#######################################
= IPv6 / AH - Tunnel - SHA2-256-128
p = IPv6(src='11::22', dst='22::11')
p /= TCP(sport=45012, dport=80)
p /= Raw('testdata')
p = IPv6(str(p))
p
sa = SecurityAssociation(AH, spi=0x222,
auth_algo='SHA2-256-128', auth_key='secret key',
tunnel_header=IPv6(src='aa::bb', dst='bb::aa'))
e = sa.encrypt(p)
e
assert(isinstance(e, IPv6))
* after encryption packet should be encapsulated with the given ip tunnel header
assert(e.src == 'aa::bb' and e.dst == 'bb::aa')
assert(e.nh == socket.IPPROTO_AH)
assert(e.haslayer(AH))
assert(e.haslayer(TCP))
assert(e[AH].spi == sa.spi)
* alter mutable fields in the packet
e.hlim = 2
* integrity verification should pass
d = sa.decrypt(e)
d
* after decryption the original packet payload should be unaltered
assert(d == p)
#######################################
= IPv6 / AH - Tunnel - SHA2-256-128 - altered packet
p = IPv6(src='11::22', dst='22::11')
p /= TCP(sport=45012, dport=80)
p /= Raw('testdata')
p = IPv6(str(p))
p
sa = SecurityAssociation(AH, spi=0x222,
auth_algo='SHA2-256-128', auth_key='secret key',
tunnel_header=IPv6(src='aa::bb', dst='bb::aa'))
e = sa.encrypt(p)
e
assert(isinstance(e, IPv6))
* after encryption packet should be encapsulated with the given ip tunnel header
assert(e.src == 'aa::bb' and e.dst == 'bb::aa')
assert(e.nh == socket.IPPROTO_AH)
assert(e.haslayer(AH))
assert(e.haslayer(TCP))
assert(e[AH].spi == sa.spi)
* simulate the alteration of the packet before verification
e.src = 'cc::ee'
* integrity verification should fail
try:
d = sa.decrypt(e)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment