From f9968d0a8512b0e355076a0ff31de2a79677b009 Mon Sep 17 00:00:00 2001 From: mtu <maxence.tury@ssi.gouv.fr> Date: Mon, 25 Jan 2016 12:59:01 +0100 Subject: [PATCH] Heavy modifications to x509.py : comprehensive support for many more fields, extensions and keys (both RSA and ECDSA) --- scapy/layers/x509.py | 1062 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 970 insertions(+), 92 deletions(-) diff --git a/scapy/layers/x509.py b/scapy/layers/x509.py index 18aaa5e3..31df1185 100644 --- a/scapy/layers/x509.py +++ b/scapy/layers/x509.py @@ -1,6 +1,7 @@ ## This file is part of Scapy ## See http://www.secdev.org/projects/scapy for more informations ## Copyright (C) Philippe Biondi <phil@secdev.org> +## Enhanced by Maxence Tury <maxence.tury@ssi.gouv.fr> ## This program is published under a GPLv2 license """ @@ -10,99 +11,976 @@ X.509 certificates. from scapy.asn1packet import * from scapy.asn1fields import * -########## -## X509 ## -########## - -######[ ASN1 class ]###### - -class ASN1_Class_X509(ASN1_Class_UNIVERSAL): - name="X509" - CONT0 = 0xa0 - CONT1 = 0xa1 - CONT2 = 0xa2 - CONT3 = 0xa3 - -class ASN1_X509_CONT0(ASN1_SEQUENCE): - tag = ASN1_Class_X509.CONT0 - -class ASN1_X509_CONT1(ASN1_SEQUENCE): - tag = ASN1_Class_X509.CONT1 - -class ASN1_X509_CONT2(ASN1_SEQUENCE): - tag = ASN1_Class_X509.CONT2 - -class ASN1_X509_CONT3(ASN1_SEQUENCE): - tag = ASN1_Class_X509.CONT3 - -######[ BER codecs ]####### - -class BERcodec_X509_CONT0(BERcodec_SEQUENCE): - tag = ASN1_Class_X509.CONT0 - -class BERcodec_X509_CONT1(BERcodec_SEQUENCE): - tag = ASN1_Class_X509.CONT1 - -class BERcodec_X509_CONT2(BERcodec_SEQUENCE): - tag = ASN1_Class_X509.CONT2 - -class BERcodec_X509_CONT3(BERcodec_SEQUENCE): - tag = ASN1_Class_X509.CONT3 - -######[ ASN1 fields ]###### - -class ASN1F_X509_CONT0(ASN1F_SEQUENCE): - ASN1_tag = ASN1_Class_X509.CONT0 - -class ASN1F_X509_CONT1(ASN1F_SEQUENCE): - ASN1_tag = ASN1_Class_X509.CONT1 - -class ASN1F_X509_CONT2(ASN1F_SEQUENCE): - ASN1_tag = ASN1_Class_X509.CONT2 - -class ASN1F_X509_CONT3(ASN1F_SEQUENCE): - ASN1_tag = ASN1_Class_X509.CONT3 - -######[ X509 packets ]###### - -class X509RDN(ASN1_Packet): - ASN1_codec = ASN1_Codecs.BER - ASN1_root = ASN1F_SET( - ASN1F_SEQUENCE( ASN1F_OID("oid","2.5.4.6"), - ASN1F_PRINTABLE_STRING("value","") - ) - ) - -class X509v3Ext(ASN1_Packet): - ASN1_codec = ASN1_Codecs.BER - ASN1_root = ASN1F_field("val",ASN1_NULL(0)) - - -class X509Cert(ASN1_Packet): - ASN1_codec = ASN1_Codecs.BER - ASN1_root = ASN1F_SEQUENCE( - ASN1F_SEQUENCE( - ASN1F_optionnal(ASN1F_X509_CONT0(ASN1F_INTEGER("version",3))), - ASN1F_INTEGER("sn",1), - ASN1F_SEQUENCE(ASN1F_OID("sign_algo","1.2.840.113549.1.1.5"), - ASN1F_field("sa_value",ASN1_NULL(0))), - ASN1F_SEQUENCE_OF("issuer",[],X509RDN), - ASN1F_SEQUENCE(ASN1F_UTC_TIME("not_before",ZuluTime(-600)), # ten minutes ago - ASN1F_UTC_TIME("not_after",ZuluTime(+86400))), # for 24h - ASN1F_SEQUENCE_OF("subject",[],X509RDN), - ASN1F_SEQUENCE( - ASN1F_SEQUENCE(ASN1F_OID("pubkey_algo","1.2.840.113549.1.1.1"), - ASN1F_field("pk_value",ASN1_NULL(0))), - ASN1F_BIT_STRING("pubkey","") - ), - ASN1F_optionnal(ASN1F_X509_CONT3(ASN1F_SEQUENCE_OF("x509v3ext",[],X509v3Ext))), - - ), - ASN1F_SEQUENCE(ASN1F_OID("sign_algo2","1.2.840.113549.1.1.5"), - ASN1F_field("sa2_value",ASN1_NULL(0))), - ASN1F_BIT_STRING("signature","") - ) +class ASN1P_OID(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_OID("oid", "0") + +class ASN1P_INTEGER(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_INTEGER("number", 0) + + +####################### +##### RSA packets ##### +####################### +##### based on RFC 3447 + +# It could be interesting to use os.urandom and try to generate +# a new modulus each time RSAPublicKey is called with default values. +# (We might have to dig into scapy field initialization mechanisms...) +# NEVER rely on the key below, which is provided only for debugging purposes. +class RSAPublicKey(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_INTEGER("modulus", 10), + ASN1F_INTEGER("publicExponent", 3)) + +class RSAOtherPrimeInfo(ASN1_Packet): + ASN1_codec = ASN1_Codecs.DER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_INTEGER("prime", 0), + ASN1F_INTEGER("exponent", 0), + ASN1F_INTEGER("coefficient", 0)) + +class RSAPrivateKey(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_enum_INTEGER("version", 0, ["two-prime", "multi"]), + ASN1F_INTEGER("modulus", 10), + ASN1F_INTEGER("publicExponent", 3), + ASN1F_INTEGER("privateExponent", 3), + ASN1F_INTEGER("prime1", 2), + ASN1F_INTEGER("prime2", 5), + ASN1F_INTEGER("exponent1", 0), + ASN1F_INTEGER("exponent2", 3), + ASN1F_INTEGER("coefficient", 1), + ASN1F_optional( + ASN1F_SEQUENCE_OF("otherPrimeInfos", None, + RSAOtherPrimeInfo))) + +#################################### +########## ECDSA packets ########### +#################################### +#### based on RFC 3279 & 5480 & 5915 + +class ECFieldID(ASN1_Packet): +# No characteristic-two-field support for now. + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("fieldType", "prime-field"), + ASN1F_INTEGER("prime", 0)) + +class ECCurve(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_STRING("a", ""), + ASN1F_STRING("b", ""), + ASN1F_optional( + ASN1F_BIT_STRING("seed", None))) + +class ECSpecifiedDomain(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_enum_INTEGER("version", 1, {1: "ecpVer1"}), + ASN1F_PACKET("fieldID", ECFieldID(), ECFieldID), + ASN1F_PACKET("curve", ECCurve(), ECCurve), + ASN1F_STRING("base", ""), + ASN1F_INTEGER("order", 0), + ASN1F_optional( + ASN1F_INTEGER("cofactor", None))) + +class ECParameters(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_CHOICE("curve", ASN1_OID("ansip384r1"), + ASN1F_OID, # for named curves + ASN1F_NULL, # for implicit curves + ASN1F_PACKET("curve", None, ECSpecifiedDomain)) + +class ECDSAPublicKey(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_BIT_STRING("ecPoint", "") + +class ECDSAPrivateKey(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_enum_INTEGER("version", 1, {1: "ecPrivkeyVer1"}), + ASN1F_STRING("privateKey", ""), + ASN1F_optional( + ASN1F_PACKET("parameters", None, ECParameters, + explicit_tag=0xa0)), + ASN1F_optional( + ASN1F_PACKET("publicKey", None, + ECDSAPublicKey, + explicit_tag=0xa1))) + +class ECDSASignature(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_INTEGER("r", 0), + ASN1F_INTEGER("s", 0)) + + +###################### +#### X509 packets #### +###################### +#### based on RFC 5280 + + +####### Names ####### + +class ASN1F_X509_DirectoryString(ASN1F_CHOICE): +# we include ASN1 bit strings for rare instances of x500 addresses + def __init__(self, name, default, **kwargs): + ASN1F_CHOICE.__init__(self, name, default, + ASN1F_PRINTABLE_STRING, ASN1F_UTF8_STRING, + ASN1F_IA5_STRING, ASN1F_T61_STRING, + ASN1F_UNIVERSAL_STRING, ASN1F_BIT_STRING, + **kwargs) + +class X509_AttributeValue(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_CHOICE("value", ASN1_PRINTABLE_STRING("FR"), + ASN1F_PRINTABLE_STRING, ASN1F_UTF8_STRING, + ASN1F_IA5_STRING, ASN1F_T61_STRING, + ASN1F_UNIVERSAL_STRING) + +class X509_Attribute(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("type", "2.5.4.6"), + ASN1F_SET_OF("values", + [X509_AttributeValue()], + X509_AttributeValue)) +class X509_AttributeTypeAndValue(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("type", "2.5.4.6"), + ASN1F_X509_DirectoryString("value", + ASN1_PRINTABLE_STRING("FR"))) + +class X509_RDN(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SET_OF("rdn", [X509_AttributeTypeAndValue()], + X509_AttributeTypeAndValue) +class X509_OtherName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("type-id", "0"), + ASN1F_CHOICE("value", None, + ASN1F_IA5_STRING, ASN1F_ISO646_STRING, + ASN1F_BMP_STRING, ASN1F_UTF8_STRING, + explicit_tag=0xa0)) + +class X509_RFC822Name(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_IA5_STRING("rfc822Name", "") + +class X509_DNSName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_IA5_STRING("dNSName", "") + +#XXX write me +class X509_X400Address(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_field("x400Address", "") + +default_directoryName = [ + X509_RDN(), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.10", + value=ASN1_PRINTABLE_STRING("Scapy, Inc."))]), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.3", + value=ASN1_PRINTABLE_STRING("Scapy Default Name"))]) + ] + +class X509_DirectoryName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("directoryName", default_directoryName, + X509_RDN) + +class X509_EDIPartyName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_X509_DirectoryString("nameAssigner", None, + explicit_tag=0xa0)), + ASN1F_X509_DirectoryString("partyName", None, + explicit_tag=0xa1)) + +class X509_URI(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_IA5_STRING("uniformResourceIdentifier", "") + +class X509_IPAddress(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_STRING("iPAddress", "") + +class X509_RegisteredID(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_OID("registeredID", "") + +class X509_GeneralName(ASN1_Packet): + dirName = X509_DirectoryName() + dirName.overload_fields[hash(X509_DirectoryName)] = {"exp": 0xa4, + "imp": None} + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_CHOICE("generalName", dirName, + ASN1F_PACKET("otherName", None, X509_OtherName, + implicit_tag=0xa0), + ASN1F_PACKET("rfc822Name", None, X509_RFC822Name, + implicit_tag=0x81), + ASN1F_PACKET("dNSName", None, X509_DNSName, + implicit_tag=0x82), + ASN1F_PACKET("x400Address", None, X509_X400Address, + explicit_tag=0xa3), + ASN1F_PACKET("directoryName", None, X509_DirectoryName, + explicit_tag=0xa4), + ASN1F_PACKET("ediPartyName", None, X509_EDIPartyName, + explicit_tag=0xa5), + ASN1F_PACKET("uniformResourceIdentifier", None, X509_URI, + implicit_tag=0x86), + ASN1F_PACKET("ipAddress", None, X509_IPAddress, + implicit_tag=0x87), + ASN1F_PACKET("registeredID", None, X509_RegisteredID, + implicit_tag=0x88)) + + +####### Extensions ####### + +class X509_ExtAuthorityKeyIdentifier(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_STRING("keyIdentifier", "\xff"*20, + implicit_tag=0x80)), + ASN1F_optional( + ASN1F_SEQUENCE_OF("authorityCertIssuer", None, + X509_GeneralName, + implicit_tag=0xa1)), + ASN1F_optional( + ASN1F_INTEGER("authorityCertSerialNumber", None, + implicit_tag=0x82))) + +class X509_ExtSubjectDirectoryAttributes(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("subjectDirectoryAttributes", + [X509_Attribute()], + X509_Attribute) + +class X509_ExtSubjectKeyIdentifier(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_STRING("keyIdentifier", "xff"*20) + +class X509_ExtFullName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("fullName", [X509_GeneralName()], + X509_GeneralName, implicit_tag=0xa0) + +class X509_ExtNameRelativeToCRLIssuer(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_PACKET("nameRelativeToCRLIssuer", X509_RDN(), X509_RDN, + implicit_tag=0xa1) + +class X509_ExtDistributionPointName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_CHOICE("distributionPointName", None, + X509_ExtFullName, X509_ExtNameRelativeToCRLIssuer) + +reasons_mapping = ["unused", + "keyCompromise", + "cACompromise", + "affiliationChanged", + "superseded", + "cessationOfOperation", + "certificateHold", + "privilegeWithdrawn", + "aACompromise"] + +class X509_ExtDistributionPoint(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_PACKET("distributionPoint", + X509_ExtDistributionPointName(), + X509_ExtDistributionPointName, + explicit_tag=0xa0)), + ASN1F_optional( + ASN1F_FLAGS("reasons", None, reasons_mapping, + implicit_tag=0x81)), + ASN1F_optional( + ASN1F_SEQUENCE_OF("cRLIssuer", None, + X509_GeneralName, + implicit_tag=0xa2))) + +ku_mapping = ["digitalSignature", + "nonRepudiation", + "keyEncipherment", + "dataEncipherment", + "keyAgreement", + "keyCertSign", + "cRLSign", + "encipherOnly", + "decipherOnly"] + +class X509_ExtKeyUsage(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_FLAGS("keyUsage", "101", ku_mapping) + def get_keyUsage(self): + return self.ASN1_root.get_flags(self) + +class X509_ExtPrivateKeyUsagePeriod(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_GENERALIZED_TIME("notBefore", + str(GeneralizedTime(-600)), + implicit_tag=0x80)), + ASN1F_optional( + ASN1F_GENERALIZED_TIME("notAfter", + str(GeneralizedTime(+86400)), + implicit_tag=0x81))) + +class X509_PolicyMapping(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("issuerDomainPolicy", None), + ASN1F_OID("subjectDomainPolicy", None)) + +class X509_ExtPolicyMappings(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("policyMappings", [], X509_PolicyMapping) + +class X509_ExtBasicConstraints(ASN1_Packet): +# The cA field should not be optional, but some certs omit it for False. + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_BOOLEAN("cA", False)), + ASN1F_optional( + ASN1F_INTEGER("pathLenConstraint", None))) + +class X509_ExtCRLNumber(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_INTEGER("cRLNumber", 0) +cRL_reasons = ["unspecified", + "keyCompromise", + "cACompromise", + "affiliationChanged", + "superseded", + "cessationOfOperation", + "certificateHold", + "unused_reasonCode", + "removeFromCRL", + "privilegeWithdrawn", + "aACompromise"] + +class X509_ExtReasonCode(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_ENUMERATED("cRLReason", 0, cRL_reasons) + +class X509_ExtDeltaCRLIndicator(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_INTEGER("deltaCRLIndicator", 0) + +class X509_ExtIssuingDistributionPoint(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_PACKET("distributionPoint", + X509_ExtDistributionPointName(), + X509_ExtDistributionPointName, + explicit_tag=0xa0)), + ASN1F_BOOLEAN("onlyContainsUserCerts", False, + implicit_tag=0x81), + ASN1F_BOOLEAN("onlyContainsCACerts", False, + implicit_tag=0x82), + ASN1F_optional( + ASN1F_FLAGS("onlySomeReasons", None, + reasons_mapping, + implicit_tag=0x83)), + ASN1F_BOOLEAN("indirectCRL", False, + implicit_tag=0x84), + ASN1F_BOOLEAN("onlyContainsAttributeCerts", False, + implicit_tag=0x85)) + +class X509_ExtCertificateIssuer(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("certificateIssuer", [], X509_GeneralName) + +class X509_ExtInvalidityDate(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_GENERALIZED_TIME("invalidityDate", str(ZuluTime(+86400))) + +class X509_ExtSubjectAltName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("subjectAltName", [], X509_GeneralName) + +class X509_ExtIssuerAltName(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("issuerAltName", [], X509_GeneralName) + +class X509_ExtGeneralSubtree(ASN1_Packet): +# 'minimum' is not optional in RFC 5280, yet it is in some implementations. + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_PACKET("base", X509_GeneralName(), X509_GeneralName), + ASN1F_optional( + ASN1F_INTEGER("minimum", None, implicit_tag=0x80)), + ASN1F_optional( + ASN1F_INTEGER("maximum", None, implicit_tag=0x81))) + +class X509_ExtNameConstraints(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_SEQUENCE_OF("permittedSubtrees", None, + X509_ExtGeneralSubtree, + implicit_tag=0xa0)), + ASN1F_optional( + ASN1F_SEQUENCE_OF("excludedSubtrees", None, + X509_ExtGeneralSubtree, + implicit_tag=0xa1))) + +class X509_ExtPolicyConstraints(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_INTEGER("requireExplicitPolicy", None, + implicit_tag=0x80)), + ASN1F_optional( + ASN1F_INTEGER("inhibitPolicyMapping", None, + implicit_tag=0x81))) + +class X509_ExtExtendedKeyUsage(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("extendedKeyUsage", [], ASN1P_OID) + def get_extendedKeyUsage(self): + eku_array = self.extendedKeyUsage + return [eku.oid.oidname for eku in eku_array] + +class X509_ExtNoticeReference(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_CHOICE("organization", + ASN1_UTF8_STRING("Dummy Organization"), + ASN1F_IA5_STRING, ASN1F_ISO646_STRING, + ASN1F_BMP_STRING, ASN1F_UTF8_STRING), + ASN1F_SEQUENCE_OF("noticeNumbers", [], ASN1P_INTEGER)) + +class X509_ExtUserNotice(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_PACKET("noticeRef", None, + X509_ExtNoticeReference)), + ASN1F_optional( + ASN1F_CHOICE("explicitText", + ASN1_UTF8_STRING("Dummy ExplicitText"), + ASN1F_IA5_STRING, ASN1F_ISO646_STRING, + ASN1F_BMP_STRING, ASN1F_UTF8_STRING))) + +class X509_ExtPolicyQualifierInfo(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("policyQualifierId", "1.3.6.1.5.5.7.2.1"), + ASN1F_CHOICE("qualifier", ASN1_IA5_STRING("cps_str"), + ASN1F_IA5_STRING, X509_ExtUserNotice)) + +class X509_ExtPolicyInformation(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("policyIdentifier", "2.5.29.32.0"), + ASN1F_optional( + ASN1F_SEQUENCE_OF("policyQualifiers", None, + X509_ExtPolicyQualifierInfo))) + +class X509_ExtCertificatePolicies(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("certificatePolicies", + [X509_ExtPolicyInformation()], + X509_ExtPolicyInformation) + +class X509_ExtCRLDistributionPoints(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("cRLDistributionPoints", + [X509_ExtDistributionPoint()], + X509_ExtDistributionPoint) + +class X509_ExtInhibitAnyPolicy(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_INTEGER("skipCerts", 0) + +class X509_ExtFreshestCRL(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("cRLDistributionPoints", + [X509_ExtDistributionPoint()], + X509_ExtDistributionPoint) + +class X509_AccessDescription(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("accessMethod", "0"), + ASN1F_PACKET("accessLocation", X509_GeneralName(), + X509_GeneralName)) + +class X509_ExtAuthInfoAccess(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("authorityInfoAccess", + [X509_AccessDescription()], + X509_AccessDescription) + +class X509_ExtQcStatement(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("statementId", "0.4.0.1862.1.1"), + ASN1F_optional( + ASN1F_field("statementInfo", None))) + +class X509_ExtQcStatements(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("qcStatements", + [X509_ExtQcStatement()], + X509_ExtQcStatement) + +class X509_ExtSubjInfoAccess(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE_OF("subjectInfoAccess", + [X509_AccessDescription()], + X509_AccessDescription) + +class X509_ExtNetscapeCertType(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_BIT_STRING("netscapeCertType", "") + +class X509_ExtComment(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_CHOICE("comment", + ASN1_UTF8_STRING("Dummy comment."), + ASN1F_IA5_STRING, ASN1F_ISO646_STRING, + ASN1F_BMP_STRING, ASN1F_UTF8_STRING) + +class X509_ExtDefault(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_field("value", None) + +# oid-info.com shows that some extensions share multiple OIDs. +# Here we only reproduce those written in RFC5280. +ext_mapping = { + "2.5.29.9" : X509_ExtSubjectDirectoryAttributes, + "2.5.29.14" : X509_ExtSubjectKeyIdentifier, + "2.5.29.15" : X509_ExtKeyUsage, + "2.5.29.16" : X509_ExtPrivateKeyUsagePeriod, + "2.5.29.17" : X509_ExtSubjectAltName, + "2.5.29.18" : X509_ExtIssuerAltName, + "2.5.29.19" : X509_ExtBasicConstraints, + "2.5.29.20" : X509_ExtCRLNumber, + "2.5.29.21" : X509_ExtReasonCode, + "2.5.29.24" : X509_ExtInvalidityDate, + "2.5.29.27" : X509_ExtDeltaCRLIndicator, + "2.5.29.28" : X509_ExtIssuingDistributionPoint, + "2.5.29.29" : X509_ExtCertificateIssuer, + "2.5.29.30" : X509_ExtNameConstraints, + "2.5.29.31" : X509_ExtCRLDistributionPoints, + "2.5.29.32" : X509_ExtCertificatePolicies, + "2.5.29.33" : X509_ExtPolicyMappings, + "2.5.29.35" : X509_ExtAuthorityKeyIdentifier, + "2.5.29.36" : X509_ExtPolicyConstraints, + "2.5.29.37" : X509_ExtExtendedKeyUsage, + "2.5.29.46" : X509_ExtFreshestCRL, + "2.5.29.54" : X509_ExtInhibitAnyPolicy, + "2.16.840.1.113730.1.1" : X509_ExtNetscapeCertType, + "2.16.840.1.113730.1.13" : X509_ExtComment, + "1.3.6.1.5.5.7.1.1" : X509_ExtAuthInfoAccess, + "1.3.6.1.5.5.7.1.3" : X509_ExtQcStatements, + "1.3.6.1.5.5.7.1.11" : X509_ExtSubjInfoAccess + } + +class ASN1F_EXT_SEQUENCE(ASN1F_SEQUENCE): +# We use explicit_tag=0x04 with extnValue as STRING encapsulation. + def __init__(self, **kargs): + seq = [ASN1F_OID("extnID", "2.5.29.19"), + ASN1F_optional( + ASN1F_BOOLEAN("critical", False)), + ASN1F_PACKET("extnValue", + X509_ExtBasicConstraints(), + X509_ExtBasicConstraints, + explicit_tag=0x04)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + def dissect(self, pkt, s): + s = BER_tagging_dec(s, implicit_tag=self.implicit_tag, + explicit_tag=self.explicit_tag, + safe=self.flexible_tag) + codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) + i,s,remain = codec.check_type_check_len(s) + extnID = self.seq[0] + critical = self.seq[1] + try: + oid,s = extnID.m2i(pkt, s) + extnID.set_val(pkt, oid) + s = critical.dissect(pkt, s) + encapsed = X509_ExtDefault + if oid.val in ext_mapping.keys(): + encapsed = ext_mapping[oid.val] + self.seq[2].cls = encapsed + self.seq[2].cls.ASN1_root.flexible_tag = True + # there are too many private extensions not to be flexible here + self.seq[2].default = encapsed() + s = self.seq[2].dissect(pkt, s) + if not self.flexible_tag and len(s) > 0: + err_msg = "extension sequence length issue" + raise BER_Decoding_Error(err_msg, remaining=s) + except ASN1F_badsequence,e: + raise Exception("could not parse extensions") + return remain + +class X509_Extension(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_EXT_SEQUENCE() + + +####### Public key wrapper ####### + +class X509_AlgorithmIdentifier(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_OID("algorithm", "1.2.840.113549.1.1.11"), + ASN1F_optional( + ASN1F_CHOICE("parameters", ASN1_NULL(0), + ASN1F_NULL, ECParameters))) + +class ASN1F_X509_SubjectPublicKeyInfoRSA(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING_ENCAPS("subjectPublicKey", + RSAPublicKey(), + RSAPublicKey)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + +class ASN1F_X509_SubjectPublicKeyInfo(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING("subjectPublicKey", None)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + def m2i(self, pkt, x): + c,s = ASN1F_SEQUENCE.m2i(self, pkt, x) + keytype = pkt.fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in keytype.lower(): + return ASN1F_X509_SubjectPublicKeyInfoRSA().m2i(pkt, x) + elif keytype == "ecPublicKey": + return c,s + else: + raise Exception("could not parse subjectPublicKeyInfo") + def dissect(self, pkt, s): + c,x = self.m2i(pkt, s) + return x + def build(self, pkt): + if "signatureAlgorithm" in pkt.fields: + ktype = pkt.fields['signatureAlgorithm'].algorithm.oidname + else: + ktype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in ktype.lower(): + pkt.default_fields["subjectPublicKey"] = RSAPublicKey() + return ASN1F_X509_SubjectPublicKeyInfoRSA().build(pkt) + elif ktype == "ecPublicKey": + return ASN1F_SEQUENCE.build(self, pkt) + else: + raise Exception("could not build subjectPublicKeyInfo") + +class X509_SubjectPublicKeyInfo(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_X509_SubjectPublicKeyInfo() + + +####### TBSCertificate & Certificate ####### + +default_issuer = [ + X509_RDN(), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.10", + value=ASN1_PRINTABLE_STRING("Scapy, Inc."))]), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.3", + value=ASN1_PRINTABLE_STRING("Scapy Default Issuer"))]) + ] + +default_subject = [ + X509_RDN(), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.10", + value=ASN1_PRINTABLE_STRING("Scapy, Inc."))]), + X509_RDN( + rdn=[X509_AttributeTypeAndValue( + type="2.5.4.3", + value=ASN1_PRINTABLE_STRING("Scapy Default Subject"))]) + ] + +class X509_Validity(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE( + ASN1F_CHOICE("not_before", + ASN1_UTC_TIME(str(ZuluTime(-600))), + ASN1F_UTC_TIME, ASN1F_GENERALIZED_TIME), + ASN1F_CHOICE("not_after", + ASN1_UTC_TIME(str(ZuluTime(+86400))), + ASN1F_UTC_TIME, ASN1F_GENERALIZED_TIME)) + +attrName_mapping = [ + ("countryName" , "C"), + ("stateOrProvinceName" , "ST"), + ("localityName" , "L"), + ("organizationName" , "O"), + ("organizationUnitName" , "OU"), + ("commonName" , "CN") + ] +attrName_specials = [name for name, symbol in attrName_mapping] + +class X509_TBSCertificate(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1F_extensions = ASN1F_optional( + ASN1F_SEQUENCE_OF("extensions", + [X509_Extension()], + X509_Extension, + explicit_tag=0xa3)) + ASN1F_extensions.islist = 1 + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_enum_INTEGER("version", 0x2, ["v1", "v2", "v3"], + explicit_tag=0xa0)), + ASN1F_INTEGER("serialNumber", 1), + ASN1F_PACKET("signature", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_SEQUENCE_OF("issuer", default_issuer, X509_RDN), + ASN1F_PACKET("validity", + X509_Validity(), + X509_Validity), + ASN1F_SEQUENCE_OF("subject", default_subject, X509_RDN), + ASN1F_PACKET("subjectPublicKeyInfo", + X509_SubjectPublicKeyInfo(), + X509_SubjectPublicKeyInfo), + ASN1F_optional( + ASN1F_BIT_STRING("issuerUniqueID", None, + implicit_tag=0x81)), + ASN1F_optional( + ASN1F_BIT_STRING("subjectUniqueID", None, + implicit_tag=0x82)), + ASN1F_extensions) + + def get_issuer(self): + attrs = self.issuer + attrsDict = {} + for attr in attrs: + # we assume there is only one name in each rdn ASN1_SET + attrsDict[attr.rdn[0].type.oidname] = attr.rdn[0].value.val + return attrsDict + def get_issuer_str(self): + """ + Returns a one-line string containing every type/value + in a rather specific order. sorted() built-in ensures unicity. + """ + name_str = "" + attrsDict = self.get_issuer() + for attrType, attrSymbol in attrName_mapping: + if attrType in attrsDict.keys(): + name_str += "/" + attrSymbol + "=" + name_str += attrsDict[attrType] + for attrType in sorted(attrsDict.keys()): + if attrType not in attrName_specials: + name_str += "/" + attrType + "=" + name_str += attrsDict[attrType] + return name_str + def get_subject(self): + attrs = self.subject + attrsDict = {} + for attr in attrs: + # we assume there is only one name in each rdn ASN1_SET + attrsDict[attr.rdn[0].type.oidname] = attr.rdn[0].value.val + return attrsDict + def get_subject_str(self): + name_str = "" + attrsDict = self.get_subject() + for attrType, attrSymbol in attrName_mapping: + if attrType in attrsDict.keys(): + name_str += "/" + attrSymbol + "=" + name_str += attrsDict[attrType] + for attrType in sorted(attrsDict.keys()): + if attrType not in attrName_specials: + name_str += "/" + attrType + "=" + name_str += attrsDict[attrType] + return name_str + +class ASN1F_X509_CertECDSA(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("tbsCertificate", + X509_TBSCertificate(), + X509_TBSCertificate), + ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING_ENCAPS("signatureValue", + ECDSASignature(), + ECDSASignature)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + +class ASN1F_X509_Cert(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("tbsCertificate", + X509_TBSCertificate(), + X509_TBSCertificate), + ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING("signatureValue", + "defaultsignature"*2)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + def m2i(self, pkt, x): + c,s = ASN1F_SEQUENCE.m2i(self, pkt, x) + sigtype = pkt.fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in sigtype.lower(): + return c,s + elif "ecdsa" in sigtype.lower(): + return ASN1F_X509_CertECDSA().m2i(pkt, x) + else: + raise Exception("could not parse certificate") + def dissect(self, pkt, s): + c,x = self.m2i(pkt, s) + return x + def build(self, pkt): + if "signatureAlgorithm" in pkt.fields: + sigtype = pkt.fields['signatureAlgorithm'].algorithm.oidname + else: + sigtype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in sigtype.lower(): + return ASN1F_SEQUENCE.build(self, pkt) + elif "ecdsa" in sigtype.lower(): + pkt.default_fields["signatureValue"] = ECDSASignature() + return ASN1F_X509_CertECDSA().build(pkt) + else: + raise Exception("could not build certificate") + +class X509_Cert(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_X509_Cert() + +####### TBSCertList & CRL ####### + +class X509_RevokedCertificate(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_SEQUENCE(ASN1F_INTEGER("serialNumber", 1), + ASN1F_UTC_TIME("revocationDate", + str(ZuluTime(+86400))), + ASN1F_optional( + ASN1F_SEQUENCE_OF("crlEntryExtensions", + None, X509_Extension))) + +class X509_TBSCertList(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1F_crlExtensions = ASN1F_optional( + ASN1F_SEQUENCE_OF("crlExtensions", None, + X509_Extension, + explicit_tag=0xa0)) + ASN1F_crlExtensions.islist = 1 + ASN1_root = ASN1F_SEQUENCE( + ASN1F_optional( + ASN1F_enum_INTEGER("version", 1, ["v1", "v2"])), + ASN1F_PACKET("signature", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_SEQUENCE_OF("issuer", default_issuer, X509_RDN), + ASN1F_UTC_TIME("this_update", str(ZuluTime(-1))), + ASN1F_optional( + ASN1F_UTC_TIME("next_update", None)), + ASN1F_optional( + ASN1F_SEQUENCE_OF("revokedCertificates", None, + X509_RevokedCertificate)), + ASN1F_crlExtensions) + def get_issuer(self): + attrs = self.issuer + attrsDict = {} + for attr in attrs: + # we assume there is only one name in each rdn ASN1_SET + attrsDict[attr.rdn[0].type.oidname] = attr.rdn[0].value.val + return attrsDict + def get_issuer_str(self): + """ + Returns a one-line string containing every type/value + in a rather specific order. sorted() built-in ensures unicity. + """ + name_str = "" + attrsDict = self.get_issuer() + for attrType, attrSymbol in attrName_mapping: + if attrType in attrsDict.keys(): + name_str += "/" + attrSymbol + "=" + name_str += attrsDict[attrType] + for attrType in sorted(attrsDict.keys()): + if attrType not in attrName_specials: + name_str += "/" + attrType + "=" + name_str += attrsDict[attrType] + return name_str + +class ASN1F_X509_CRLECDSA(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("tbsCertList", + X509_TBSCertList(), + X509_TBSCertList), + ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING_ENCAPS("signatureValue", + ECDSASignature(), + ECDSASignature)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + +class ASN1F_X509_CRL(ASN1F_SEQUENCE): + def __init__(self, **kargs): + seq = [ASN1F_PACKET("tbsCertList", + X509_TBSCertList(), + X509_TBSCertList), + ASN1F_PACKET("signatureAlgorithm", + X509_AlgorithmIdentifier(), + X509_AlgorithmIdentifier), + ASN1F_BIT_STRING("signatureValue", + "defaultsignature"*2)] + ASN1F_SEQUENCE.__init__(self, *seq, **kargs) + def m2i(self, pkt, x): + c,s = ASN1F_SEQUENCE.m2i(self, pkt, x) + sigtype = pkt.fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in sigtype.lower(): + return c,s + elif "ecdsa" in sigtype.lower(): + return ASN1F_X509_CRLECDSA().m2i(pkt, x) + else: + raise Exception("could not parse certificate") + def dissect(self, pkt, s): + c,x = self.m2i(pkt, s) + return x + def build(self, pkt): + if "signatureAlgorithm" in pkt.fields: + sigtype = pkt.fields['signatureAlgorithm'].algorithm.oidname + else: + sigtype = pkt.default_fields["signatureAlgorithm"].algorithm.oidname + if "rsa" in sigtype.lower(): + return ASN1F_SEQUENCE.build(self, pkt) + elif "ecdsa" in sigtype.lower(): + pkt.default_fields["signatureValue"] = ECDSASignature() + return ASN1F_X509_CRLECDSA().build(pkt) + else: + raise Exception("could not build certificate") + +class X509_CRL(ASN1_Packet): + ASN1_codec = ASN1_Codecs.BER + ASN1_root = ASN1F_X509_CRL() -- GitLab