Source code for tlslite.x509

# Authors:
#   Trevor Perrin
#   Google - parsing subject field
#
# See the LICENSE file for legal information regarding use of this file.

"""Class representing an X.509 certificate."""

from ecdsa.keys import VerifyingKey

from tlslite.errors import TLSDecryptionFailed

from .utils.asn1parser import ASN1Parser
from .utils.cryptomath import *
from .utils.keyfactory import (
    _createPublicRSAKey,
    _create_public_ecdsa_key,
    _create_public_dsa_key,
    _create_public_eddsa_key,
    _create_public_mldsa_key,
)
from .utils.pem import *
from .utils.compat import compatHMAC, b2a_hex
from .constants import TLS_1_3_BRAINPOOL_SIG_SCHEMES, AlgorithmOID, RSA_PSS_OID, ExtensionType, \
    SignatureAlgorithm, HashAlgorithm, SignatureScheme
from .utils.ecc import curve_name_to_hash_name
from .errors import TLSIllegalParameterException

[docs] class X509(object): """ This class represents an X.509 certificate. :vartype bytes: bytearray :ivar bytes: The DER-encoded ASN.1 certificate :vartype publicKey: ~tlslite.utils.rsakey.RSAKey :ivar publicKey: The subject public key from the certificate. :vartype subject: bytearray :ivar subject: The DER-encoded ASN.1 subject distinguished name. :vartype certAlg: str :ivar certAlg: algorithm of the public key, "rsa" for RSASSA-PKCS#1 v1.5, "rsa-pss" for RSASSA-PSS, "ecdsa" for ECDSA """
[docs] def __init__(self): """Create empty certificate object.""" self.bytes = bytearray(0) self.serial_number = None self.subject_public_key = None self.publicKey = None self.subject = None self.certAlg = None self.sigalg = None self.issuer = None
def __hash__(self): """Calculate hash of object.""" return hash(bytes(self.bytes)) def __eq__(self, other): """Compare other object for equality.""" if not hasattr(other, "bytes"): return NotImplemented return self.bytes == other.bytes def __ne__(self, other): """Compare with other object for inequality.""" if not hasattr(other, "bytes"): return NotImplemented return not self == other
[docs] def parse(self, s): """ Parse a PEM-encoded X.509 certificate. :type s: str :param s: A PEM-encoded X.509 certificate (i.e. a base64-encoded certificate wrapped with "-----BEGIN CERTIFICATE-----" and "-----END CERTIFICATE-----" tags). """ bytes = dePem(s, "CERTIFICATE") self.parseBinary(bytes) return self
[docs] def parseBinary(self, cert_bytes): """ Parse a DER-encoded X.509 certificate. :type bytes: L{str} (in python2) or L{bytearray} of unsigned bytes :param bytes: A DER-encoded X.509 certificate. """ self.bytes = bytearray(cert_bytes) parser = ASN1Parser(self.bytes) # Get the SignatureAlgorithm signature_algorithm_identifier = parser.getChild(1) self.sigalg = bytes(signature_algorithm_identifier.getChildBytes(0)) # Finally get the (hash, signature) pair coresponding to it # If it is rsa-pss we need to check the aditional parameters field # to extract the hash algorithm if self.sigalg == RSA_PSS_OID: sigalg_hash = signature_algorithm_identifier.getChild(1) sigalg_hash = bytes(sigalg_hash.getChild(0).value) self.sigalg = AlgorithmOID.oid[sigalg_hash] else: self.sigalg = AlgorithmOID.oid[self.sigalg] # Get the tbsCertificate tbs_certificate = parser.getChild(0) # Is the optional version field present? # This determines which index the key is at. if tbs_certificate.value[0] == 0xA0: serial_number_index = 1 subject_public_key_info_index = 6 else: serial_number_index = 0 subject_public_key_info_index = 5 # Get serial number self.serial_number = bytesToNumber( tbs_certificate.getChild(serial_number_index).value ) # Get the issuer self.issuer = tbs_certificate.getChildBytes( subject_public_key_info_index - 3) # Get the subject self.subject = tbs_certificate.getChildBytes( subject_public_key_info_index - 1) # Get the subjectPublicKeyInfo subject_public_key_info = tbs_certificate.getChild( subject_public_key_info_index) # Get the AlgorithmIdentifier alg_identifier = subject_public_key_info.getChild(0) alg_identifier_len = alg_identifier.getChildCount() self.certAlg = get_algorithm(alg_identifier) # for RSA the parameters of AlgorithmIdentifier shuld be a NULL if self.certAlg == "rsa": if alg_identifier_len != 2: raise SyntaxError("Missing parameters in AlgorithmIdentifier") params = alg_identifier.getChild(1) if params.value != bytearray(0): raise SyntaxError( "Unexpected non-NULL parameters in " "AlgorithmIdentifier" ) elif self.certAlg == "ecdsa": self.publicKey = _ecdsa_pubkey_parsing( tbs_certificate.getChildBytes(subject_public_key_info_index) ) return elif self.certAlg == "dsa": self.publicKey = _dsa_pubkey_parsing(subject_public_key_info) return elif self.certAlg == "Ed25519" or self.certAlg == "Ed448": self.publicKey = _eddsa_pubkey_parsing( tbs_certificate.getChildBytes(subject_public_key_info_index) ) return elif self.certAlg in ("mldsa44", "mldsa65", "mldsa87"): self.publicKey = _mldsa_pubkey_parsing( tbs_certificate.getChildBytes(subject_public_key_info_index) ) return else: # rsa-pss pass # ignore parameters, if any - don't apply key restrictions self.publicKey, self.subject_public_key = _rsa_pubkey_parsing( subject_public_key_info, self.certAlg )
[docs] def getFingerprint(self): """ Get the hex-encoded fingerprint of this certificate. :rtype: str :returns: A hex-encoded fingerprint. """ return b2a_hex(SHA1(self.bytes))
[docs] def writeBytes(self): """Serialise object to a DER encoded string.""" return self.bytes
[docs] def get_algorithm(alg_identifier): """ Retrive the algoritm from the AlgorithmIdentifier """ # first item of AlgorithmIdentifier is the algorithm alg = alg_identifier.getChild(0) alg_oid = alg.value if list(alg_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 1]: return "rsa" elif list(alg_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: return "rsa-pss" elif list(alg_oid) == [42, 134, 72, 206, 56, 4, 1]: return "dsa" elif list(alg_oid) == [42, 134, 72, 206, 61, 2, 1]: return "ecdsa" elif list(alg_oid) == [43, 101, 112]: return "Ed25519" elif list(alg_oid) == [43, 101, 113]: return "Ed448" elif list(alg_oid) == [96, 134, 72, 1, 101, 3, 4, 3, 17]: return "mldsa44" elif list(alg_oid) == [96, 134, 72, 1, 101, 3, 4, 3, 18]: return "mldsa65" elif list(alg_oid) == [96, 134, 72, 1, 101, 3, 4, 3, 19]: return "mldsa87" else: raise SyntaxError("Unrecognized AlgorithmIdentifier")
def _eddsa_pubkey_parsing(subject_public_key_info): """ Convert the raw DER encoded EdDSA parameters into public key object. :param subject_public_key_info: bytes like object with the DER encoded public key in it """ try: # python ecdsa knows how to parse curve OIDs so re-use that # code public_key = VerifyingKey.from_der(compatHMAC(subject_public_key_info)) except Exception: raise SyntaxError("Malformed or unsupported public key in " "certificate") return _create_public_eddsa_key(public_key) def _mldsa_pubkey_parsing(subject_public_key_info): """ Convert the raw DER encoded ML-DSA parameters into a public key object. :param subject_public_key_info: bytes like object with the DER encoded public key in it """ from dilithium_py.ml_dsa.pkcs import pk_from_der try: # dilithium py can do parsing of SPKI on its own so use it public_key = pk_from_der(subject_public_key_info) except Exception: raise SyntaxError("Malformed or unsupported public key in certificate") return _create_public_mldsa_key(public_key) def _rsa_pubkey_parsing(subject_public_key_info, cert_alg): """ Parse the RSA public key from the certificate. :param subject_public_key_info: ASN1Parser object with subject public key info of X.509 certificate """ # Get the subjectPublicKey subject_public_key = subject_public_key_info.getChild(1) self_subject_public_key = subject_public_key_info.getChildBytes(1) self_subject_public_key = ASN1Parser(self_subject_public_key).value[1:] # Adjust for BIT STRING encapsulation if subject_public_key.value[0]: raise SyntaxError() subject_public_key = ASN1Parser(subject_public_key.value[1:]) # Get the modulus and exponent modulus = subject_public_key.getChild(0) public_exponent = subject_public_key.getChild(1) # Decode them into numbers # pylint: disable=invalid-name # 'n' and 'e' are the universally used parameters in RSA algorithm # definition n = bytesToNumber(modulus.value) e = bytesToNumber(public_exponent.value) # Create a public key instance public_key = _createPublicRSAKey(n, e, cert_alg) return public_key, self_subject_public_key # pylint: enable=invalid-name def _ecdsa_pubkey_parsing(subject_public_key_info): """ Convert the raw DER encoded ECDSA parameters into public key object :param subject_public_key_info: bytes like object with DER encoded public key in it """ try: # python ecdsa knows how to parse curve OIDs so re-use that # code public_key = VerifyingKey.from_der(compatHMAC(subject_public_key_info)) except Exception: raise SyntaxError("Malformed or unsupported public key in " "certificate") # pylint: disable=invalid-name x = public_key.pubkey.point.x() y = public_key.pubkey.point.y() curve_name = public_key.curve.name return _create_public_ecdsa_key(x, y, curve_name) # pylint: enable=invalid-name def _dsa_pubkey_parsing(subject_public_key_info): """ Convert the raw DER encoded DSA parameters into public key object :param subject_public_key_info: bytes like object with DER encoded global parameters and public key in it """ global_parameters = (subject_public_key_info.getChild(0)).getChild(1) # Get the subjectPublicKey public_key = subject_public_key_info.getChild(1) # Adjust for BIT STRING encapsulation and get hex value if public_key.value[0]: raise SyntaxError() # pylint: disable=invalid-name y = ASN1Parser(public_key.value[1:]) # Get the {A, p, q} p = global_parameters.getChild(0) q = global_parameters.getChild(1) g = global_parameters.getChild(2) # Decode them into numbers y = bytesToNumber(y.value) p = bytesToNumber(p.value) q = bytesToNumber(q.value) g = bytesToNumber(g.value) # Create a public key instance return _create_public_dsa_key(p, q, g, y) # pylint: enable=invalid-name
[docs] class Credential(object): """ This class represents a credential. :vartype valid_time: int :ivar valid_time: time, after which the delegated credential is no longer valid. :vartype dc_cert_verify_algorithm: tuple(int,int) :ivar dc_cert_verify_algorithm: the signature algorithm of the credential key pair. :vartype subject_public_key_info: bytearray (ASN1_subjectPublicKeyInfo) :ivar subject_public_key_info: the credential's public key, a DER- encoded. """
[docs] def __init__( self, valid_time=0, dc_cert_verify_algorithm=None, subject_public_key_info=None, bytes=None ): """Create empty credential object.""" self.valid_time = valid_time self.dc_cert_verify_algorithm = dc_cert_verify_algorithm self.subject_public_key_info = subject_public_key_info self.bytes = bytes self.subject_public_key = None self.pub_key_alg = None self.pub_key = None
[docs] def parse_pub_key(self): """ Parse a DER-encoded [X.690] SubjectPublicKeyInfo to extract public key """ parser = ASN1Parser(self.subject_public_key_info) alg_identifier = parser.getChild(0) alg_identifier_len = parser.getChildCount() self.pub_key_alg = get_algorithm(alg_identifier) if self.pub_key_alg == "rsa": if alg_identifier_len != 2: raise SyntaxError("Missing parameters in AlgorithmIdentifier") params = alg_identifier.getChild(1) if params.value != bytearray(0): raise SyntaxError( "Unexpected non-NULL parameters in " "AlgorithmIdentifier" ) elif self.pub_key_alg == "ecdsa": self.pub_key = _ecdsa_pubkey_parsing(self.subject_public_key_info) return elif self.pub_key_alg == "dsa": self.pub_key = _dsa_pubkey_parsing(parser) return elif self.pub_key_alg == "Ed25519" or self.pub_key_alg == "Ed448": self.pub_key = _eddsa_pubkey_parsing( self.subject_public_key_info ) return else: pass self.pub_key, self.subject_public_key = _rsa_pubkey_parsing( parser, self.pub_key_alg)
[docs] @staticmethod def marshal(valid_time, dc_cert_verify_algorithm, subject_public_key_info): """ Encode the credential raw bytes. :vartype valid_time: int :ivar valid_time: time, after which the delegated credential is no longer valid. :vartype dc_cert_verify_algorithm: tuple(int,int) :ivar dc_cert_verify_algorithm: the signature algorithm of the credential key pair. :vartype subject_public_key_info: bytearray (ASN1_subjectPublicKeyInfo) :ivar subject_public_key_info: the credential's public key, a DER- encoded. """ cred_writer = Writer() cred_writer.addFour(valid_time) cred_writer.addOne(dc_cert_verify_algorithm[0]) cred_writer.addOne(dc_cert_verify_algorithm[1]) cred_writer.add_var_bytes(subject_public_key_info, 3) return cred_writer.bytes
[docs] class DelegatedCredential(object): """ This class represents a delegated credential. :vartype cred: Credential :ivar cred: the credential structure :vartype algorithm: tuple(int,int) :ivar algorithm: The signature algorithm used to create DelegatedCredential.signature. :vartype signature: bytearray :ivar signature: The delegation, a signature that binds the credential to the end-entity certificate's public key. """
[docs] def __init__(self, cred=None, algorithm=None, signature=None): """Create empty credential object.""" self.cred = cred self.algorithm = algorithm self.signature = signature
[docs] def parse(self, parser): """Parsing Delegating Credendial.""" valid_time = parser.get(4) dc_cert_verify_algorithm = (parser.get(1), parser.get(1)) subject_public_key_info = parser.getVarBytes(3) cred_raw_bytes = Credential.marshal(valid_time, dc_cert_verify_algorithm, subject_public_key_info) self.cred = Credential( valid_time=valid_time, dc_cert_verify_algorithm=dc_cert_verify_algorithm, subject_public_key_info=subject_public_key_info, bytes=cred_raw_bytes ) self.cred.parse_pub_key() self.algorithm = (parser.get(1), parser.get(1)) self.signature = parser.getVarBytes(2) return self
[docs] def write(self): """Serialise object to a DER encoded string.""" writer = Writer() writer.addFour( self.cred.valid_time) writer.addOne( self.cred.dc_cert_verify_algorithm[0]) writer.addOne( self.cred.dc_cert_verify_algorithm[1]) writer.add_var_bytes( self.cred.subject_public_key_info, 3) writer.addOne(self.algorithm[0]) writer.addOne(self.algorithm[1]) writer.add_var_bytes(self.signature, 2) return writer.bytes
[docs] def verify(self, certificate_entry, client_hello, cert_verify): """ Verify that the delegated credential is valid. Checks if the the delegated credential did not expire, the algorithms are of a type advertized by the client, the verify alg matches the scheme in peer's message. """ certificate = certificate_entry.certificate dc_sig_list = client_hello.getExtension( ExtensionType.delegated_credential) if self.cred.dc_cert_verify_algorithm not in dc_sig_list.sigalgs: raise TLSIllegalParameterException( "The dc_cert_verify_algorithm of the delegated " "credential algorithm must be a type " "advertised by the client in ClientHello extension.") sig_list = client_hello.getExtension(ExtensionType.signature_algorithms) if self.algorithm not in sig_list.sigalgs: raise TLSIllegalParameterException( "The algorithm field MUST be of a type advertised by " "the client in the signature_algorithms extension of " "the ClientHello message.") dc_cert_verify_algorithm = self.cred.dc_cert_verify_algorithm if dc_cert_verify_algorithm != cert_verify.signatureAlgorithm: raise TLSIllegalParameterException( "The dc_cert_verify_algorithm does not match " "the scheme indicated in the peer's " "CertificateVerify message") sig_context = DelegatedCredential.compute_certificate_dc_sig_context( certificate.bytes, self.cred.bytes, self.algorithm) cert_pub_key = certificate.publicKey sig_scheme = self.algorithm if sig_scheme in (SignatureScheme.ed25519, SignatureScheme.ed448): pad_type = None hash_name = "intrinsic" salt_len = None method = cert_pub_key.hashAndVerify elif sig_scheme[1] == SignatureAlgorithm.ecdsa: pad_type = None hash_name = HashAlgorithm.toRepr(sig_scheme[0]) matching_hash = curve_name_to_hash_name( cert_pub_key.curve_name) if hash_name != matching_hash: raise TLSIllegalParameterException( "server selected signature method invalid for the " "certificate it presented (curve mismatch)") salt_len = None method = cert_pub_key.hashAndVerify elif sig_scheme in TLS_1_3_BRAINPOOL_SIG_SCHEMES: scheme = SignatureScheme.toRepr(sig_scheme) pad_type = None hash_name = SignatureScheme.getHash(scheme) salt_len = None method = cert_pub_key.hashAndVerify else: scheme = SignatureScheme.toRepr(sig_scheme) pad_type = SignatureScheme.getPadding(scheme) hash_name = SignatureScheme.getHash(scheme) salt_len = getattr(hashlib, hash_name)().digest_size method = cert_pub_key.hashAndVerify if not method(self.signature, sig_context, pad_type, hash_name, salt_len): raise TLSDecryptionFailed("server Delegated Credential " \ "signature verification failed.") return True
[docs] @staticmethod def compute_certificate_dc_sig_context(cert_bytes, cred_bytes, dc_alg): """ Reconstructs the certificate signature context over the delegated credential. """ verify_bytes = bytearray(b'\x20' * 64 + b'TLS, server delegated credentials' + b'\x00' + cert_bytes + cred_bytes + bytearray(dc_alg)) return verify_bytes