From 087df702931f32eeb3a29957a8fe3fa31749d642 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Tue, 28 Apr 2020 10:08:02 +0200 Subject: [PATCH] Switch to python cryptography Original patch: https://github.com/simo5/dnspython/commit/bfe84d523bd4fde7b2655857d78bba85ed05f43c The same change in master: https://github.com/rthalley/dnspython/pull/449 --- dns/dnssec.py | 168 ++++++++++++++++++++----------------------- setup.py | 2 +- tests/test_dnssec.py | 12 +--- 3 files changed, 79 insertions(+), 103 deletions(-) diff --git a/dns/dnssec.py b/dns/dnssec.py index 35da6b5..73e92da 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -17,6 +17,7 @@ """Common DNSSEC-related functions and constants.""" +import hashlib # used in make_ds() to avoid pycrypto dependency from io import BytesIO import struct import time @@ -165,10 +166,10 @@ def make_ds(name, key, algorithm, origin=None): if algorithm.upper() == 'SHA1': dsalg = 1 - hash = SHA1.new() + hash = hashlib.sha1() elif algorithm.upper() == 'SHA256': dsalg = 2 - hash = SHA256.new() + hash = hashlib.sha256() else: raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) @@ -214,7 +215,7 @@ def _is_dsa(algorithm): def _is_ecdsa(algorithm): - return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384)) + return (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384)) def _is_md5(algorithm): @@ -240,18 +241,26 @@ def _is_sha512(algorithm): def _make_hash(algorithm): if _is_md5(algorithm): - return MD5.new() + return hashes.MD5() if _is_sha1(algorithm): - return SHA1.new() + return hashes.SHA1() if _is_sha256(algorithm): - return SHA256.new() + return hashes.SHA256() if _is_sha384(algorithm): - return SHA384.new() + return hashes.SHA384() if _is_sha512(algorithm): - return SHA512.new() + return hashes.SHA512() + if algorithm == ED25519: + return hashes.SHA512() + if algorithm == ED448: + return hashes.SHAKE256(114) raise ValidationFailure('unknown hash for algorithm %u' % algorithm) +def _bytes_to_long(b): + return int.from_bytes(b, 'big') + + def _make_algorithm_id(algorithm): if _is_md5(algorithm): oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05] @@ -316,8 +325,6 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): if rrsig.inception > now: raise ValidationFailure('not yet valid') - hash = _make_hash(rrsig.algorithm) - if _is_rsa(rrsig.algorithm): keyptr = candidate_key.key (bytes_,) = struct.unpack('!B', keyptr[0:1]) @@ -328,9 +335,9 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): rsa_e = keyptr[0:bytes_] rsa_n = keyptr[bytes_:] try: - pubkey = CryptoRSA.construct( - (number.bytes_to_long(rsa_n), - number.bytes_to_long(rsa_e))) + public_key = rsa.RSAPublicNumbers( + _bytes_to_long(rsa_e), + _bytes_to_long(rsa_n)).public_key(default_backend()) except ValueError: raise ValidationFailure('invalid public key') sig = rrsig.signature @@ -346,42 +353,47 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): dsa_g = keyptr[0:octets] keyptr = keyptr[octets:] dsa_y = keyptr[0:octets] - pubkey = CryptoDSA.construct( - (number.bytes_to_long(dsa_y), - number.bytes_to_long(dsa_g), - number.bytes_to_long(dsa_p), - number.bytes_to_long(dsa_q))) - sig = rrsig.signature[1:] + try: + public_key = dsa.DSAPublicNumbers( + _bytes_to_long(dsa_y), + dsa.DSAParameterNumbers( + _bytes_to_long(dsa_p), + _bytes_to_long(dsa_q), + _bytes_to_long(dsa_g))).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + sig_r = rrsig.signature[1:21] + sig_s = rrsig.signature[21:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) elif _is_ecdsa(rrsig.algorithm): - # use ecdsa for NIST-384p -- not currently supported by pycryptodome - keyptr = candidate_key.key - if rrsig.algorithm == ECDSAP256SHA256: - curve = ecdsa.curves.NIST256p - key_len = 32 + curve = ec.SECP256R1() + octets = 32 elif rrsig.algorithm == ECDSAP384SHA384: - curve = ecdsa.curves.NIST384p - key_len = 48 - - x = number.bytes_to_long(keyptr[0:key_len]) - y = number.bytes_to_long(keyptr[key_len:key_len * 2]) - if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y): - raise ValidationFailure('invalid ECDSA key') - point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order) - verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, - curve) - pubkey = ECKeyWrapper(verifying_key, key_len) - r = rrsig.signature[:key_len] - s = rrsig.signature[key_len:] - sig = ecdsa.ecdsa.Signature(number.bytes_to_long(r), - number.bytes_to_long(s)) + curve = ec.SECP384R1() + octets = 48 + ecdsa_x = keyptr[0:octets] + ecdsa_y = keyptr[octets:octets * 2] + try: + public_key = ec.EllipticCurvePublicNumbers( + curve=curve, + x=_bytes_to_long(ecdsa_x), + y=_bytes_to_long(ecdsa_y)).public_key(default_backend()) + except ValueError: + raise ValidationFailure('invalid public key') + sig_r = rrsig.signature[0:octets] + sig_s = rrsig.signature[octets:] + sig = utils.encode_dss_signature(_bytes_to_long(sig_r), + _bytes_to_long(sig_s)) else: raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) - hash.update(_to_rdata(rrsig, origin)[:18]) - hash.update(rrsig.signer.to_digestable(origin)) + data = b'' + data += _to_rdata(rrsig, origin)[:18] + data += rrsig.signer.to_digestable(origin) if rrsig.labels < len(rrname) - 1: suffix = rrname.split(rrsig.labels + 1)[1] @@ -391,25 +403,21 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): rrsig.original_ttl) rrlist = sorted(rdataset) for rr in rrlist: - hash.update(rrnamebuf) - hash.update(rrfixed) + data += rrnamebuf + data += rrfixed rrdata = rr.to_digestable(origin) rrlen = struct.pack('!H', len(rrdata)) - hash.update(rrlen) - hash.update(rrdata) + data += rrlen + data += rrdata + chosen_hash = _make_hash(rrsig.algorithm) try: if _is_rsa(rrsig.algorithm): - verifier = pkcs1_15.new(pubkey) - # will raise ValueError if verify fails: - verifier.verify(hash, sig) + public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash) elif _is_dsa(rrsig.algorithm): - verifier = DSS.new(pubkey, 'fips-186-3') - verifier.verify(hash, sig) + public_key.verify(sig, data, chosen_hash) elif _is_ecdsa(rrsig.algorithm): - digest = hash.digest() - if not pubkey.verify(digest, sig): - raise ValueError + public_key.verify(sig, data, ec.ECDSA(chosen_hash)) else: # Raise here for code clarity; this won't actually ever happen # since if the algorithm is really unknown we'd already have @@ -417,7 +425,7 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) # If we got here, we successfully verified so we can return without error return - except ValueError: + except InvalidSignature: # this happens on an individual validation failure continue # nothing verified -- raise failure: @@ -472,48 +480,24 @@ def _validate(rrset, rrsigset, keys, origin=None, now=None): raise ValidationFailure("no RRSIGs validated") -def _need_pycrypto(*args, **kwargs): - raise NotImplementedError("DNSSEC validation requires pycryptodome/pycryptodomex") +def _need_pyca(*args, **kwargs): + raise NotImplementedError("DNSSEC validation requires python cryptography") try: - try: - # test we're using pycryptodome, not pycrypto (which misses SHA1 for example) - from Crypto.Hash import MD5, SHA1, SHA256, SHA384, SHA512 - from Crypto.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA - from Crypto.Signature import pkcs1_15, DSS - from Crypto.Util import number - except ImportError: - from Cryptodome.Hash import MD5, SHA1, SHA256, SHA384, SHA512 - from Cryptodome.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA - from Cryptodome.Signature import pkcs1_15, DSS - from Cryptodome.Util import number + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.asymmetric import utils + from cryptography.hazmat.primitives.asymmetric import dsa + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.asymmetric import rsa except ImportError: - validate = _need_pycrypto - validate_rrsig = _need_pycrypto - _have_pycrypto = False - _have_ecdsa = False + validate = _need_pyca + validate_rrsig = _need_pyca + _have_pyca = False else: validate = _validate validate_rrsig = _validate_rrsig - _have_pycrypto = True - - try: - import ecdsa - import ecdsa.ecdsa - import ecdsa.ellipticcurve - import ecdsa.keys - except ImportError: - _have_ecdsa = False - else: - _have_ecdsa = True - - class ECKeyWrapper(object): - - def __init__(self, key, key_len): - self.key = key - self.key_len = key_len - - def verify(self, digest, sig): - diglong = number.bytes_to_long(digest) - return self.key.pubkey.verifies(diglong, sig) + _have_pyca = True diff --git a/setup.py b/setup.py index 743d43c..2ee38a7 100755 --- a/setup.py +++ b/setup.py @@ -75,7 +75,7 @@ direct manipulation of DNS zones, messages, names, and records.""", 'provides': ['dns'], 'extras_require': { 'IDNA': ['idna>=2.1'], - 'DNSSEC': ['pycryptodome', 'ecdsa>=0.13'], + 'DNSSEC': ['cryptography>=2.3'], }, 'ext_modules': ext_modules if compile_cython else None, 'zip_safe': False if compile_cython else None, diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py index c87862a..20b52b2 100644 --- a/tests/test_dnssec.py +++ b/tests/test_dnssec.py @@ -151,8 +151,8 @@ abs_ecdsa384_soa_rrsig = dns.rrset.from_text('example.', 86400, 'IN', 'RRSIG', -@unittest.skipUnless(dns.dnssec._have_pycrypto, - "Pycryptodome cannot be imported") +@unittest.skipUnless(dns.dnssec._have_pyca, + "Python Cryptography cannot be imported") class DNSSECValidatorTestCase(unittest.TestCase): def testAbsoluteRSAGood(self): # type: () -> None @@ -199,28 +199,20 @@ class DNSSECValidatorTestCase(unittest.TestCase): ds = dns.dnssec.make_ds(abs_example, example_sep_key, 'SHA256') self.failUnless(ds == example_ds_sha256) - @unittest.skipUnless(dns.dnssec._have_ecdsa, - "python ECDSA cannot be imported") def testAbsoluteECDSA256Good(self): # type: () -> None dns.dnssec.validate(abs_ecdsa256_soa, abs_ecdsa256_soa_rrsig, abs_ecdsa256_keys, None, when3) - @unittest.skipUnless(dns.dnssec._have_ecdsa, - "python ECDSA cannot be imported") def testAbsoluteECDSA256Bad(self): # type: () -> None def bad(): # type: () -> None dns.dnssec.validate(abs_other_ecdsa256_soa, abs_ecdsa256_soa_rrsig, abs_ecdsa256_keys, None, when3) self.failUnlessRaises(dns.dnssec.ValidationFailure, bad) - @unittest.skipUnless(dns.dnssec._have_ecdsa, - "python ECDSA cannot be imported") def testAbsoluteECDSA384Good(self): # type: () -> None dns.dnssec.validate(abs_ecdsa384_soa, abs_ecdsa384_soa_rrsig, abs_ecdsa384_keys, None, when4) - @unittest.skipUnless(dns.dnssec._have_ecdsa, - "python ECDSA cannot be imported") def testAbsoluteECDSA384Bad(self): # type: () -> None def bad(): # type: () -> None dns.dnssec.validate(abs_other_ecdsa384_soa, abs_ecdsa384_soa_rrsig, -- 2.26.2