diff --git a/scapy/asn1/ber.py b/scapy/asn1/ber.py index fe180e1cc2f517350f381c667807b35c3f080fb1..2cc5bd4efa1ec2c5aa614bde3eba8faa196b05e7 100644 --- a/scapy/asn1/ber.py +++ b/scapy/asn1/ber.py @@ -1,10 +1,12 @@ ## This file is part of Scapy ## See http://www.secdev.org/projects/scapy for more informations ## Copyright (C) Philippe Biondi <phil@secdev.org> +## Modified by Maxence Tury <maxence.tury@ssi.gouv.fr> +## Acknowledgment: Ralph Broenink ## This program is published under a GPLv2 license """ -Basic Encoding Rules (BER) for ASN.1 +Basic Encoding Rules (BER) for ASN1.1 """ from scapy.error import warning @@ -87,8 +89,7 @@ def BER_num_enc(l, size=1): l >>= 7 size -= 1 return "".join([chr(k) for k in x]) -def BER_num_dec(s): - x = 0 +def BER_num_dec(s, x=0): for i, c in enumerate(s): c = ord(c) x <<= 7 @@ -99,6 +100,43 @@ def BER_num_dec(s): raise BER_Decoding_Error("BER_num_dec: unfinished number description", remaining=s) return x, s[i+1:] +# The function below provides low-tag and high-tag identifier partial support. +# Class and primitive/constructed bit decoding is not supported yet. +# For now Scapy considers this information to always be part of the identifier +# e.g. we need BER_id_dec("\x30") to be 0x30 so that Scapy calls a SEQUENCE, +# even though the real id is 0x10 once bit 6 (constructed) has been removed. +def BER_id_dec(s): + x = ord(s[0]) + if x & 0x1f != 0x1f: + return x,s[1:] + return BER_num_dec(s[1:], x&0xe0) + +# The functions below provide implicit and explicit tagging support. +def BER_tagging_dec(s, hidden_tag=None, implicit_tag=None, + explicit_tag=None, safe=False): + if len(s) > 0: + err_msg = "BER_tagging_dec: observed tag does not match expected tag" + if implicit_tag is not None: + ber_id,s = BER_id_dec(s) + if ber_id != implicit_tag: + if not safe: + raise BER_Decoding_Error(err_msg, remaining=s) + s = chr(hidden_tag) + s + elif explicit_tag is not None: + ber_id,s = BER_id_dec(s) + if ber_id != explicit_tag: + if not safe: + raise BER_Decoding_Error(err_msg, remaining=s) + l,s = BER_len_dec(s) + return s +def BER_tagging_enc(s, implicit_tag=None, explicit_tag=None): + if len(s) > 0: + if implicit_tag is not None: + s = chr(implicit_tag) + s[1:] + elif explicit_tag is not None: + s = chr(explicit_tag) + BER_len_enc(len(s)) + s + return s + #####[ BER classes ]##### class BERcodec_metaclass(type): @@ -128,10 +166,11 @@ class BERcodec_Object: @classmethod def check_type(cls, s): cls.check_string(s) - if cls.tag != ord(s[0]): + tag, remainder = BER_id_dec(s) + if cls.tag != tag: raise BER_BadTag_Decoding_Error("%s: Got tag [%i/%#x] while expecting %r" % - (cls.__name__, ord(s[0]), ord(s[0]),cls.tag), remaining=s) - return s[1:] + (cls.__name__, tag, tag, cls.tag), remaining=s) + return remainder @classmethod def check_type_get_len(cls, s): s2 = cls.check_type(s) @@ -152,7 +191,7 @@ class BERcodec_Object: if context is None: context = cls.tag.context cls.check_string(s) - p = ord(s[0]) + p,_ = BER_id_dec(s) if p not in context: t = s if len(t) > 18: @@ -187,11 +226,13 @@ class BERcodec_Object: else: return BERcodec_INTEGER.enc(int(s)) - - ASN1_Codecs.BER.register_stem(BERcodec_Object) +########################## +#### BERcodec objects #### +########################## + class BERcodec_INTEGER(BERcodec_Object): tag = ASN1_Class_UNIVERSAL.INTEGER @classmethod @@ -223,12 +264,46 @@ class BERcodec_INTEGER(BERcodec_Object): x |= ord(c) return cls.asn1_object(x),t - class BERcodec_BOOLEAN(BERcodec_INTEGER): tag = ASN1_Class_UNIVERSAL.BOOLEAN -class BERcodec_ENUMERATED(BERcodec_INTEGER): - tag = ASN1_Class_UNIVERSAL.ENUMERATED +class BERcodec_BIT_STRING(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.BIT_STRING + @classmethod + def do_dec(cls, s, context=None, safe=False): + # /!\ the unused_bits information is lost after this decoding + l,s,t = cls.check_type_check_len(s) + if len(s) > 0: + unused_bits = ord(s[0]) + if safe and unused_bits > 7: + raise BER_Decoding_Error("BERcodec_BIT_STRING: too many unused_bits advertised", remaining=s) + s = "".join(format(ord(x), 'b').zfill(8) for x in s[1:]) + if unused_bits > 0: + s = s[:-unused_bits] + return cls.tag.asn1_object(s),t + else: + raise BER_Decoding_Error("BERcodec_BIT_STRING found no content (not even unused_bits byte)", remaining=s) + @classmethod + def enc(cls,s): + # /!\ this is DER encoding (bit strings are only zero-bit padded) + if len(s) % 8 == 0: + unused_bits = 0 + else: + unused_bits = 8 - len(s)%8 + s += "0"*unused_bits + s = "".join(chr(int("".join(x),2)) for x in zip(*[iter(s)]*8)) + s = chr(unused_bits) + s + return chr(cls.tag)+BER_len_enc(len(s))+s + +class BERcodec_STRING(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.STRING + @classmethod + def enc(cls,s): + return chr(cls.tag)+BER_len_enc(len(s))+s + @classmethod + def do_dec(cls, s, context=None, safe=False): + l,s,t = cls.check_type_check_len(s) + return cls.tag.asn1_object(s),t class BERcodec_NULL(BERcodec_INTEGER): tag = ASN1_Class_UNIVERSAL.NULL @@ -237,23 +312,35 @@ class BERcodec_NULL(BERcodec_INTEGER): if i == 0: return chr(cls.tag)+"\0" else: - return BERcodec_INTEGER.enc(i) - -class BERcodec_SEP(BERcodec_NULL): - tag = ASN1_Class_UNIVERSAL.SEP + return super(cls,cls).enc(i) -class BERcodec_STRING(BERcodec_Object): - tag = ASN1_Class_UNIVERSAL.STRING +class BERcodec_OID(BERcodec_Object): + tag = ASN1_Class_UNIVERSAL.OID @classmethod - def enc(cls,s): + def enc(cls, oid): + lst = [int(x) for x in oid.strip(".").split(".")] + if len(lst) >= 2: + lst[1] += 40*lst[0] + del(lst[0]) + s = "".join([BER_num_enc(k) for k in lst]) return chr(cls.tag)+BER_len_enc(len(s))+s @classmethod def do_dec(cls, s, context=None, safe=False): l,s,t = cls.check_type_check_len(s) - return cls.tag.asn1_object(s),t + lst = [] + while s: + l,s = BER_num_dec(s) + lst.append(l) + if (len(lst) > 0): + lst.insert(0,lst[0]/40) + lst[1] %= 40 + return cls.asn1_object(".".join([str(k) for k in lst])), t -class BERcodec_BIT_STRING(BERcodec_STRING): - tag = ASN1_Class_UNIVERSAL.BIT_STRING +class BERcodec_ENUMERATED(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.ENUMERATED + +class BERcodec_UTF8_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.UTF8_STRING class BERcodec_PRINTABLE_STRING(BERcodec_STRING): tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING @@ -264,49 +351,20 @@ class BERcodec_T61_STRING (BERcodec_STRING): class BERcodec_IA5_STRING(BERcodec_STRING): tag = ASN1_Class_UNIVERSAL.IA5_STRING -class BERcodec_NUMERIC_STRING(BERcodec_STRING): - tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING - -class BERcodec_VIDEOTEX_STRING(BERcodec_STRING): - tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING - -class BERcodec_IPADDRESS(BERcodec_STRING): - tag = ASN1_Class_UNIVERSAL.IPADDRESS - - @classmethod - def enc(cls, ipaddr_ascii): - try: - s = inet_aton(ipaddr_ascii) - except Exception: - raise BER_Encoding_Error("IPv4 address could not be encoded") - return chr(cls.tag)+BER_len_enc(len(s))+s - - @classmethod - def do_dec(cls, s, context=None, safe=False): - l,s,t = cls.check_type_check_len(s) - try: - ipaddr_ascii = inet_ntoa(s) - except Exception: - raise BER_Decoding_Error("IP address could not be decoded", decoded=obj) - return cls.asn1_object(ipaddr_ascii), t - class BERcodec_UTC_TIME(BERcodec_STRING): tag = ASN1_Class_UNIVERSAL.UTC_TIME class BERcodec_GENERALIZED_TIME(BERcodec_STRING): tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME -class BERcodec_TIME_TICKS(BERcodec_INTEGER): - tag = ASN1_Class_UNIVERSAL.TIME_TICKS +class BERcodec_ISO646_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.ISO646_STRING -class BERcodec_GAUGE32(BERcodec_INTEGER): - tag = ASN1_Class_UNIVERSAL.GAUGE32 +class BERcodec_UNIVERSAL_STRING(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING -class BERcodec_COUNTER32(BERcodec_INTEGER): - tag = ASN1_Class_UNIVERSAL.COUNTER32 - -class BERcodec_COUNTER64(BERcodec_INTEGER): - tag = ASN1_Class_UNIVERSAL.COUNTER64 +class BERcodec_BMP_STRING (BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.BMP_STRING class BERcodec_SEQUENCE(BERcodec_Object): tag = ASN1_Class_UNIVERSAL.SEQUENCE @@ -339,28 +397,29 @@ class BERcodec_SEQUENCE(BERcodec_Object): class BERcodec_SET(BERcodec_SEQUENCE): tag = ASN1_Class_UNIVERSAL.SET - -class BERcodec_OID(BERcodec_Object): - tag = ASN1_Class_UNIVERSAL.OID - +class BERcodec_IPADDRESS(BERcodec_STRING): + tag = ASN1_Class_UNIVERSAL.IPADDRESS @classmethod - def enc(cls, oid): - lst = [int(x) for x in oid.strip(".").split(".")] - if len(lst) >= 2: - lst[1] += 40*lst[0] - del(lst[0]) - s = "".join([BER_num_enc(k) for k in lst]) + def enc(cls, ipaddr_ascii): + try: + s = inet_aton(ipaddr_ascii) + except Exception: + raise BER_Encoding_Error("IPv4 address could not be encoded") return chr(cls.tag)+BER_len_enc(len(s))+s @classmethod def do_dec(cls, s, context=None, safe=False): l,s,t = cls.check_type_check_len(s) - lst = [] - while s: - l,s = BER_num_dec(s) - lst.append(l) - if (len(lst) > 0): - lst.insert(0,lst[0]/40) - lst[1] %= 40 - return cls.asn1_object(".".join([str(k) for k in lst])), t + try: + ipaddr_ascii = inet_ntoa(s) + except Exception: + raise BER_Decoding_Error("IP address could not be decoded", decoded=obj) + return cls.asn1_object(ipaddr_ascii), t + +class BERcodec_COUNTER32(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.COUNTER32 + +class BERcodec_TIME_TICKS(BERcodec_INTEGER): + tag = ASN1_Class_UNIVERSAL.TIME_TICKS +