diff --git a/.travis.yml b/.travis.yml index bffddf4dd2e27f5ef9a0d0d8c9f362bfad182aec..b94ac0364757ed0ede87f9be732074f97ea9cc33 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,7 @@ matrix: sudo: required python: 2.7 env: - - SCAPY_SUDO=sudo TEST_COMBINED_MODES=yes SCAPY_COVERAGE=yes + - SCAPY_SUDO=sudo SCAPY_COVERAGE=yes - os: linux sudo: required diff --git a/.travis/install.sh b/.travis/install.sh index 55a820288f54d49abae5ef0d9e05c7a6ef403146..20bc9b411aa73dfc9c37547d336d8d5865c37edb 100644 --- a/.travis/install.sh +++ b/.travis/install.sh @@ -7,17 +7,12 @@ then PIP_INSTALL_FLAGS="--user" fi fi -$SCAPY_SUDO pip install $PIP_INSTALL_FLAGS ecdsa mock -# Pycrypto 2.7a1 isn't available on PyPi -if [ "$TEST_COMBINED_MODES" = "yes" ] -then - curl -sL https://github.com/dlitz/pycrypto/archive/v2.7a1.tar.gz | tar xz - cd pycrypto-2.7a1 - python setup.py build - $SCAPY_SUDO python setup.py install +if python --version 2>&1 | grep -q PyPy; then + # cryptography requires PyPy >= 2.6, Travis CI uses 2.5.0 + $SCAPY_SUDO pip install $PIP_INSTALL_FLAGS ecdsa mock else - $SCAPY_SUDO pip install $PIP_INSTALL_FLAGS pycrypto + $SCAPY_SUDO pip install $PIP_INSTALL_FLAGS cryptography ecdsa mock fi # Install coverage diff --git a/.travis/test.sh b/.travis/test.sh index f9cf530a177fb098a1fb052a2501d62bc14b11bb..c8064ca8f484a40162e87acfaa19c97fb0aaeb53 100644 --- a/.travis/test.sh +++ b/.travis/test.sh @@ -12,10 +12,16 @@ then SCAPY_SUDO="" fi -# Test AEAD modes in IPsec if available -if [ "$TEST_COMBINED_MODES" != "yes" ] +# AES-CCM not implemented yet in Cryptography +# See +# - https://github.com/pyca/cryptography/issues/2968 +# - https://github.com/pyca/cryptography/issues/1141 +UT_FLAGS+=" -K combined_modes_ccm" + +if python --version 2>&1 | grep -q PyPy then - UT_FLAGS+=" -K combined_modes" + # cryptography requires PyPy >= 2.6, Travis CI uses 2.5.0 + UT_FLAGS+=" -K crypto " fi # Set PATH diff --git a/appveyor.yml b/appveyor.yml index 781dc6cbba6ddebedc342df4f6b14475ec7c3509..d634de0a519f972df67cb014dc06197706456a3e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -19,7 +19,7 @@ install: - refreshenv # Install Python modules - - "%PYTHON%\\python -m pip install ecdsa pycrypto coverage" + - "%PYTHON%\\python -m pip install ecdsa cryptography coverage" test_script: # Set environment variables diff --git a/doc/scapy/installation.rst b/doc/scapy/installation.rst index 4ef6a71b6ddaa471434dc5999072b24ad9c377d9..d033051eef4af4745e09193ba46b7f9cfa8776be 100644 --- a/doc/scapy/installation.rst +++ b/doc/scapy/installation.rst @@ -162,7 +162,7 @@ Here are the topics involved and some examples that you can use to try if your i .. index:: single: WEP, unwep() -* WEP decryption. ``unwep()`` needs `PyCrypto <http://www.dlitz.net/software/pycrypto/>`_. Example using a `Weplap test file <http://weplab.sourceforge.net/caps/weplab-64bit-AA-managed.pcap>`_: +* WEP decryption. ``unwep()`` needs `cryptography <https://cryptography.io>`_. Example using a `Weplap test file <http://weplab.sourceforge.net/caps/weplab-64bit-AA-managed.pcap>`_: .. code-block:: python @@ -408,7 +408,7 @@ Graphs (conversations) WEP decryption - * `PyCrypto <http://www.dlitz.net/software/pycrypto/>`_: `pycrypto-2.1.0.win32-py2.5.zip <http://www.voidspace.org.uk/downloads/pycrypto-2.1.0.win32-py2.5.zip>`_ `pycrypto-2.1.0.win32-py2.6.zip <http://www.voidspace.org.uk/downloads/pycrypto-2.1.0.win32-py2.6.zip>`_ + * `cryptography <https://cryptography.io>`_: `HowTo <https://cryptography.io/en/latest/installation/#on-windows>`_ Fingerprinting diff --git a/scapy/layers/dot11.py b/scapy/layers/dot11.py index fccaae65c6bdf6b764eae97d0a58f462e7edd81a..1ddccfca661557f2dd0e06a127088eda157d2329 100644 --- a/scapy/layers/dot11.py +++ b/scapy/layers/dot11.py @@ -21,9 +21,13 @@ from scapy.layers.inet import IP, TCP try: - from Crypto.Cipher import ARC4 + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import ( + Cipher, + algorithms, + ) except ImportError: - log_loading.info("Can't import python Crypto lib. Won't be able to decrypt WEP.") + log_loading.info("Can't import python cryptography lib. Won't be able to decrypt WEP.") ### Fields @@ -339,8 +343,12 @@ class Dot11WEP(Packet): icv = "" else: icv = p[4:8] - c = ARC4.new(self.iv+key) - p = p[:4]+c.encrypt(pay)+icv + e = Cipher( + algorithms.ARC4(self.iv+key), + None, + default_backend(), + ).encryptor() + p = p[:4]+e.update(pay)+e.finalize()+icv else: warning("No WEP key set (conf.wepkey).. strange results expected..") return p @@ -350,9 +358,13 @@ class Dot11WEP(Packet): if key is None: key = conf.wepkey if key: - c = ARC4.new(self.iv+key) - self.add_payload(LLC(c.decrypt(self.wepdata))) - + d = Cipher( + algorithms.ARC4(self.iv+key), + None, + default_backend(), + ).decryptor() + self.add_payload(LLC(d.update(self.wepdata)+d.finalize())) + bind_layers( PrismHeader, Dot11, ) bind_layers( RadioTap, Dot11, ) diff --git a/scapy/layers/ipsec.py b/scapy/layers/ipsec.py index c6670d38827134168b65d313a61838f6d8a624e1..dd9f5a9b7f76bbde36fe2e008ec0111ec7cfdef9 100644 --- a/scapy/layers/ipsec.py +++ b/scapy/layers/ipsec.py @@ -39,17 +39,15 @@ Example of use: True """ +from fractions import gcd +import os import socket import struct -from scapy.error import warning - -try: - from Crypto.Util.number import GCD as gcd -except ImportError: - from fractions import gcd +from scapy.error import warning from scapy.data import IP_PROTOS +from scapy.error import log_loading from scapy.fields import ByteEnumField, ByteField, StrField, XIntField, IntField, \ ShortField, PacketField @@ -145,22 +143,19 @@ class _ESPPlain(Packet): #------------------------------------------------------------------------------ try: - from Crypto.Cipher import AES - from Crypto.Cipher import DES - from Crypto.Cipher import DES3 - from Crypto.Cipher import CAST - from Crypto.Cipher import Blowfish - from Crypto.Util import Counter - from Crypto import Random + from cryptography.exceptions import InvalidTag + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import ( + Cipher, + algorithms, + modes, + ) except ImportError: - # no error if pycrypto is not available but encryption won't be supported - warning("IPsec encryption not supported (pycrypto required).") - AES = None - DES = None - DES3 = None - CAST = None - Blowfish = None - Random = None + log_loading.info("Can't import python cryptography lib. " + "Disabled IPsec encryption/authentication.") + algorithms = None + Cipher = None + modes = None try: from Crypto.Cipher.AES import MODE_GCM @@ -183,7 +178,8 @@ class CryptAlgo(object): IPsec encryption algorithm """ - def __init__(self, name, cipher, mode, block_size=None, iv_size=None, key_size=None, icv_size=None): + def __init__(self, name, cipher, mode, block_size=None, iv_size=None, + key_size=None, icv_size=None, salt_size=None): """ @param name: the name of this encryption algorithm @param cipher: a Cipher module @@ -195,24 +191,25 @@ class CryptAlgo(object): @param key_size: an integer or list/tuple of integers. If specified, force the secret keys length to one of the values. Defaults to the `key_size` of the cipher. - @param icv_size: the length of the integrity check value of this algo. - Only used in this class for AEAD algorithms. + @param icv_size: the length of the Integrity Check Value of this algo. + Used by Combined Mode Algorithms e.g. GCM + @param salt_size: the length of the salt to use as the IV prefix. + Usually used by Counter modes e.g. CTR """ self.name = name self.cipher = cipher self.mode = mode - self.is_aead = (hasattr(self.cipher, 'MODE_GCM') and - self.mode == self.cipher.MODE_GCM) or \ - (hasattr(self.cipher, 'MODE_CCM') and - self.mode == self.cipher.MODE_CCM) + self.icv_size = icv_size - if icv_size is not None: - self.icv_size = icv_size + if self.mode is not None: + self.is_aead = issubclass(self.mode, modes.ModeWithAuthenticationTag) + else: + self.is_aead = False if block_size is not None: self.block_size = block_size elif cipher is not None: - self.block_size = cipher.block_size + self.block_size = cipher.block_size // 8 else: self.block_size = 1 @@ -224,10 +221,15 @@ class CryptAlgo(object): if key_size is not None: self.key_size = key_size elif cipher is not None: - self.key_size = cipher.key_size + self.key_size = tuple(i // 8 for i in cipher.key_sizes) else: self.key_size = None + if salt_size is None: + self.salt_size = 0 + else: + self.salt_size = salt_size + def check_key(self, key): """ Check that the key length is valid. @@ -240,58 +242,48 @@ class CryptAlgo(object): def generate_iv(self): """ - Generate a random initialization vector. If pycrypto is not available, - return a buffer of the correct length filled with only '\x00'. + Generate a random initialization vector. """ - if Random: - return Random.get_random_bytes(self.iv_size) - else: - return chr(0) * self.iv_size + # XXX: Handle counter modes with real counters? RFCs allow the use of + # XXX: random bytes for counters, so it is not wrong to do it that way + return os.urandom(self.iv_size - self.salt_size) - def new_cipher(self, key, iv): + def new_cipher(self, key, iv, digest=None): """ @param key: the secret key, a byte string - @param iv: the initialization vector, a byte string + @param iv: the initialization vector, a byte string. Used as the + initial nonce in counter mode + @param digest: also known as tag or icv. A byte string containing the + digest of the encrypted data. Only use this during + decryption! + @return: an initialized cipher object for this algo """ - if (hasattr(self.cipher, 'MODE_CTR') and self.mode == self.cipher.MODE_CTR - or self.is_aead): - # in counter mode, the "iv" must be incremented for each block - # it is calculated like this: - # +---------+------------------+---------+ - # | nonce | IV | counter | - # +---------+------------------+---------+ - # m bytes n bytes 4 bytes - # <--------------------------------------> - # block_size - nonce_size = self.cipher.block_size - self.iv_size - 4 - - # instead of asking for an extra parameter, we extract the last - # nonce_size bytes of the key and use them as the nonce. - # +----------------------------+---------+ - # | cipher key | nonce | - # +----------------------------+---------+ - # <---------> - # nonce_size - cipher_key, nonce = key[:-nonce_size], key[-nonce_size:] - if self.is_aead: - return self.cipher.new(cipher_key, self.mode, nonce + iv, - counter=Counter.new(4 * 8, prefix=nonce + iv)) - - return self.cipher.new(cipher_key, self.mode, - counter=Counter.new(4 * 8, prefix=nonce + iv)) + if self.is_aead and digest is not None: + # With AEAD, the mode needs the digest during decryption. + return Cipher( + self.cipher(key), + self.mode(iv, digest, len(digest)), + default_backend(), + ) else: - return self.cipher.new(key, self.mode, iv) + return Cipher( + self.cipher(key), + self.mode(iv), + default_backend(), + ) def pad(self, esp): """ Add the correct amount of padding so that the data to encrypt is exactly a multiple of the algorithm's block size. - Also, make sure that the total ESP packet length is a multiple of 4 or - 8 bytes with IP or IPv6 respectively. + Also, make sure that the total ESP packet length is a multiple of 4 + bytes. @param esp: an unencrypted _ESPPlain packet + + @return: an unencrypted _ESPPlain packet with valid padding """ # 2 extra bytes for padlen and nh data_len = len(esp.data) + 2 @@ -303,7 +295,9 @@ class CryptAlgo(object): # pad for block size esp.padlen = -data_len % align - # padding must be an array of bytes starting from 1 to padlen + # Still according to the RFC, the default value for padding *MUST* be an + # array of bytes starting from 1 to padlen + # TODO: Handle padding function according to the encryption algo esp.padding = ''.join(chr(b) for b in xrange(1, esp.padlen + 1)) # If the following test fails, it means that this algo does not comply @@ -326,19 +320,20 @@ class CryptAlgo(object): data = esp.data_for_encryption() if self.cipher: - self.check_key(key) cipher = self.new_cipher(key, esp.iv) + encryptor = cipher.encryptor() if self.is_aead: - cipher.update(struct.pack('!LL', esp.spi, esp.seq)) - data = cipher.encrypt(data) - data += cipher.digest()[:self.icv_size] + aad = struct.pack('!LL', esp.spi, esp.seq) + encryptor.authenticate_additional_data(aad) + data = encryptor.update(data) + encryptor.finalize() + data += encryptor.tag[:self.icv_size] else: - data = cipher.encrypt(data) + data = encryptor.update(data) + encryptor.finalize() return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data) - def decrypt(self, esp, key, icv_size=0): + def decrypt(self, esp, key, icv_size=None): """ Decrypt an ESP packet @@ -347,23 +342,30 @@ class CryptAlgo(object): @param icv_size: the length of the icv used for integrity check @return: a valid ESP packet encrypted with this algorithm + @raise IPSecIntegrityError: if the integrity check fails with an AEAD + algorithm """ - self.check_key(key) - - if self.cipher and self.is_aead: - icv_size = self.icv_size + if icv_size is None: + icv_size = self.icv_size if self.is_aead else 0 iv = esp.data[:self.iv_size] data = esp.data[self.iv_size:len(esp.data) - icv_size] icv = esp.data[len(esp.data) - icv_size:] if self.cipher: - cipher = self.new_cipher(key, iv) + cipher = self.new_cipher(key, iv, icv) + decryptor = cipher.decryptor() if self.is_aead: - cipher.update(struct.pack('!LL', esp.spi, esp.seq)) + # Tag value check is done during the finalize method + decryptor.authenticate_additional_data( + struct.pack('!LL', esp.spi, esp.seq) + ) - data = cipher.decrypt(data) + try: + data = decryptor.update(data) + decryptor.finalize() + except InvalidTag as err: + raise IPSecIntegrityError(err) # extract padlen and nh padlen = ord(data[-2]) @@ -390,72 +392,53 @@ CRYPT_ALGOS = { 'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0), } -if AES: +if algorithms: CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC', - cipher=AES, - mode=AES.MODE_CBC) - # specific case for counter mode: - # the last 4 bytes of the key are used to carry the nonce of the counter + cipher=algorithms.AES, + mode=modes.CBC) CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR', - cipher=AES, - mode=AES.MODE_CTR, - block_size=1, - iv_size=8, - key_size=(16 + 4, 24 + 4, 32 + 4)) - - # AEAD algorithms are only supported in pycrypto 2.7a1+ - # they also have an additional field icv_size, which is usually - # populated by an auth algo when signing and verifying signatures. - if hasattr(AES, "MODE_GCM"): - CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM', - cipher=AES, - mode=AES.MODE_GCM, - iv_size=8, - icv_size=16, - key_size=(16 + 4, 24 + 4, 32 + 4)) - if hasattr(AES, "MODE_CCM"): + cipher=algorithms.AES, + mode=modes.CTR, + salt_size=4) + CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM', + cipher=algorithms.AES, + mode=modes.GCM, + salt_size=4, + icv_size=16) + if hasattr(modes, 'CCM'): CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM', - cipher=AES, - mode=AES.MODE_CCM, - iv_size=8, - icv_size=16, - key_size=(16 + 4, 24 + 4, 32 + 4)) -if DES: - CRYPT_ALGOS['DES'] = CryptAlgo('DES', - cipher=DES, - mode=DES.MODE_CBC) -if Blowfish: + cipher=algorithms.AES, + mode=modes.CCM, + icv_size=16) + # XXX: Flagged as weak by 'cryptography'. Kept for backward compatibility CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', - cipher=Blowfish, - mode=Blowfish.MODE_CBC) -if DES3: + cipher=algorithms.Blowfish, + mode=modes.CBC) + # XXX: RFC7321 states that DES *MUST NOT* be implemented. + # XXX: Keep for backward compatibility? + # Using a TripleDES cipher algorithm for DES is done by using the same 64 + # bits key 3 times (done by cryptography when given a 64 bits key) + CRYPT_ALGOS['DES'] = CryptAlgo('DES', + cipher=algorithms.TripleDES, + mode=modes.CBC, + key_size=(8,)) CRYPT_ALGOS['3DES'] = CryptAlgo('3DES', - cipher=DES3, - mode=DES3.MODE_CBC) -if CAST: + cipher=algorithms.TripleDES, + mode=modes.CBC) CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', - cipher=CAST, - mode=CAST.MODE_CBC) + cipher=algorithms.CAST5, + mode=modes.CBC) #------------------------------------------------------------------------------ try: - from Crypto.Hash import HMAC - from Crypto.Hash import SHA - from Crypto.Hash import MD5 - from Crypto.Hash import SHA256 - from Crypto.Hash import SHA384 - from Crypto.Hash import SHA512 + from cryptography.hazmat.primitives.hmac import HMAC + from cryptography.hazmat.primitives.cmac import CMAC + from cryptography.hazmat.primitives import hashes except ImportError: - # no error if pycrypto is not available but authentication won't be supported + # no error if cryptography is not available but authentication won't be supported HMAC = None - SHA = None - MD5 = None - SHA256 = None - SHA384 = None -try: - from Crypto.Hash import XCBCMAC -except ImportError: - XCBCMAC = None + CMAC = None + hashes = None #------------------------------------------------------------------------------ class IPSecIntegrityError(Exception): @@ -500,11 +483,10 @@ class AuthAlgo(object): @param key: a byte string @return: an initialized mac object for this algo """ - if self.mac is XCBCMAC: - # specific case here, ciphermod instead of digestmod - return self.mac.new(key, ciphermod=self.digestmod) + if self.mac is CMAC: + return self.mac(self.digestmod(key), default_backend()) else: - return self.mac.new(key, digestmod=self.digestmod) + return self.mac(key, self.digestmod(), default_backend()) def sign(self, pkt, key): """ @@ -518,18 +500,16 @@ class AuthAlgo(object): if not self.mac: return pkt - self.check_key(key) - mac = self.new_mac(key) if pkt.haslayer(ESP): mac.update(str(pkt[ESP])) - pkt[ESP].data += mac.digest()[:self.icv_size] + pkt[ESP].data += mac.finalize()[:self.icv_size] elif pkt.haslayer(AH): clone = zero_mutable_fields(pkt.copy(), sending=True) mac.update(str(clone)) - pkt[AH].icv = mac.digest()[:self.icv_size] + pkt[AH].icv = mac.finalize()[:self.icv_size] return pkt @@ -545,8 +525,6 @@ class AuthAlgo(object): if not self.mac or self.icv_size == 0: return - self.check_key(key) - mac = self.new_mac(key) pkt_icv = 'not found' @@ -554,19 +532,17 @@ class AuthAlgo(object): if isinstance(pkt, ESP): pkt_icv = pkt.data[len(pkt.data) - self.icv_size:] - - pkt = pkt.copy() - pkt.data = pkt.data[:len(pkt.data) - self.icv_size] - mac.update(str(pkt)) - computed_icv = mac.digest()[:self.icv_size] + clone = pkt.copy() + clone.data = clone.data[:len(clone.data) - self.icv_size] elif pkt.haslayer(AH): pkt_icv = pkt[AH].icv[:self.icv_size] - clone = zero_mutable_fields(pkt.copy(), sending=False) - mac.update(str(clone)) - computed_icv = mac.digest()[:self.icv_size] + mac.update(str(clone)) + computed_icv = mac.finalize()[:self.icv_size] + + # XXX: Cannot use mac.verify because the ICV can be truncated if pkt_icv != computed_icv: raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' % (pkt_icv, computed_icv)) @@ -579,38 +555,35 @@ AUTH_ALGOS = { 'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0), } -if HMAC: - if SHA: - AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96', - mac=HMAC, - digestmod=SHA, - icv_size=12) - if SHA256: - AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128', - mac=HMAC, - digestmod=SHA256, - icv_size=16) - if SHA384: - AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192', - mac=HMAC, - digestmod=SHA384, - icv_size=24) - if SHA512: - AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256', - mac=HMAC, - digestmod=SHA512, - icv_size=32) - if MD5: - AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96', - mac=HMAC, - digestmod=MD5, - icv_size=12) -if AES and XCBCMAC: - AUTH_ALGOS['AES-XCBC-96'] = AuthAlgo('AES-XCBC-96', - mac=XCBCMAC, - digestmod=AES, - icv_size=12, - key_size=(16,)) +if HMAC and hashes: + # XXX: NIST has deprecated SHA1 but is required by RFC7321 + AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96', + mac=HMAC, + digestmod=hashes.SHA1, + icv_size=12) + AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128', + mac=HMAC, + digestmod=hashes.SHA256, + icv_size=16) + AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192', + mac=HMAC, + digestmod=hashes.SHA384, + icv_size=24) + AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256', + mac=HMAC, + digestmod=hashes.SHA512, + icv_size=32) + # XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat + AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96', + mac=HMAC, + digestmod=hashes.MD5, + icv_size=12) +if CMAC and algorithms: + AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96', + mac=CMAC, + digestmod=algorithms.AES, + icv_size=12, + key_size=(16,)) #------------------------------------------------------------------------------ def split_for_transport(orig_pkt, transport_proto): @@ -781,8 +754,15 @@ class SecurityAssociation(object): raise TypeError('unsupported encryption algo %r, try %r' % (crypt_algo, CRYPT_ALGOS.keys())) self.crypt_algo = CRYPT_ALGOS[crypt_algo] - self.crypt_algo.check_key(crypt_key) - self.crypt_key = crypt_key + + if crypt_key: + salt_size = self.crypt_algo.salt_size + self.crypt_key = crypt_key[:len(crypt_key) - salt_size] + self.crypt_salt = crypt_key[len(crypt_key) - salt_size:] + else: + self.crypt_key = None + self.crypt_salt = None + else: self.crypt_algo = CRYPT_ALGOS['NULL'] self.crypt_key = None @@ -792,7 +772,6 @@ class SecurityAssociation(object): raise TypeError('unsupported integrity algo %r, try %r' % (auth_algo, AUTH_ALGOS.keys())) self.auth_algo = AUTH_ALGOS[auth_algo] - self.auth_algo.check_key(auth_key) self.auth_key = auth_key else: self.auth_algo = AUTH_ALGOS['NULL'] @@ -818,6 +797,8 @@ class SecurityAssociation(object): if iv is None: iv = self.crypt_algo.generate_iv() + if self.crypt_salt: + iv = self.crypt_salt + iv else: if len(iv) != self.crypt_algo.iv_size: raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) @@ -949,6 +930,7 @@ class SecurityAssociation(object): self.auth_algo.verify(encrypted, self.auth_key) esp = self.crypt_algo.decrypt(encrypted, self.crypt_key, + self.crypt_algo.icv_size or self.auth_algo.icv_size) if self.tunnel_header: diff --git a/scapy/layers/tls/__init__.py b/scapy/layers/tls/__init__.py index 06203f002d043dd8fb394687fc3ac950aac8ddba..98ee163d88ece95c6548dff00d36d276d714d492 100644 --- a/scapy/layers/tls/__init__.py +++ b/scapy/layers/tls/__init__.py @@ -8,11 +8,11 @@ Tools for handling TLS sessions and digital certificates. """ try: - import Crypto + import cryptography except ImportError: import logging log_loading = logging.getLogger("scapy.loading") - log_loading.info("Can't import python Crypto lib. Disabled certificate manipulation tools") + log_loading.info("Can't import python cryptography lib. Disabled certificate manipulation tools") try: import ecdsa diff --git a/scapy/layers/tls/cert.py b/scapy/layers/tls/cert.py index 7ed092a8e6301e018923319c35ec2910f5dca424..56e505fb290287e45829e8ee69a3726d8b324a5c 100644 --- a/scapy/layers/tls/cert.py +++ b/scapy/layers/tls/cert.py @@ -29,7 +29,9 @@ Supports both RSA and ECDSA objects. import base64, os, time import ecdsa -from Crypto.PublicKey import RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes from scapy.layers.tls.crypto.curves import import_curve from scapy.layers.tls.crypto.pkcs1 import pkcs_os2ip, pkcs_i2osp, mapHashFunc @@ -243,7 +245,10 @@ class PubKeyRSA(_PKIObj, PubKey, _EncryptAndVerifyRSA): self.modulus = pubkey.modulus.val self.modulusLen = len(binrepr(pubkey.modulus.val)) self.pubExp = pubkey.publicExponent.val - self.key = RSA.construct((self.modulus, self.pubExp, )) + self.key = rsa.RSAPublicNumbers( + n=self.modulus, + e=self.pubExp + ).public_key(default_backend()) def encrypt(self, msg, t=None, h=None, mgf=None, L=None): # no ECDSA encryption support, hence no ECDSA specific keywords here return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L) @@ -396,7 +401,11 @@ class PrivKeyRSA(_PKIObj, PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA): self.exponent1 = privkey.exponent1.val self.exponent2 = privkey.exponent2.val self.coefficient = privkey.coefficient.val - self.key = RSA.construct((self.modulus, self.pubExp, self.privExp)) + self.key = rsa.RSAPrivateNumbers( + p=self.prime1, q=self.prime2, d=self.privExp, dmp1=self.exponent1, + dmq1=self.exponent2, iqmp=self.coefficient, + public_numbers=rsa.RSAPublicNumbers(n=self.modulus, e=self.pubExp), + ).private_key(default_backend()) def verify(self, msg, sig, h=None, t=None, mgf=None, sLen=None, sigdecode=None): diff --git a/scapy/layers/tls/crypto/pkcs1.py b/scapy/layers/tls/crypto/pkcs1.py index 6a3dceca18c82b847a6b349d7bc7676dcd709a81..3ae21e543cc1b70374d00107127c24e77b7158f6 100644 --- a/scapy/layers/tls/crypto/pkcs1.py +++ b/scapy/layers/tls/crypto/pkcs1.py @@ -9,8 +9,11 @@ PKCS #1 methods as defined in RFC 3447. import os, popen2, tempfile import math, random, struct -from hashlib import md5, sha1, sha224, sha256, sha384, sha512 -from Crypto.Hash import MD2, MD4 + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding ##################################################################### @@ -105,42 +108,38 @@ def pkcs_ilen(n): # PKCS#1 v2.1, MD4 should not be used. # - 'tls' one is the concatenation of both md5 and sha1 hashes used # by SSL/TLS when signing/verifying things +def _hashWrapper(hash_algo, message, backend=default_backend()): + digest = hashes.Hash(hash_algo, backend).update(message) + return digest.finalize() + _hashFuncParams = { - "md2" : (16, - MD2.new, - lambda x: MD2.new(x).digest(), - '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x02\x05\x00\x04\x10'), - "md4" : (16, - MD4.new, - lambda x: MD4.new(x).digest(), - '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x04\x05\x00\x04\x10'), "md5" : (16, - md5, - lambda x: md5(x).digest(), + hashes.MD5, + lambda x: _hashWrapper(hashes.MD5, x), '\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10'), "sha1" : (20, - sha1, - lambda x: sha1(x).digest(), + hashes.SHA1, + lambda x: _hashWrapper(hashes.SHA1, x), '\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'), "sha224" : (28, - sha224, - lambda x: sha224(x).digest(), + hashes.SHA224, + lambda x: _hashWrapper(hashes.SHA224, x), '\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c'), "sha256" : (32, - sha256, - lambda x: sha256(x).digest(), + hashes.SHA256, + lambda x: _hashWrapper(hashes.SHA256, x), '\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'), "sha384" : (48, - sha384, - lambda x: sha384(x).digest(), + hashes.SHA384, + lambda x: _hashWrapper(hashes.SHA384, x), '\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'), "sha512" : (64, - sha512, - lambda x: sha512(x).digest(), + hashes.SHA512, + lambda x: _hashWrapper(hashes.SHA512, x), '\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'), "tls" : (36, None, - lambda x: md5(x).digest() + sha1(x).digest(), + lambda x: _hashWrapper(hashes.MD5, x) + _hashWrapper(hashes.SHA1, x), '') } @@ -425,135 +424,6 @@ def create_temporary_ca_path(anchor_list, folder): ##################################################################### class _EncryptAndVerifyRSA(object): - ### Below are encryption methods - - def _rsaep(self, m): - """ - Internal method providing raw RSA encryption, i.e. simple modular - exponentiation of the given message representative 'm', a long - between 0 and n-1. - - This is the encryption primitive RSAEP described in PKCS#1 v2.1, - i.e. RFC 3447 Sect. 5.1.1. - - Input: - m: message representative, a long between 0 and n-1, where - n is the key modulus. - - Output: - ciphertext representative, a long between 0 and n-1 - - Not intended to be used directly. Please, see encrypt() method. - """ - - n = self.modulus - if isinstance(m, int): - m = long(m) - if (not isinstance(m, long)) or m > n-1: - _warning("Key._rsaep() expects a long between 0 and n-1") - return None - - return self.key.encrypt(m, "")[0] - - - def _rsaes_pkcs1_v1_5_encrypt(self, M): - """ - Implements RSAES-PKCS1-V1_5-ENCRYPT() function described in section - 7.2.1 of RFC 3447. - - Input: - M: message to be encrypted, an octet string of length mLen, where - mLen <= k-11 (k denotes the length in octets of the key modulus) - - Output: - ciphertext, an octet string of length k - - On error, None is returned. - """ - - # 1) Length checking - mLen = len(M) - k = self.modulusLen / 8 - if mLen > k - 11: - _warning("Key._rsaes_pkcs1_v1_5_encrypt(): message too " - "long (%d > %d - 11)" % (mLen, k)) - return None - - # 2) EME-PKCS1-v1_5 encoding - PS = zerofree_randstring(k - mLen - 3) # 2.a) - EM = '\x00' + '\x02' + PS + '\x00' + M # 2.b) - - # 3) RSA encryption - m = pkcs_os2ip(EM) # 3.a) - c = self._rsaep(m) # 3.b) - C = pkcs_i2osp(c, k) # 3.c) - - return C # 4) - - - def _rsaes_oaep_encrypt(self, M, h=None, mgf=None, L=None): - """ - Internal method providing RSAES-OAEP-ENCRYPT as defined in Sect. - 7.1.1 of RFC 3447. Not intended to be used directly. Please, see - encrypt() method for type "OAEP". - - Input: - M : message to be encrypted, an octet string of length mLen - where mLen <= k - 2*hLen - 2 (k denotes the length in octets - of the RSA modulus and hLen the length in octets of the hash - function output) - h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', - 'sha256', 'sha384'). hLen denotes the length in octets of - the hash function output. 'sha1' is used by default if not - provided. - mgf: the mask generation function f : seed, maskLen -> mask - L : optional label to be associated with the message; the default - value for L, if not provided is the empty string - - Output: - ciphertext, an octet string of length k - - On error, None is returned. - """ - # The steps below are the one described in Sect. 7.1.1 of RFC 3447. - # 1) Length Checking - # 1.a) is not done - mLen = len(M) - if h is None: - h = "sha1" - if not _hashFuncParams.has_key(h): - _warning("Key._rsaes_oaep_encrypt(): unknown hash function %s." % h) - return None - hLen = _hashFuncParams[h][0] - hFun = _hashFuncParams[h][2] - k = self.modulusLen / 8 - if mLen > k - 2*hLen - 2: # 1.b) - _warning("Key._rsaes_oaep_encrypt(): message too long.") - return None - - # 2) EME-OAEP encoding - if L is None: # 2.a) - L = "" - lHash = hFun(L) - PS = '\x00'*(k - mLen - 2*hLen - 2) # 2.b) - DB = lHash + PS + '\x01' + M # 2.c) - seed = randstring(hLen) # 2.d) - if mgf is None: # 2.e) - mgf = lambda x,y: pkcs_mgf1(x,y,h) - dbMask = mgf(seed, k - hLen - 1) - maskedDB = strxor(DB, dbMask) # 2.f) - seedMask = mgf(maskedDB, hLen) # 2.g) - maskedSeed = strxor(seed, seedMask) # 2.h) - EM = '\x00' + maskedSeed + maskedDB # 2.i) - - # 3) RSA Encryption - m = pkcs_os2ip(EM) # 3.a) - c = self._rsaep(m) # 3.b) - C = pkcs_i2osp(c, k) # 3.c) - - return C # 4) - - def encrypt(self, m, t=None, h=None, mgf=None, L=None): """ Encrypt message 'm' using 't' encryption scheme where 't' can be: @@ -587,126 +457,33 @@ class _EncryptAndVerifyRSA(object): function regarding the size of 'L' (for instance, 2^61 - 1 for SHA-1). You have been warned. """ - + if h is not None: + h = mapHashFunc(h) if t is None: # Raw encryption - m = pkcs_os2ip(m) - c = self._rsaep(m) - return pkcs_i2osp(c, self.modulusLen/8) - + return self.key.encrypt( + m, + padding.AsymmetricPadding(), + ) elif t == "pkcs": - return self._rsaes_pkcs1_v1_5_encrypt(m) + return self.key.encrypt( + m, + padding.PKCS1v15(), + ) elif t == "oaep": - return self._rsaes_oaep_encrypt(m, h, mgf, L) + return self.key.encrypt( + m, + padding.OAEP( + mgf=mgf(h()), + algorithm=h(), + label=L, + ), + ) else: _warning("Key.encrypt(): Unknown encryption type (%s) provided" % t) return None - ### Below are verification related methods - - def _rsavp1(self, s): - """ - Internal method providing raw RSA verification, i.e. simple modular - exponentiation of the given signature representative 'c', an integer - between 0 and n-1. - - This is the signature verification primitive RSAVP1 described in - PKCS#1 v2.1, i.e. RFC 3447 Sect. 5.2.2. - - Input: - s: signature representative, an integer between 0 and n-1, - where n is the key modulus. - - Output: - message representative, an integer between 0 and n-1 - - Not intended to be used directly. Please, see verify() method. - """ - return self._rsaep(s) - - def _rsassa_pss_verify(self, M, S, h=None, mgf=None, sLen=None): - """ - Implements RSASSA-PSS-VERIFY() function described in Sect 8.1.2 - of RFC 3447 - - Input: - M: message whose signature is to be verified - S: signature to be verified, an octet string of length k, where k - is the length in octets of the RSA modulus n. - - Output: - True is the signature is valid. False otherwise. - """ - - # Set default parameters if not provided - if h is None: # By default, sha1 - h = "sha1" - if not _hashFuncParams.has_key(h): - _warning("Key._rsassa_pss_verify(): unknown hash function " - "provided (%s)" % h) - return False - if mgf is None: # use mgf1 with underlying hash function - mgf = lambda x,y: pkcs_mgf1(x, y, h) - if sLen is None: # use Hash output length (A.2.3 of RFC 3447) - hLen = _hashFuncParams[h][0] - sLen = hLen - - # 1) Length checking - modBits = self.modulusLen - k = modBits / 8 - if len(S) != k: - return False - - # 2) RSA verification - s = pkcs_os2ip(S) # 2.a) - m = self._rsavp1(s) # 2.b) - emLen = math.ceil((modBits - 1) / 8.) # 2.c) - EM = pkcs_i2osp(m, emLen) - - # 3) EMSA-PSS verification - Result = pkcs_emsa_pss_verify(M, EM, modBits - 1, h, mgf, sLen) - - return Result # 4) - - - def _rsassa_pkcs1_v1_5_verify(self, M, S, h): - """ - Implements RSASSA-PKCS1-v1_5-VERIFY() function as described in - Sect. 8.2.2 of RFC 3447. - - Input: - M: message whose signature is to be verified, an octet string - S: signature to be verified, an octet string of length k, where - k is the length in octets of the RSA modulus n - h: hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', - 'sha256', 'sha384'). - - Output: - True if the signature is valid. False otherwise. - """ - - # 1) Length checking - k = self.modulusLen / 8 - if len(S) != k: - _warning("invalid signature (len(S) != k)") - return False - - # 2) RSA verification - s = pkcs_os2ip(S) # 2.a) - m = self._rsavp1(s) # 2.b) - EM = pkcs_i2osp(m, k) # 2.c) - - # 3) EMSA-PKCS1-v1_5 encoding - EMPrime = pkcs_emsa_pkcs1_v1_5_encode(M, k, h) - if EMPrime is None: - _warning("Key._rsassa_pkcs1_v1_5_verify(): unable to encode.") - return False - - # 4) Comparison - return EM == EMPrime - - def verify(self, M, S, t=None, h=None, mgf=None, sLen=None): """ Verify alleged signature 'S' is indeed the signature of message 'M' @@ -744,31 +521,33 @@ class _EncryptAndVerifyRSA(object): default value (the byte length of the hash value for provided algorithm) by providing another one with that parameter. """ - if t is None: # RSAVP1 - S = pkcs_os2ip(S) - n = self.modulus - if S > n-1: - _warning("Signature to be verified is too long for key modulus") - return False - m = self._rsavp1(S) - if m is None: - return False - l = int(math.ceil(math.log(m, 2) / 8.)) # Hack - m = pkcs_i2osp(m, l) - return M == m + if h is not None: + h = mapHashFunc(h) + if t is None: # RSAVP1 + pad_inst = padding.AsymmetricPadding() elif t == "pkcs": # RSASSA-PKCS1-v1_5-VERIFY if h is None: - h = "sha1" - return self._rsassa_pkcs1_v1_5_verify(M, S, h) - + h = hashes.SHA1 + pad_inst = padding.PKCS1v15() elif t == "pss": # RSASSA-PSS-VERIFY - return self._rsassa_pss_verify(M, S, h, mgf, sLen) + pad_inst = padding.PSS(mgf=mgf, salt_length=sLen) else: _warning("Key.verify(): Unknown signature type (%s) provided" % t) return None + try: + self.key.verify( + signature=S, + data=M, + padding=pad_inst, + algorithm=h(), + ) + return True + except InvalidSignature: + return False + class _DecryptAndSignRSA(object): ### Below are decryption related methods. Encryption ones are inherited ### from PubKey diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index 97ca2a8419add561fe108f10ba29aea5c70933a5..515e0ededcde73194494eef26fe06f14356257b4 100755 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -112,11 +112,17 @@ class Format(EnumClass): class TestClass: def __getitem__(self, item): return getattr(self, item) - def add_keywords(self, kw): - if kw is str: - self.keywords.append(kw) - else: - self.keywords += kw + def add_keywords(self, kws): + if isinstance(kws, basestring): + kws = [kws] + for kwd in kws: + if kwd.startswith('-'): + try: + self.keywords.remove(kwd[1:]) + except KeyError: + pass + else: + self.keywords.add(kwd) class TestCampaign(TestClass): def __init__(self, title): @@ -124,13 +130,14 @@ class TestCampaign(TestClass): self.filename = None self.headcomments = "" self.campaign = [] - self.keywords = [] + self.keywords = set() self.crc = None self.sha = None self.preexec = None self.preexec_output = None def add_testset(self, testset): self.campaign.append(testset) + testset.keywords.update(self.keywords) def __iter__(self): return self.campaign.__iter__() def all_tests(self): @@ -143,11 +150,12 @@ class TestSet(TestClass): self.name = name self.tests = [] self.comments = "" - self.keywords = [] + self.keywords = set() self.crc = None self.expand = 1 def add_test(self, test): self.tests.append(test) + test.keywords.update(self.keywords) def __iter__(self): return self.tests.__iter__() @@ -160,7 +168,7 @@ class UnitTest(TestClass): self.res = True # must be True at init to have a different truth value than None self.output = "" self.num = -1 - self.keywords = [] + self.keywords = set() self.crc = None self.expand = 1 def __nonzero__(self): @@ -180,7 +188,7 @@ def parse_campaign_file(campaign_file): if l[0] == '#': continue if l[0] == "~": - (test or testset or campaign_file).add_keywords(l[1:].split()) + (test or testset or test_campaign).add_keywords(l[1:].split()) elif l[0] == "%": test_campaign.title = l[1:].strip() elif l[0] == "+": diff --git a/test/cert.uts b/test/cert.uts index 2b01c8b1ada29d91678c6e9b689096104b9ee962..876286d26b414b9c59bb317fb0c2ac67a3b6e21e 100644 --- a/test/cert.uts +++ b/test/cert.uts @@ -3,6 +3,7 @@ # Try me with: # bash test/run_tests -t test/cert.uts -F +~ crypto ########### PKCS helpers ############################################### diff --git a/test/ipsec.uts b/test/ipsec.uts index dfa8ce1c7842eadaf0f1fbb5ae6a185d8ff79a67..7a49e2855b73b007cc65b668c673f830a18fa3bd 100644 --- a/test/ipsec.uts +++ b/test/ipsec.uts @@ -2,11 +2,14 @@ % IPSec layer regression tests ############################## +~ crypto + ############################################################################### -+ IPv4 / ESP ++ IPv4 / ESP - Transport - Encryption Algorithms ####################################### -= IPv4 / ESP - Transport - AES-CBC - NULL += IPv4 / ESP - Transport - NULL - NULL +~ -crypto import socket @@ -17,7 +20,38 @@ p = IP(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + crypt_algo='NULL', crypt_key=None, + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - DES - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='DES', crypt_key='8bytekey', auth_algo='NULL', auth_key=None) e = sa.encrypt(p) @@ -26,6 +60,7 @@ e assert(isinstance(e, IP)) assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') assert(e.chksum != p.chksum) +* the encrypted packet should have an ESP layer assert(e.proto == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) @@ -40,7 +75,7 @@ d assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Transport - NULL - HMAC-SHA1-96 += IPv4 / ESP - Transport - 3DES - NULL p = IP(src='1.1.1.1', dst='2.2.2.2') p /= TCP(sport=45012, dport=80) @@ -49,8 +84,8 @@ p = IP(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='NULL', crypt_key=None, - auth_algo='HMAC-SHA1-96', auth_key='secret key') + crypt_algo='3DES', crypt_key='threedifferent8byteskeys', + auth_algo='NULL', auth_key=None) e = sa.encrypt(p) e @@ -58,20 +93,22 @@ e assert(isinstance(e, IP)) assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') assert(e.chksum != p.chksum) +* the encrypted packet should have an ESP layer assert(e.proto == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -assert('testdata' in e[ESP].data) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* integrity verification should pass d = sa.decrypt(e) +d * after decryption the original packet payload should be unaltered assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Transport - NULL - HMAC-SHA1-96 - altered packet += IPv4 / ESP - Transport - AES-CBC - NULL p = IP(src='1.1.1.1', dst='2.2.2.2') p /= TCP(sport=45012, dport=80) @@ -80,8 +117,8 @@ p = IP(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='NULL', crypt_key=None, - auth_algo='HMAC-SHA1-96', auth_key='secret key') + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None) e = sa.encrypt(p) e @@ -93,20 +130,17 @@ assert(e.proto == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -assert('testdata' in e[ESP].data) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* simulate the alteration of the packet before decryption -e[ESP].data = e[ESP].data.replace('\x01', '\x21') +d = sa.decrypt(e) +d -* integrity verification should fail -try: - d = sa.decrypt(e) - assert(False) -except IPSecIntegrityError, err: - err +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Tunnel - AES-CTR - NULL += IPv4 / ESP - Transport - AES-CTR - NULL p = IP(src='1.1.1.1', dst='2.2.2.2') p /= TCP(sport=45012, dport=80) @@ -116,6 +150,139 @@ p sa = SecurityAssociation(ESP, spi=0x222, crypt_algo='AES-CTR', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - Blowfish - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='Blowfish', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - CAST - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='CAST', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +############################################################################### ++ IPv4 / ESP - Tunnel - Encryption Algorithms + +####################################### += IPv4 / ESP - Tunnel - NULL - NULL +~ -crypto + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - DES - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='DES', crypt_key='8bytekey', auth_algo='NULL', auth_key=None, tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) @@ -126,6 +293,7 @@ assert(isinstance(e, IP)) * after encryption packet should be encapsulated with the given ip tunnel header assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') assert(e.chksum != p.chksum) +* the encrypted packet should have an ESP layer assert(e.proto == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) @@ -136,12 +304,11 @@ assert('testdata' not in e[ESP].data) d = sa.decrypt(e) d -* after decryption original packet should be preserved -assert(d == p) +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Tunnel - AES-GCM - NULL -~ combined_modes += IPv4 / ESP - Tunnel - 3DES - NULL p = IP(src='1.1.1.1', dst='2.2.2.2') p /= TCP(sport=45012, dport=80) @@ -150,7 +317,7 @@ p = IP(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + crypt_algo='3DES', crypt_key='threedifferent8byteskeys', auth_algo='NULL', auth_key=None, tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) @@ -161,6 +328,7 @@ assert(isinstance(e, IP)) * after encryption packet should be encapsulated with the given ip tunnel header assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') assert(e.chksum != p.chksum) +* the encrypted packet should have an ESP layer assert(e.proto == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) @@ -171,12 +339,11 @@ assert('testdata' not in e[ESP].data) d = sa.decrypt(e) d -* after decryption original packet should be preserved -assert(d == p) +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Tunnel - AES-CCM - NULL -~ combined_modes += IPv4 / ESP - Tunnel - AES-CBC - NULL p = IP(src='1.1.1.1', dst='2.2.2.2') p /= TCP(sport=45012, dport=80) @@ -185,7 +352,41 @@ p = IP(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='AES-CCM', crypt_key='16bytekey+4bytenonce', + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - AES-CTR - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CTR', crypt_key='16bytekey+4bytenonce', auth_algo='NULL', auth_key=None, tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) @@ -203,76 +404,2283 @@ assert(e[ESP].spi == sa.spi) * after encryption the original packet payload should NOT be readable assert('testdata' not in e[ESP].data) -d = sa.decrypt(e) -d +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - Blowfish - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='Blowfish', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - CAST - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='CAST', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +############################################################################### ++ IPv4 / ESP - Transport - Authentication Algorithms + +####################################### += IPv4 / ESP - Transport - NULL - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - NULL - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-256-128 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-256-128', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-256-128 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-256-128', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-384-192 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-384-192', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-384-192 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-384-192', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-512-256 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-512-256', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Transport - NULL - SHA2-512-256 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-512-256', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - NULL - HMAC-MD5-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-MD5-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Transport - NULL - HMAC-MD5-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-MD5-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - NULL - AES-CMAC-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Transport - NULL - AES-CMAC-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +############################################################################### ++ IPv4 / ESP - Tunnel - Authentication Algorithms + +####################################### += IPv4 / ESP - Tunnel - NULL - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - NULL - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-256-128 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-256-128', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-256-128 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-256-128', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-384-192 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-384-192', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-384-192 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-384-192', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-512-256 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-512-256', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - NULL - SHA2-512-256 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='SHA2-512-256', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - NULL - HMAC-MD5-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-MD5-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - NULL - HMAC-MD5-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-MD5-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - NULL - AES-CMAC-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - NULL - AES-CMAC-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should be readable +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +############################################################################### ++ IPv4 / ESP - Encryption + Authentication + +####################################### += IPv4 / ESP - Transport - AES-CBC - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - AES-CBC - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Transport - AES-GCM - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Transport - AES-GCM - NULL - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err +####################################### += IPv4 / ESP - Tunnel - AES-CBC - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - AES-CBC - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / ESP - Tunnel - AES-GCM - NULL + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - AES-GCM - NULL - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +############################################################################### ++ IPv4 / AH - Transport + +####################################### += IPv4 / AH - Transport - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-SHA1-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-SHA1-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before decryption +e[TCP].sport = 5 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Transport - SHA2-256-128 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-256-128', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - SHA2-256-128 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-256-128', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e[TCP].dport = 46 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Transport - SHA2-384-192 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-384-192', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - SHA2-384-192 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-384-192', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e[TCP].dport = 46 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Transport - SHA2-512-256 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-512-256', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - SHA2-512-256 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-512-256', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e[TCP].dport = 46 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Transport - HMAC-MD5-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-MD5-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - HMAC-MD5-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-MD5-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e[TCP].dport = 46 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Transport - AES-CMAC-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Transport - AES-CMAC-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') +assert(e.chksum != p.chksum) +* the encrypted packet should have an AH layer +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e[TCP].dport = 46 + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +############################################################################### ++ IPv4 / AH - Tunnel + +####################################### += IPv4 / AH - Tunnel - HMAC-SHA1-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / AH - Tunnel - HMAC-SHA1-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Tunnel - SHA2-256-128 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-256-128', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d == p) + +####################################### += IPv4 / AH - Tunnel - SHA2-256-128 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-256-128', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Tunnel - SHA2-384-192 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-384-192', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d == p) + +####################################### += IPv4 / AH - Tunnel - SHA2-384-192 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-384-192', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Tunnel - SHA2-512-256 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-512-256', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d == p) + +####################################### += IPv4 / AH - Tunnel - SHA2-512-256 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='SHA2-512-256', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Tunnel - HMAC-MD5-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-MD5-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d == p) + +####################################### += IPv4 / AH - Tunnel - HMAC-MD5-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='HMAC-MD5-96', auth_key='secret key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +####################################### += IPv4 / AH - Tunnel - AES-CMAC-96 + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* alter mutable fields in the packet +e.ttl = 2 + +* integrity verification should pass +d = sa.decrypt(e) +d + +* after decryption the original packet should be unaltered +assert(d == p) + +####################################### += IPv4 / AH - Tunnel - AES-CMAC-96 - altered packet + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(AH, spi=0x222, + auth_algo='AES-CMAC-96', auth_key='sixteenbytes key', + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_AH) +assert(e.haslayer(AH)) +assert(e.haslayer(TCP)) +assert(e[AH].spi == sa.spi) + +* simulate the alteration of the packet before verification +e.dst = '4.4.4.4' + +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err + +############################################################################### ++ IPv6 / ESP + +####################################### += IPv6 / ESP - Transport - NULL - NULL +~ -crypto + +p = IPv6(src='11::22', dst='22::11') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IPv6(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv6 / ESP - Transport - AES-CBC - NULL + +p = IPv6(src='11::22', dst='22::11') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IPv6(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='NULL', auth_key=None) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv4 / ESP - Tunnel - AES-GCM - NULL +~ combined_modes + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d == p) + +####################################### += IPv4 / ESP - Tunnel - AES-CCM - NULL +~ combined_modes combined_modes_ccm + +p = IP(src='1.1.1.1', dst='2.2.2.2') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IP(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + +e = sa.encrypt(p) +e + +assert(isinstance(e, IP)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') +assert(e.chksum != p.chksum) +assert(e.proto == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) + +d = sa.decrypt(e) +d + +* after decryption original packet should be preserved +assert(d == p) + +####################################### += IPv6 / ESP - Transport - NULL - HMAC-SHA1-96 + +p = IPv6(src='11::22', dst='22::11') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IPv6(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* integrity verification should pass +d = sa.decrypt(e) + +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) + +####################################### += IPv6 / ESP - Transport - NULL - HMAC-SHA1-96 - altered packet + +p = IPv6(src='11::22', dst='22::11') +p /= TCP(sport=45012, dport=80) +p /= Raw('testdata') +p = IPv6(str(p)) +p + +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='NULL', crypt_key=None, + auth_algo='HMAC-SHA1-96', auth_key='secret key') + +e = sa.encrypt(p) +e + +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +assert('testdata' in e[ESP].data) + +* simulate the alteration of the packet before decryption +e[ESP].data = e[ESP].data.replace('\x01', '\x21') -* after decryption original packet should be preserved -assert(d == p) +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err ####################################### -= IPv4 / ESP - Tunnel - NULL - SHA2-256-128 += IPv6 / ESP - Transport - AES-CBC - HMAC-SHA1-96 -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='NULL', crypt_key=None, - auth_algo='SHA2-256-128', auth_key='secret key', - tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key') e = sa.encrypt(p) e -assert(isinstance(e, IP)) -* after encryption packet should be encapsulated with the given ip tunnel header -assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') -assert(e.chksum != p.chksum) -assert(e.proto == socket.IPPROTO_ESP) +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable -assert('testdata' in e[ESP].data) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* integrity verification should pass d = sa.decrypt(e) +d -* after decryption the original packet should be preserved -assert(d == p) +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / ESP - Tunnel - NULL - SHA2-256-128 - altered packet += IPv6 / ESP - Transport - AES-CBC - HMAC-SHA1-96 - altered packet -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='NULL', crypt_key=None, - auth_algo='SHA2-256-128', auth_key='secret key', - tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key') e = sa.encrypt(p) e -assert(isinstance(e, IP)) -* after encryption packet should be encapsulated with the given ip tunnel header -assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') -assert(e.chksum != p.chksum) -assert(e.proto == socket.IPPROTO_ESP) +assert(isinstance(e, IPv6)) +assert(e.src == '11::22' and e.dst == '22::11') +assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable -assert('testdata' in e[ESP].data) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) * simulate the alteration of the packet before decryption -e[ESP].data = e[ESP].data.replace('\x01', '\x21') +e[ESP].seq += 1 * integrity verification should fail try: @@ -281,11 +2689,8 @@ try: except IPSecIntegrityError, err: err -############################################################################### -+ IPv6 / ESP - ####################################### -= IPv6 / ESP - Transport - DES - NULL += IPv6 / ESP - Transport - AES-GCM - NULL p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -294,7 +2699,7 @@ p = IPv6(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='DES', crypt_key='8bytekey', + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', auth_algo='NULL', auth_key=None) e = sa.encrypt(p) @@ -302,7 +2707,6 @@ e assert(isinstance(e, IPv6)) assert(e.src == '11::22' and e.dst == '22::11') -* the encrypted packet should have an ESP layer assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) @@ -313,11 +2717,11 @@ assert('testdata' not in e[ESP].data) d = sa.decrypt(e) d -* after decryption the original packet payload should be unaltered +* after decryption original packet should be preserved assert(d[TCP] == p[TCP]) ####################################### -= IPv6 / ESP - Transport - NULL - HMAC-MD5-96 += IPv6 / ESP - Transport - AES-GCM - NULL - altered packet p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -326,30 +2730,34 @@ p = IPv6(str(p)) p sa = SecurityAssociation(ESP, spi=0x222, - crypt_algo='NULL', crypt_key=None, - auth_algo='HMAC-MD5-96', auth_key='secret key') + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None) e = sa.encrypt(p) e assert(isinstance(e, IPv6)) assert(e.src == '11::22' and e.dst == '22::11') -* the encrypted packet should have an ESP layer assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable -assert('testdata' in e[ESP].data) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* integrity verification should pass -d = sa.decrypt(e) +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 -* after decryption the original packet payload should be unaltered -assert(d[TCP] == p[TCP]) +* integrity verification should fail +try: + d = sa.decrypt(e) + assert(False) +except IPSecIntegrityError, err: + err ####################################### -= IPv6 / ESP - Transport - NULL - HMAC-MD5-96 - altered packet += IPv6 / ESP - Tunnel - NULL - NULL +~ -crypto p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -359,33 +2767,29 @@ p sa = SecurityAssociation(ESP, spi=0x222, crypt_algo='NULL', crypt_key=None, - auth_algo='HMAC-MD5-96', auth_key='secret key') + auth_algo='NULL', auth_key=None, + tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) e assert(isinstance(e, IPv6)) -assert(e.src == '11::22' and e.dst == '22::11') -* the encrypted packet should have an ESP layer +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == 'aa::bb' and e.dst == 'bb::aa') assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable assert('testdata' in e[ESP].data) -* simulate the alteration of the packet before decryption -e[ESP].data = e[ESP].data.replace('\x01', '\x21') +d = sa.decrypt(e) +d -* integrity verification should fail -try: - d = sa.decrypt(e) - assert(False) -except IPSecIntegrityError, err: - err +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv6 / ESP - Tunnel - 3DES - NULL += IPv6 / ESP - Tunnel - AES-CBC - NULL p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -414,11 +2818,11 @@ assert('testdata' not in e[ESP].data) d = sa.decrypt(e) d -* after decryption original packet should be preserved -assert(d == p) +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv6 / ESP - Tunnel - NULL - SHA2-384-192 += IPv6 / ESP - Tunnel - NULL - HMAC-SHA1-96 p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -428,7 +2832,7 @@ p sa = SecurityAssociation(ESP, spi=0x222, crypt_algo='NULL', crypt_key=None, - auth_algo='SHA2-384-192', auth_key='secret key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) @@ -441,17 +2845,16 @@ assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable assert('testdata' in e[ESP].data) * integrity verification should pass d = sa.decrypt(e) -* after decryption the original packet should be preserved -assert(d == p) +* after decryption the original packet payload should be unaltered +assert(d[TCP] == p[TCP]) ####################################### -= IPv6 / ESP - Tunnel - NULL - SHA2-384-192 - altered packet += IPv6 / ESP - Tunnel - NULL - HMAC-SHA1-96 - altered packet p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) @@ -461,7 +2864,7 @@ p sa = SecurityAssociation(ESP, spi=0x222, crypt_algo='NULL', crypt_key=None, - auth_algo='SHA2-384-192', auth_key='secret key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) @@ -474,7 +2877,6 @@ assert(e.nh == socket.IPPROTO_ESP) assert(e.haslayer(ESP)) assert(not e.haslayer(TCP)) assert(e[ESP].spi == sa.spi) -* after encryption the original packet payload should be readable assert('testdata' in e[ESP].data) * simulate the alteration of the packet before decryption @@ -487,37 +2889,33 @@ try: except IPSecIntegrityError, err: err -############################################################################### -+ IPv4 / AH - ####################################### -= IPv4 / AH - Transport - HMAC-SHA1-96 += IPv6 / ESP - Tunnel - AES-CBC - HMAC-SHA1-96 -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p -sa = SecurityAssociation(AH, spi=0x222, - auth_algo='HMAC-SHA1-96', auth_key='sixteenbytes key') +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) e -assert(isinstance(e, IP)) -assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') -assert(e.chksum != p.chksum) -* the encrypted packet should have an AH layer -assert(e.proto == socket.IPPROTO_AH) -assert(e.haslayer(AH)) -assert(e.haslayer(TCP)) -assert(e[AH].spi == sa.spi) - -* alter mutable fields in the packet -e.ttl = 2 +assert(isinstance(e, IPv6)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == 'aa::bb' and e.dst == 'bb::aa') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* integrity verification should pass d = sa.decrypt(e) d @@ -525,31 +2923,34 @@ d assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / AH - Transport - HMAC-SHA1-96 - altered packet += IPv6 / ESP - Tunnel - AES-CBC - HMAC-SHA1-96 - altered packet -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p -sa = SecurityAssociation(AH, spi=0x222, - auth_algo='HMAC-SHA1-96', auth_key='sixteenbytes key') +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-CBC', crypt_key='sixteenbytes key', + auth_algo='HMAC-SHA1-96', auth_key='secret key', + tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) e -assert(isinstance(e, IP)) -assert(e.src == '1.1.1.1' and e.dst == '2.2.2.2') -assert(e.chksum != p.chksum) -* the encrypted packet should have an AH layer -assert(e.proto == socket.IPPROTO_AH) -assert(e.haslayer(AH)) -assert(e.haslayer(TCP)) -assert(e[AH].spi == sa.spi) +assert(isinstance(e, IPv6)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == 'aa::bb' and e.dst == 'bb::aa') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) * simulate the alteration of the packet before decryption -e[TCP].sport = 5 +e[ESP].seq += 1 * integrity verification should fail try: @@ -559,65 +2960,67 @@ except IPSecIntegrityError, err: err ####################################### -= IPv4 / AH - Tunnel - SHA2-256-128 += IPv6 / ESP - Tunnel - AES-GCM - NULL -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p -sa = SecurityAssociation(AH, spi=0x222, - auth_algo='SHA2-256-128', auth_key='secret key', - tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) e -assert(isinstance(e, IP)) -assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') -assert(e.chksum != p.chksum) -assert(e.proto == socket.IPPROTO_AH) -assert(e.haslayer(AH)) -assert(e.haslayer(TCP)) -assert(e[AH].spi == sa.spi) - -* alter mutable fields in the packet -e.ttl = 2 +assert(isinstance(e, IPv6)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == 'aa::bb' and e.dst == 'bb::aa') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* integrity verification should pass d = sa.decrypt(e) d -* after decryption the original packet should be unaltered -assert(d == p) +* after decryption original packet should be preserved +assert(d[TCP] == p[TCP]) ####################################### -= IPv4 / AH - Tunnel - HMAC-SHA1-96 - altered packet += IPv6 / ESP - Tunnel - AES-GCM - NULL - altered packet -p = IP(src='1.1.1.1', dst='2.2.2.2') +p = IPv6(src='11::22', dst='22::11') p /= TCP(sport=45012, dport=80) p /= Raw('testdata') -p = IP(str(p)) +p = IPv6(str(p)) p -sa = SecurityAssociation(AH, spi=0x222, - auth_algo='HMAC-SHA1-96', auth_key='secret key', - tunnel_header=IP(src='11.11.11.11', dst='22.22.22.22')) +sa = SecurityAssociation(ESP, spi=0x222, + crypt_algo='AES-GCM', crypt_key='16bytekey+4bytenonce', + auth_algo='NULL', auth_key=None, + tunnel_header=IPv6(src='aa::bb', dst='bb::aa')) e = sa.encrypt(p) e -assert(isinstance(e, IP)) -assert(e.src == '11.11.11.11' and e.dst == '22.22.22.22') -assert(e.chksum != p.chksum) -assert(e.proto == socket.IPPROTO_AH) -assert(e.haslayer(AH)) -assert(e.haslayer(TCP)) -assert(e[AH].spi == sa.spi) +assert(isinstance(e, IPv6)) +* after encryption packet should be encapsulated with the given ip tunnel header +assert(e.src == 'aa::bb' and e.dst == 'bb::aa') +assert(e.nh == socket.IPPROTO_ESP) +assert(e.haslayer(ESP)) +assert(not e.haslayer(TCP)) +assert(e[ESP].spi == sa.spi) +* after encryption the original packet payload should NOT be readable +assert('testdata' not in e[ESP].data) -* simulate the alteration of the packet before verification -e.dst = '4.4.4.4' +* simulate the alteration of the packet before decryption +e[ESP].seq += 1 * integrity verification should fail try: @@ -822,3 +3225,4 @@ e[IPv6ExtHdrRouting].segleft = 0 * integrity verification should pass d = sa.decrypt(e) d + diff --git a/test/regression.uts b/test/regression.uts index ccd9edbd73a83e8e9c5328ff6d151b4ec5c01606..41c640a052f88744958fa0db5aade328c5bf3700 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -580,10 +580,10 @@ assert(len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == "blksize") = WEP tests -~ wifi wep Dot11 LLC SNAP IP TCP -conf.wepkey = "ABCDEFGH" +~ wifi crypto Dot11 LLC SNAP IP TCP +conf.wepkey = "Fobar" str(Dot11WEP()/LLC()/SNAP()/IP()/TCP(seq=12345678)) -assert(_ == '\x00\x00\x00\x00\x1e\xafK5G\x94\xd4m\x81\xdav\xd4,c\xf1\xfe{\xfc\xba\xd6;T\x93\xd0\t\xdb\xfc\xa5\xb9\x85\xce\x05b\x1cC\x10\xd7p\xde22&\xf0\xbcUS\x99\x83Z\\D\xa6') +assert(_ == '\x00\x00\x00\x00\xe3OjYLw\xc3x_%\xd0\xcf\xdeu-\xc3pH#\x1eK\xae\xf5\xde\xe7\xb8\x1d,\xa1\xfe\xe83\xca\xe1\xfe\xbd\xfe\xec\x00)T`\xde.\x93Td\x95C\x0f\x07\xdd') Dot11WEP(_) assert(TCP in _ and _[TCP].seq == 12345678)