aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework
diff options
context:
space:
mode:
authorPieter Wuille <pieter.wuille@gmail.com>2019-04-15 16:49:18 -0700
committerPieter Wuille <pieter.wuille@gmail.com>2019-04-18 11:58:32 -0700
commit8c7b9324ca3f3ffb178bea56a96ea23f7e0383cb (patch)
treef25be12fbce29ce778be82f779f0d40c1d3b0b3c /test/functional/test_framework
parent598323911e930d67e678e464353eb570ad3bf2b7 (diff)
Pure python EC
This removes the dependency on OpenSSL for the interaction tests, by providing a pure-Python toy implementation of secp256k1.
Diffstat (limited to 'test/functional/test_framework')
-rw-r--r--test/functional/test_framework/key.py528
1 files changed, 324 insertions, 204 deletions
diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py
index 1b3e510dc4..996de5f94f 100644
--- a/test/functional/test_framework/key.py
+++ b/test/functional/test_framework/key.py
@@ -1,226 +1,346 @@
-# Copyright (c) 2011 Sam Rushing
-"""ECC secp256k1 OpenSSL wrapper.
+# Copyright (c) 2019 Pieter Wuille
-WARNING: This module does not mlock() secrets; your private keys may end up on
-disk in swap! Use with caution!
+"""Test-only secp256k1 elliptic curve implementation
-This file is modified from python-bitcoinlib.
+WARNING: This code is slow, uses bad randomness, does not properly protect
+keys, and is trivially vulnerable to side channel attacks. Do not use for
+anything but tests.
"""
-import ctypes
-import ctypes.util
-import hashlib
+import random
-ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library ('ssl') or 'libeay32')
-
-ssl.BN_new.restype = ctypes.c_void_p
-ssl.BN_new.argtypes = []
-
-ssl.BN_bin2bn.restype = ctypes.c_void_p
-ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p]
-
-ssl.BN_CTX_free.restype = None
-ssl.BN_CTX_free.argtypes = [ctypes.c_void_p]
-
-ssl.BN_CTX_new.restype = ctypes.c_void_p
-ssl.BN_CTX_new.argtypes = []
-
-ssl.ECDH_compute_key.restype = ctypes.c_int
-ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
-
-ssl.ECDSA_sign.restype = ctypes.c_int
-ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
-
-ssl.ECDSA_verify.restype = ctypes.c_int
-ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p]
-
-ssl.EC_KEY_free.restype = None
-ssl.EC_KEY_free.argtypes = [ctypes.c_void_p]
-
-ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
-ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int]
-
-ssl.EC_KEY_get0_group.restype = ctypes.c_void_p
-ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p]
-
-ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p
-ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p]
-
-ssl.EC_KEY_set_private_key.restype = ctypes.c_int
-ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
-
-ssl.EC_KEY_set_conv_form.restype = None
-ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int]
-
-ssl.EC_KEY_set_public_key.restype = ctypes.c_int
-ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
-
-ssl.i2o_ECPublicKey.restype = ctypes.c_void_p
-ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
-
-ssl.EC_POINT_new.restype = ctypes.c_void_p
-ssl.EC_POINT_new.argtypes = [ctypes.c_void_p]
-
-ssl.EC_POINT_free.restype = None
-ssl.EC_POINT_free.argtypes = [ctypes.c_void_p]
-
-ssl.EC_POINT_mul.restype = ctypes.c_int
-ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
-
-# this specifies the curve used with ECDSA.
-NID_secp256k1 = 714 # from openssl/obj_mac.h
+def modinv(a, n):
+ """Compute the modular inverse of a modulo n
+ See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers
+ """
+ t1, t2 = 0, 1
+ r1, r2 = n, a
+ while r2 != 0:
+ q = r1 // r2
+ t1, t2 = t2, t1 - q * t2
+ r1, r2 = r2, r1 - q * r2
+ if r1 > 1:
+ return None
+ if t1 < 0:
+ t1 += n
+ return t1
+
+def jacobi_symbol(n, k):
+ """Compute the Jacobi symbol of n modulo k
+
+ See http://en.wikipedia.org/wiki/Jacobi_symbol
+ """
+ assert k > 0 and k & 1
+ n %= k
+ t = 0
+ while n != 0:
+ while n & 1 == 0:
+ n >>= 1
+ r = k & 7
+ t ^= (r == 3 or r == 5)
+ n, k = k, n
+ t ^= (n & k & 3 == 3)
+ n = n % k
+ if k == 1:
+ return -1 if t else 1
+ return 0
+
+def modsqrt(a, p):
+ """Compute the square root of a modulo p
+
+ For p = 3 mod 4, if a square root exists, it is equal to a**((p+1)/4) mod p.
+ """
+ assert(p % 4 == 3) # Only p = 3 mod 4 is implemented
+ sqrt = pow(a, (p + 1)//4, p)
+ if pow(sqrt, 2, p) == a % p:
+ return sqrt
+ return None
+
+class EllipticCurve:
+ def __init__(self, p, a, b):
+ """Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
+ self.p = p
+ self.a = a % p
+ self.b = b % p
+
+ def affine(self, p1):
+ """Convert a Jacobian point tuple p1 to affine form, or None if at infinity."""
+ x1, y1, z1 = p1
+ if z1 == 0:
+ return None
+ inv = modinv(z1, self.p)
+ inv_2 = (inv**2) % self.p
+ inv_3 = (inv_2 * inv) % self.p
+ return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
+
+ def negate(self, p1):
+ """Negate a Jacobian point tuple p1."""
+ x1, y1, z1 = p1
+ return (x1, (self.p - y1) % self.p, z1)
+
+ def on_curve(self, p1):
+ """Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
+ x1, y1, z1 = p1
+ z2 = pow(z1, 2, self.p)
+ z4 = pow(z2, 2, self.p)
+ return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
+
+ def is_x_coord(self, x):
+ """Test whether x is a valid X coordinate on the curve."""
+ x_3 = pow(x, 3, self.p)
+ return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
+
+ def lift_x(self, x):
+ """Given an X coordinate on the curve, return a corresponding affine point."""
+ x_3 = pow(x, 3, self.p)
+ v = x_3 + self.a * x + self.b
+ y = modsqrt(v, self.p)
+ if y is None:
+ return None
+ return (x, y, 1)
+
+ def double(self, p1):
+ """Double a Jacobian tuple p1"""
+ x1, y1, z1 = p1
+ if z1 == 0:
+ return (0, 1, 0)
+ y1_2 = (y1**2) % self.p
+ y1_4 = (y1_2**2) % self.p
+ x1_2 = (x1**2) % self.p
+ s = (4*x1*y1_2) % self.p
+ m = 3*x1_2
+ if self.a:
+ m += self.a * pow(z1, 4, self.p)
+ m = m % self.p
+ x2 = (m**2 - 2*s) % self.p
+ y2 = (m*(s - x2) - 8*y1_4) % self.p
+ z2 = (2*y1*z1) % self.p
+ return (x2, y2, z2)
+
+ def add_mixed(self, p1, p2):
+ """Add a Jacobian tuple p1 and an affine tuple p2"""
+ x1, y1, z1 = p1
+ x2, y2, z2 = p2
+ assert(z2 == 1)
+ if z1 == 0:
+ return p2
+ z1_2 = (z1**2) % self.p
+ z1_3 = (z1_2 * z1) % self.p
+ u2 = (x2 * z1_2) % self.p
+ s2 = (y2 * z1_3) % self.p
+ if x1 == u2:
+ if (y1 != s2):
+ return (0, 1, 0)
+ return self.double(p1)
+ h = u2 - x1
+ r = s2 - y1
+ h_2 = (h**2) % self.p
+ h_3 = (h_2 * h) % self.p
+ u1_h_2 = (x1 * h_2) % self.p
+ x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
+ y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
+ z3 = (h*z1) % self.p
+ return (x3, y3, z3)
+
+ def add(self, p1, p2):
+ """Add two Jacobian tuples p1 and p2"""
+ x1, y1, z1 = p1
+ x2, y2, z2 = p2
+ if z1 == 0:
+ return p2
+ if z2 == 0:
+ return p1
+ if z1 == 1:
+ return self.add_mixed(p2, p1)
+ if z2 == 1:
+ return self.add_mixed(p1, p2)
+ z1_2 = (z1**2) % self.p
+ z1_3 = (z1_2 * z1) % self.p
+ z2_2 = (z2**2) % self.p
+ z2_3 = (z2_2 * z2) % self.p
+ u1 = (x1 * z2_2) % self.p
+ u2 = (x2 * z1_2) % self.p
+ s1 = (y1 * z2_3) % self.p
+ s2 = (y2 * z1_3) % self.p
+ if u1 == u2:
+ if (s1 != s2):
+ return (0, 1, 0)
+ return self.double(p1)
+ h = u2 - u1
+ r = s2 - s1
+ h_2 = (h**2) % self.p
+ h_3 = (h_2 * h) % self.p
+ u1_h_2 = (u1 * h_2) % self.p
+ x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
+ y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
+ z3 = (h*z1*z2) % self.p
+ return (x3, y3, z3)
+
+ def mul(self, ps):
+ """Compute a (multi) point multiplication
+
+ ps is a list of (Jacobian tuple, scalar) pairs.
+ """
+ r = (0, 1, 0)
+ for i in range(255, -1, -1):
+ r = self.double(r)
+ for (p, n) in ps:
+ if ((n >> i) & 1):
+ r = self.add(r, p)
+ return r
+
+SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7)
+SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
-# Thx to Sam Devlin for the ctypes magic 64-bit fix.
-def _check_result(val, func, args):
- if val == 0:
- raise ValueError
- else:
- return ctypes.c_void_p (val)
-
-ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
-ssl.EC_KEY_new_by_curve_name.errcheck = _check_result
-
-class CECKey():
- """Wrapper around OpenSSL's EC_KEY"""
-
- POINT_CONVERSION_COMPRESSED = 2
- POINT_CONVERSION_UNCOMPRESSED = 4
+class ECPubKey():
+ """A secp256k1 public key"""
def __init__(self):
- self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1)
-
- def __del__(self):
- if ssl:
- ssl.EC_KEY_free(self.k)
- self.k = None
-
- def set_secretbytes(self, secret):
- priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new())
- group = ssl.EC_KEY_get0_group(self.k)
- pub_key = ssl.EC_POINT_new(group)
- ctx = ssl.BN_CTX_new()
- if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx):
- raise ValueError("Could not derive public key from the supplied secret.")
- ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx)
- ssl.EC_KEY_set_private_key(self.k, priv_key)
- ssl.EC_KEY_set_public_key(self.k, pub_key)
- ssl.EC_POINT_free(pub_key)
- ssl.BN_CTX_free(ctx)
- return self.k
-
- def set_privkey(self, key):
- self.mb = ctypes.create_string_buffer(key)
- return ssl.d2i_ECPrivateKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
-
- def set_pubkey(self, key):
- self.mb = ctypes.create_string_buffer(key)
- return ssl.o2i_ECPublicKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
-
- def get_privkey(self):
- size = ssl.i2d_ECPrivateKey(self.k, 0)
- mb_pri = ctypes.create_string_buffer(size)
- ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri)))
- return mb_pri.raw
-
- def get_pubkey(self):
- size = ssl.i2o_ECPublicKey(self.k, 0)
- mb = ctypes.create_string_buffer(size)
- ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb)))
- return mb.raw
-
- def get_raw_ecdh_key(self, other_pubkey):
- ecdh_keybuffer = ctypes.create_string_buffer(32)
- r = ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32,
- ssl.EC_KEY_get0_public_key(other_pubkey.k),
- self.k, 0)
- if r != 32:
- raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed')
- return ecdh_keybuffer.raw
-
- def get_ecdh_key(self, other_pubkey, kdf=lambda k: hashlib.sha256(k).digest()):
- # FIXME: be warned it's not clear what the kdf should be as a default
- r = self.get_raw_ecdh_key(other_pubkey)
- return kdf(r)
-
- def sign(self, hash, low_s = True):
- # FIXME: need unit tests for below cases
- if not isinstance(hash, bytes):
- raise TypeError('Hash must be bytes instance; got %r' % hash.__class__)
- if len(hash) != 32:
- raise ValueError('Hash must be exactly 32 bytes long')
-
- sig_size0 = ctypes.c_uint32()
- sig_size0.value = ssl.ECDSA_size(self.k)
- mb_sig = ctypes.create_string_buffer(sig_size0.value)
- result = ssl.ECDSA_sign(0, hash, len(hash), mb_sig, ctypes.byref(sig_size0), self.k)
- assert 1 == result
- assert mb_sig.raw[0] == 0x30
- assert mb_sig.raw[1] == sig_size0.value - 2
- total_size = mb_sig.raw[1]
- assert mb_sig.raw[2] == 2
- r_size = mb_sig.raw[3]
- assert mb_sig.raw[4 + r_size] == 2
- s_size = mb_sig.raw[5 + r_size]
- s_value = int.from_bytes(mb_sig.raw[6+r_size:6+r_size+s_size], byteorder='big')
- if (not low_s) or s_value <= SECP256K1_ORDER_HALF:
- return mb_sig.raw[:sig_size0.value]
- else:
- low_s_value = SECP256K1_ORDER - s_value
- low_s_bytes = (low_s_value).to_bytes(33, byteorder='big')
- while len(low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80:
- low_s_bytes = low_s_bytes[1:]
- new_s_size = len(low_s_bytes)
- new_total_size_byte = (total_size + new_s_size - s_size).to_bytes(1,byteorder='big')
- new_s_size_byte = (new_s_size).to_bytes(1,byteorder='big')
- return b'\x30' + new_total_size_byte + mb_sig.raw[2:5+r_size] + new_s_size_byte + low_s_bytes
-
- def verify(self, hash, sig):
- """Verify a DER signature"""
- return ssl.ECDSA_verify(0, hash, len(hash), sig, len(sig), self.k) == 1
-
- def set_compressed(self, compressed):
- if compressed:
- form = self.POINT_CONVERSION_COMPRESSED
+ """Construct an uninitialized public key"""
+ self.valid = False
+
+ def set(self, data):
+ """Construct a public key from a serialization in compressed or uncompressed format"""
+ if (len(data) == 65 and data[0] == 0x04):
+ p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
+ self.valid = SECP256K1.on_curve(p)
+ if self.valid:
+ self.p = p
+ self.compressed = False
+ elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
+ x = int.from_bytes(data[1:33], 'big')
+ if SECP256K1.is_x_coord(x):
+ p = SECP256K1.lift_x(x)
+ if (p[1] & 1) != (data[0] & 1):
+ p = SECP256K1.negate(p)
+ self.p = p
+ self.valid = True
+ self.compressed = True
+ else:
+ self.valid = False
else:
- form = self.POINT_CONVERSION_UNCOMPRESSED
- ssl.EC_KEY_set_conv_form(self.k, form)
-
+ self.valid = False
-class CPubKey(bytes):
- """An encapsulated public key
-
- Attributes:
+ @property
+ def is_compressed(self):
+ return self.compressed
- is_valid - Corresponds to CPubKey.IsValid()
- is_fullyvalid - Corresponds to CPubKey.IsFullyValid()
- is_compressed - Corresponds to CPubKey.IsCompressed()
- """
+ @property
+ def is_valid(self):
+ return self.valid
+
+ def get_bytes(self):
+ assert(self.valid)
+ p = SECP256K1.affine(self.p)
+ if p is None:
+ return None
+ if self.compressed:
+ return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
+ else:
+ return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
+
+ def verify_ecdsa(self, sig, msg, low_s=True):
+ """Verify a strictly DER-encoded ECDSA signature against this pubkey."""
+ assert(self.valid)
+ if (sig[1] + 2 != len(sig)):
+ return False
+ if (len(sig) < 4):
+ return False
+ if (sig[0] != 0x30):
+ return False
+ if (sig[2] != 0x02):
+ return False
+ rlen = sig[3]
+ if (len(sig) < 6 + rlen):
+ return False
+ if rlen < 1 or rlen > 33:
+ return False
+ if sig[4] >= 0x80:
+ return False
+ if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
+ return False
+ r = int.from_bytes(sig[4:4+rlen], 'big')
+ if (sig[4+rlen] != 0x02):
+ return False
+ slen = sig[5+rlen]
+ if slen < 1 or slen > 33:
+ return False
+ if (len(sig) != 6 + rlen + slen):
+ return False
+ if sig[6+rlen] >= 0x80:
+ return False
+ if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
+ return False
+ s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
+ if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
+ return False
+ if low_s and s >= SECP256K1_ORDER_HALF:
+ return False
+ z = int.from_bytes(msg, 'big')
+ w = modinv(s, SECP256K1_ORDER)
+ u1 = z*w % SECP256K1_ORDER
+ u2 = r*w % SECP256K1_ORDER
+ R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
+ if R is None or R[0] != r:
+ return False
+ return True
+
+class ECKey():
+ """A secp256k1 private key"""
- def __new__(cls, buf, _cec_key=None):
- self = super(CPubKey, cls).__new__(cls, buf)
- if _cec_key is None:
- _cec_key = CECKey()
- self._cec_key = _cec_key
- self.is_fullyvalid = _cec_key.set_pubkey(self) != 0
- return self
+ def __init__(self):
+ self.valid = False
+
+ def set(self, secret, compressed):
+ """Construct a private key object with given 32-byte secret and compressed flag."""
+ assert(len(secret) == 32)
+ secret = int.from_bytes(secret, 'big')
+ self.valid = (secret > 0 and secret < SECP256K1_ORDER)
+ if self.valid:
+ self.secret = secret
+ self.compressed = compressed
+
+ def generate(self, compressed=True):
+ """Generate a random private key (compressed or uncompressed)."""
+ self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
+
+ def get_bytes(self):
+ """Retrieve the 32-byte representation of this key."""
+ assert(self.valid)
+ return self.secret.to_bytes(32, 'big')
@property
def is_valid(self):
- return len(self) > 0
+ return self.valid
@property
def is_compressed(self):
- return len(self) == 33
-
- def verify(self, hash, sig):
- return self._cec_key.verify(hash, sig)
-
- def __str__(self):
- return repr(self)
-
- def __repr__(self):
- return '%s(%s)' % (self.__class__.__name__, super(CPubKey, self).__repr__())
+ return self.compressed
+ def get_pubkey(self):
+ """Compute an ECPubKey object for this secret key."""
+ assert(self.valid)
+ ret = ECPubKey()
+ p = SECP256K1.mul([(SECP256K1_G, self.secret)])
+ ret.p = p
+ ret.valid = True
+ ret.compressed = self.compressed
+ return ret
+
+ def sign_ecdsa(self, msg, low_s=True):
+ """Construct a DER-encoded ECDSA signature with this key."""
+ assert(self.valid)
+ z = int.from_bytes(msg, 'big')
+ # Note: no RFC6979, but a simple random nonce (some tests rely on distinct transactions for the same operation)
+ k = random.randrange(1, SECP256K1_ORDER)
+ R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
+ r = R[0] % SECP256K1_ORDER
+ s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
+ if low_s and s > SECP256K1_ORDER_HALF:
+ s = SECP256K1_ORDER - s
+ rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
+ sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
+ return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb