From 213519bd9337bce33d8838146bc7c35f063886f6 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 25 Dec 2019 22:09:17 +0100 Subject: [PATCH 1/2] Blind chain signature verification - also adds serialisation, deserialisation and optional metadata --- src/pyelliptic/blindchain.py | 77 +++++++++++++++++++++++++++++++++ src/pyelliptic/eccblind.py | 82 +++++++++++++++++++++++++++++++++--- src/tests/test_blindsig.py | 48 ++++++++++++++++++++- 3 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 src/pyelliptic/blindchain.py diff --git a/src/pyelliptic/blindchain.py b/src/pyelliptic/blindchain.py new file mode 100644 index 00000000..df1d3049 --- /dev/null +++ b/src/pyelliptic/blindchain.py @@ -0,0 +1,77 @@ +""" +Blind signature chain with a top level CA +""" + +from eccblind import ECCBlind + +try: + import msgpack +except ImportError: + try: + import umsgpack as msgpack + except ImportError: + import fallback.umsgpack.umsgpack as msgpack + +from eccblind import ECCBlind + +def encode_datetime(obj): + """ + Method to format time + """ + return {'Time': int(obj.strftime("%s"))} + + +class ECCBlindChain(object): # pylint: disable=too-many-instance-attributes + """ + # Class for ECC Blind Chain signature functionality + """ + chain = [] + ca = [] + + def __init__(self, chain=None): + if chain is not None: + self.chain = chain + + def serialize(self): + data = msgpack.packb(self.chain) + return data + + @staticmethod + def deserialize(self, chain): + """ + Deserialize the data using msgpack + """ + data = msgpack.unpackb(chain) + return ECCBlindChain(data) + + def add_level(self, pubkey, metadata, signature): + self.chain.append((pubkey, metadata, signature)) + + def add_ca(self, ca): + pubkey, metadata = ECCBlind.deserialize(ca) + self.ca.append(pubkey) + + def verify(self, msg, value): + lastpubkey = None + retval = False + for level in reversed(self.chain): + pubkey, metadata, signature = level + verifier_obj = ECCBlind(pubkey=pubkey, metadata) + if not lastpubkey: + retval = verifier_obj.verify(msg, signature, value) + else: + retval = verifier_obj.verify(lastpubkey, signature, value) + if not reval: + break + lastpubkey = pubkey + if retval: + retval = False + for ca in self.ca: + match = True + for i in range(4): + if lastpubkey[i] != ca[i]: + match = False + break + if match: + return True + return retval diff --git a/src/pyelliptic/eccblind.py b/src/pyelliptic/eccblind.py index 5b9b045e..6c85de25 100644 --- a/src/pyelliptic/eccblind.py +++ b/src/pyelliptic/eccblind.py @@ -10,9 +10,52 @@ http://www.isecure-journal.com/article_39171_47f9ec605dd3918c2793565ec21fcd7a.pd # variable names are based on the math in the paper, so they don't conform # to PEP8 +from hashlib import sha256 +import time + +try: + import msgpack +except ImportError: + try: + import umsgpack as msgpack + except ImportError: + import fallback.umsgpack.umsgpack as msgpack + from .openssl import OpenSSL +class Metadata(object): + """ + Pubkey metadata + """ + def __init__(self, exp=0, value=0): + self.exp = 0 + self.value = 0 + if exp: + self.exp = exp + if value: + self.value = value + + def serialize(self): + if self.exp or self.value: + return [self.exp, self.value] + else: + return [] + + @staticmethod + def deserialize(self, data): + exp, value = data + return Medatadata(exp, value) + + def verify(self, value): + if self.value and value > self.value: + return False + if self.exp: + if time.time() > self.exp: + return False + return True + + class ECCBlind(object): # pylint: disable=too-many-instance-attributes """ Class for ECC blind signature functionality @@ -75,7 +118,21 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes OpenSSL.EC_POINT_get_affine_coordinates_GFp(group, F, x0, y0, ctx) return x0 - def __init__(self, curve="secp256k1", pubkey=None): + @staticmethod + def deserialize(self, data): + pubkey_deserialized, meta = msgpack.unpackb(data) + if meta: + obj = ECCBlind(pubkey=pubkey_deserialized, metadata=meta) + else: + obj = ECCBlind(pubkey=pubkey_deserialized) + return obj + + def serialize(self): + data = (self.pubkey, self.metadata.serialize) + retval = msgpack.packb(data) + return retval + + def __init__(self, curve="secp256k1", pubkey=None, metadata=None): self.ctx = OpenSSL.BN_CTX_new() if pubkey: @@ -101,6 +158,14 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes self.iO = OpenSSL.EC_POINT_new(self.group) OpenSSL.EC_POINT_set_to_infinity(self.group, self.iO) + if metadata: + self._set_metadata(self, metadata) + else: + self.metadata = Metadata() + + def _set_metadata(self, metadata): + self.metadata = Metadata.deserialise(metadata) + def signer_init(self): """ Init signer @@ -119,6 +184,7 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes Requester creates a new signing request """ self.R = R + msghash = sha256(msg) # Requester: 3 random blinding factors self.F = OpenSSL.EC_POINT_new(self.group) @@ -151,7 +217,7 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes # Requester: Blinding (m' = br(m) + a) self.m = OpenSSL.BN_new() - OpenSSL.BN_bin2bn(msg, len(msg), self.m) + OpenSSL.BN_bin2bn(msghash, len(msghash), self.m) self.m_ = OpenSSL.BN_new() OpenSSL.BN_mod_mul(self.m_, self.b, self.r, self.n, self.ctx) @@ -180,14 +246,15 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes self.signature = (s, self.F) return self.signature - def verify(self, msg, signature): + def verify(self, msg, signature, value=0): """ Verify signature with certifier's pubkey """ # convert msg to BIGNUM self.m = OpenSSL.BN_new() - OpenSSL.BN_bin2bn(msg, len(msg), self.m) + msghash = sha256(msg) + OpenSSL.BN_bin2bn(msghash, len(msghash), self.m) # init s, self.F = signature @@ -206,5 +273,8 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes retval = OpenSSL.EC_POINT_cmp(self.group, lhs, rhs, self.ctx) if retval == -1: raise RuntimeError("EC_POINT_cmp returned an error") - else: - return retval == 0 + elif retval != 0: + return False + elif self.metadata: + return self.metadata.verify(value) + return True diff --git a/src/tests/test_blindsig.py b/src/tests/test_blindsig.py index b8d0248a..4760ad56 100644 --- a/src/tests/test_blindsig.py +++ b/src/tests/test_blindsig.py @@ -2,10 +2,12 @@ Test for ECC blind signatures """ import os +import time import unittest from ctypes import cast, c_char_p -from pybitmessage.pyelliptic.eccblind import ECCBlind +from pybitmessage.pyelliptic.eccblind import ECCBlind, Metadata +from pybitmessage.pyelliptic.eccblindchain import ECCBlindChain from pybitmessage.pyelliptic.openssl import OpenSSL @@ -50,3 +52,47 @@ class TestBlindSig(unittest.TestCase): # (5) Verification verifier_obj = ECCBlind(pubkey=signer_obj.pubkey) self.assertTrue(verifier_obj.verify(msg, signature)) + + # Serialization and deserialisation + pk = signer_obj.serialize() + pko = ECCBlind.deserialize(pk) + self.assertTrue(pko.verify(msg, signature)) + + def test_blind_sig_chain(self): + """Test blind signature chain using a random certifier key and a random message""" + + test_levels = 5 + value = 1 + msg = os.urandom(1024) + + chain = ECCBlindChain() + ca = ECCBlind() + signer_obj = ca + signer_pubkey = signer_obj.serialize() + + for level in range(test_levels): + if level == 0: + metadata = Metadata(exp=int(time.time()) + 100, + value=value).serialize() + requester_obj = ECCBlind(pubkey=signer_obj.pubkey, + metadata=metadata) + else: + requester_obj = ECCBlind(pubkey=signer_obj.pubkey) + point_r = signer_obj.signer_init() + + if level == test_levels - 1: + msg_blinded = requester_obj.create_signing_request(point_r, + msg) + else: + msg_blinded = requester.obj.create_signing_request(point_r, + signer_pubkey) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + chain.add_level(signer_obj.pubkey, + signer_obj.metadata.serialize, + signature) + signer_obj = requester_obj + signer_pubkey = requester_obj.serialize() + sigchain = chain.serialize() + verifychain = ECCBlindChain.deserialize(sigchain) + self.assertTrue(verifychain.verify(msg, value)) -- 2.45.1 From ff1f451691c3c66fe15efa7a5e84d4b1f8978247 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 29 Mar 2020 20:51:55 +0800 Subject: [PATCH 2/2] Blind signature updates - added serializing and deserializing - added a signature chain class `ECCBlindSigChain` - added more tests --- src/pyelliptic/__init__.py | 2 + src/pyelliptic/blindchain.py | 77 -------- src/pyelliptic/eccblind.py | 312 +++++++++++++++++++++----------- src/pyelliptic/eccblindchain.py | 52 ++++++ src/pyelliptic/openssl.py | 82 ++++++++- src/tests/test_blindsig.py | 264 ++++++++++++++++++++++----- src/tests/test_openssl.py | 54 ++++++ 7 files changed, 607 insertions(+), 236 deletions(-) delete mode 100644 src/pyelliptic/blindchain.py create mode 100644 src/pyelliptic/eccblindchain.py create mode 100644 src/tests/test_openssl.py diff --git a/src/pyelliptic/__init__.py b/src/pyelliptic/__init__.py index dbc1b2af..cafa89c9 100644 --- a/src/pyelliptic/__init__.py +++ b/src/pyelliptic/__init__.py @@ -12,6 +12,7 @@ This is an abandoned package maintained inside of the PyBitmessage. from .cipher import Cipher from .ecc import ECC from .eccblind import ECCBlind +from .eccblindchain import ECCBlindChain from .hash import hmac_sha256, hmac_sha512, pbkdf2 from .openssl import OpenSSL @@ -21,6 +22,7 @@ __all__ = [ 'OpenSSL', 'ECC', 'ECCBlind', + 'ECCBlindChain', 'Cipher', 'hmac_sha256', 'hmac_sha512', diff --git a/src/pyelliptic/blindchain.py b/src/pyelliptic/blindchain.py deleted file mode 100644 index df1d3049..00000000 --- a/src/pyelliptic/blindchain.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -Blind signature chain with a top level CA -""" - -from eccblind import ECCBlind - -try: - import msgpack -except ImportError: - try: - import umsgpack as msgpack - except ImportError: - import fallback.umsgpack.umsgpack as msgpack - -from eccblind import ECCBlind - -def encode_datetime(obj): - """ - Method to format time - """ - return {'Time': int(obj.strftime("%s"))} - - -class ECCBlindChain(object): # pylint: disable=too-many-instance-attributes - """ - # Class for ECC Blind Chain signature functionality - """ - chain = [] - ca = [] - - def __init__(self, chain=None): - if chain is not None: - self.chain = chain - - def serialize(self): - data = msgpack.packb(self.chain) - return data - - @staticmethod - def deserialize(self, chain): - """ - Deserialize the data using msgpack - """ - data = msgpack.unpackb(chain) - return ECCBlindChain(data) - - def add_level(self, pubkey, metadata, signature): - self.chain.append((pubkey, metadata, signature)) - - def add_ca(self, ca): - pubkey, metadata = ECCBlind.deserialize(ca) - self.ca.append(pubkey) - - def verify(self, msg, value): - lastpubkey = None - retval = False - for level in reversed(self.chain): - pubkey, metadata, signature = level - verifier_obj = ECCBlind(pubkey=pubkey, metadata) - if not lastpubkey: - retval = verifier_obj.verify(msg, signature, value) - else: - retval = verifier_obj.verify(lastpubkey, signature, value) - if not reval: - break - lastpubkey = pubkey - if retval: - retval = False - for ca in self.ca: - match = True - for i in range(4): - if lastpubkey[i] != ca[i]: - match = False - break - if match: - return True - return retval diff --git a/src/pyelliptic/eccblind.py b/src/pyelliptic/eccblind.py index 6c85de25..a417451e 100644 --- a/src/pyelliptic/eccblind.py +++ b/src/pyelliptic/eccblind.py @@ -10,50 +10,69 @@ http://www.isecure-journal.com/article_39171_47f9ec605dd3918c2793565ec21fcd7a.pd # variable names are based on the math in the paper, so they don't conform # to PEP8 -from hashlib import sha256 import time - -try: - import msgpack -except ImportError: - try: - import umsgpack as msgpack - except ImportError: - import fallback.umsgpack.umsgpack as msgpack +from hashlib import sha256 +from struct import pack, unpack from .openssl import OpenSSL +# first byte in serialisation can contain data +Y_BIT = 0x01 +COMPRESSED_BIT = 0x02 -class Metadata(object): - """ - Pubkey metadata - """ - def __init__(self, exp=0, value=0): - self.exp = 0 - self.value = 0 - if exp: - self.exp = exp - if value: - self.value = value +# formats +BIGNUM = '!32s' +EC = '!B32s' +PUBKEY = '!BB33s' + + +class Expiration(object): + """Expiration of pubkey""" + @staticmethod + def deserialize(val): + """Create an object out of int""" + year = ((val & 0xF0) >> 4) + 2020 + month = val & 0x0F + assert month < 12 + return Expiration(year, month) + + def __init__(self, year, month): + assert isinstance(year, int) + assert year > 2019 and year < 2036 + assert isinstance(month, int) + assert month < 12 + self.year = year + self.month = month + self.exp = year + month / 12.0 def serialize(self): - if self.exp or self.value: - return [self.exp, self.value] - else: - return [] + """Make int out of object""" + return ((self.year - 2020) << 4) + self.month + def verify(self): + """Check if the pubkey has expired""" + now = time.gmtime() + return self.exp >= now.tm_year + (now.tm_mon - 1) / 12.0 + + +class Value(object): + """Value of a pubkey""" @staticmethod - def deserialize(self, data): - exp, value = data - return Medatadata(exp, value) + def deserialize(val): + """Make object out of int""" + return Value(val) + + def __init__(self, value=0xFF): + assert isinstance(value, int) + self.value = value + + def serialize(self): + """Make int out of object""" + return self.value & 0xFF def verify(self, value): - if self.value and value > self.value: - return False - if self.exp: - if time.time() > self.exp: - return False - return True + """Verify against supplied value""" + return value <= self.value class ECCBlind(object): # pylint: disable=too-many-instance-attributes @@ -64,8 +83,8 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes # init k = None R = None - keypair = None F = None + d = None Q = None a = None b = None @@ -76,115 +95,183 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes m_ = None s_ = None signature = None + exp = None + val = None - @staticmethod - def ec_get_random(group, ctx): + def ec_get_random(self): """ - Random point from finite field + Random integer within the EC order """ - order = OpenSSL.BN_new() - OpenSSL.EC_GROUP_get_order(group, order, ctx) - OpenSSL.BN_rand(order, OpenSSL.BN_num_bits(order), 0, 0) - return order + randomnum = OpenSSL.BN_new() + OpenSSL.BN_rand(randomnum, OpenSSL.BN_num_bits(self.n), 0, 0) + return randomnum - @staticmethod - def ec_invert(group, a, ctx): + def ec_invert(self, a): """ ECC inversion """ - order = OpenSSL.BN_new() - OpenSSL.EC_GROUP_get_order(group, order, ctx) - inverse = OpenSSL.BN_mod_inverse(0, a, order, ctx) + inverse = OpenSSL.BN_mod_inverse(0, a, self.n, self.ctx) return inverse - @staticmethod - def ec_gen_keypair(group, ctx): + def ec_gen_keypair(self): """ Generate an ECC keypair + We're using compressed keys """ - d = ECCBlind.ec_get_random(group, ctx) - Q = OpenSSL.EC_POINT_new(group) - OpenSSL.EC_POINT_mul(group, Q, d, 0, 0, 0) + d = self.ec_get_random() + Q = OpenSSL.EC_POINT_new(self.group) + OpenSSL.EC_POINT_mul(self.group, Q, d, 0, 0, 0) return (d, Q) - @staticmethod - def ec_Ftor(F, group, ctx): + def ec_Ftor(self, F): """ x0 coordinate of F """ # F = (x0, y0) x0 = OpenSSL.BN_new() y0 = OpenSSL.BN_new() - OpenSSL.EC_POINT_get_affine_coordinates_GFp(group, F, x0, y0, ctx) + OpenSSL.EC_POINT_get_affine_coordinates(self.group, F, x0, y0, self.ctx) + OpenSSL.BN_free(y0) return x0 - @staticmethod - def deserialize(self, data): - pubkey_deserialized, meta = msgpack.unpackb(data) - if meta: - obj = ECCBlind(pubkey=pubkey_deserialized, metadata=meta) - else: - obj = ECCBlind(pubkey=pubkey_deserialized) - return obj + def _ec_point_serialize(self, point): + """Make an EC point into a string""" + try: + x = OpenSSL.BN_new() + y = OpenSSL.BN_new() + OpenSSL.EC_POINT_get_affine_coordinates( + self.group, point, x, y, 0) + y_byte = (OpenSSL.BN_is_odd(y) & Y_BIT) | COMPRESSED_BIT + l_ = OpenSSL.BN_num_bytes(self.n) + try: + bx = OpenSSL.malloc(0, l_) + OpenSSL.BN_bn2binpad(x, bx, l_) + out = bx.raw + except AttributeError: + # padding manually + bx = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(x)) + OpenSSL.BN_bn2bin(x, bx) + out = bx.raw.rjust(l_, chr(0)) + return pack(EC, y_byte, out) - def serialize(self): - data = (self.pubkey, self.metadata.serialize) - retval = msgpack.packb(data) + finally: + OpenSSL.BN_clear_free(x) + OpenSSL.BN_clear_free(y) + + def _ec_point_deserialize(self, data): + """Make a string into an EC point""" + y_bit, x_raw = unpack(EC, data) + x = OpenSSL.BN_bin2bn(x_raw, OpenSSL.BN_num_bytes(self.n), 0) + y_bit &= Y_BIT + retval = OpenSSL.EC_POINT_new(self.group) + OpenSSL.EC_POINT_set_compressed_coordinates(self.group, + retval, + x, + y_bit, + self.ctx) return retval - def __init__(self, curve="secp256k1", pubkey=None, metadata=None): + def _bn_serialize(self, bn): + """Make a string out of BigNum""" + l_ = OpenSSL.BN_num_bytes(self.n) + try: + o = OpenSSL.malloc(0, l_) + OpenSSL.BN_bn2binpad(bn, o, l_) + return o.raw + except AttributeError: + o = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(bn)) + OpenSSL.BN_bn2bin(bn, o) + return o.raw.rjust(l_, chr(0)) + + def _bn_deserialize(self, data): + """Make a BigNum out of string""" + x = OpenSSL.BN_bin2bn(data, OpenSSL.BN_num_bytes(self.n), 0) + return x + + def _init_privkey(self, privkey): + """Initialise private key out of string/bytes""" + self.d = self._bn_deserialize(privkey) + + def privkey(self): + """Make a private key into a string""" + return pack(BIGNUM, self.d) + + def _init_pubkey(self, pubkey): + """Initialise pubkey out of string/bytes""" + unpacked = unpack(PUBKEY, pubkey) + self.expiration = Expiration.deserialize(unpacked[0]) + self.value = Value.deserialize(unpacked[1]) + self.Q = self._ec_point_deserialize(unpacked[2]) + + def pubkey(self): + """Make a pubkey into a string""" + return pack(PUBKEY, self.expiration.serialize(), + self.value.serialize(), + self._ec_point_serialize(self.Q)) + + def __init__(self, curve="secp256k1", pubkey=None, privkey=None, # pylint: disable=too-many-arguments + year=2025, month=11, value=0xFF): self.ctx = OpenSSL.BN_CTX_new() - if pubkey: - self.group, self.G, self.n, self.Q = pubkey - else: - self.group = OpenSSL.EC_GROUP_new_by_curve_name( - OpenSSL.get_curve(curve)) - # Order n - self.n = OpenSSL.BN_new() - OpenSSL.EC_GROUP_get_order(self.group, self.n, self.ctx) + # ECC group + self.group = OpenSSL.EC_GROUP_new_by_curve_name( + OpenSSL.get_curve(curve)) - # Generator G - self.G = OpenSSL.EC_GROUP_get0_generator(self.group) + # Order n + self.n = OpenSSL.BN_new() + OpenSSL.EC_GROUP_get_order(self.group, self.n, self.ctx) - # new keypair - self.keypair = ECCBlind.ec_gen_keypair(self.group, self.ctx) - - self.Q = self.keypair[1] - - self.pubkey = (self.group, self.G, self.n, self.Q) + # Generator G + self.G = OpenSSL.EC_GROUP_get0_generator(self.group) # Identity O (infinity) self.iO = OpenSSL.EC_POINT_new(self.group) OpenSSL.EC_POINT_set_to_infinity(self.group, self.iO) - if metadata: - self._set_metadata(self, metadata) - else: - self.metadata = Metadata() + if privkey: + assert pubkey + # load both pubkey and privkey from bytes + self._init_privkey(privkey) + self._init_pubkey(pubkey) + elif pubkey: + # load pubkey from bytes + self._init_pubkey(pubkey) + else: + # new keypair + self.d, self.Q = self.ec_gen_keypair() + if not year or not month: + now = time.gmtime() + if now.tm_mon == 12: + self.expiration = Expiration(now.tm_year + 1, 1) + else: + self.expiration = Expiration(now.tm_year, now.tm_mon + 1) + else: + self.expiration = Expiration(year, month) + self.value = Value(value) - def _set_metadata(self, metadata): - self.metadata = Metadata.deserialise(metadata) + def __del__(self): + OpenSSL.BN_free(self.n) + OpenSSL.BN_CTX_free(self.ctx) def signer_init(self): """ Init signer """ # Signer: Random integer k - self.k = ECCBlind.ec_get_random(self.group, self.ctx) + self.k = self.ec_get_random() # R = kG self.R = OpenSSL.EC_POINT_new(self.group) OpenSSL.EC_POINT_mul(self.group, self.R, self.k, 0, 0, 0) - return self.R + return self._ec_point_serialize(self.R) def create_signing_request(self, R, msg): """ Requester creates a new signing request """ - self.R = R - msghash = sha256(msg) + self.R = self._ec_point_deserialize(R) + msghash = sha256(msg).digest() # Requester: 3 random blinding factors self.F = OpenSSL.EC_POINT_new(self.group) @@ -194,12 +281,12 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes # F != O while OpenSSL.EC_POINT_cmp(self.group, self.F, self.iO, self.ctx) == 0: - self.a = ECCBlind.ec_get_random(self.group, self.ctx) - self.b = ECCBlind.ec_get_random(self.group, self.ctx) - self.c = ECCBlind.ec_get_random(self.group, self.ctx) + self.a = self.ec_get_random() + self.b = self.ec_get_random() + self.c = self.ec_get_random() # F = b^-1 * R... - self.binv = ECCBlind.ec_invert(self.group, self.b, self.ctx) + self.binv = self.ec_invert(self.b) OpenSSL.EC_POINT_mul(self.group, temp, 0, self.R, self.binv, 0) OpenSSL.EC_POINT_copy(self.F, temp) @@ -213,7 +300,7 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes OpenSSL.EC_POINT_add(self.group, self.F, self.F, temp, 0) # F = (x0, y0) - self.r = ECCBlind.ec_Ftor(self.F, self.group, self.ctx) + self.r = self.ec_Ftor(self.F) # Requester: Blinding (m' = br(m) + a) self.m = OpenSSL.BN_new() @@ -223,43 +310,48 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes OpenSSL.BN_mod_mul(self.m_, self.b, self.r, self.n, self.ctx) OpenSSL.BN_mod_mul(self.m_, self.m_, self.m, self.n, self.ctx) OpenSSL.BN_mod_add(self.m_, self.m_, self.a, self.n, self.ctx) - return self.m_ + return self._bn_serialize(self.m_) def blind_sign(self, m_): """ Signer blind-signs the request """ - self.m_ = m_ + self.m_ = self._bn_deserialize(m_) self.s_ = OpenSSL.BN_new() - OpenSSL.BN_mod_mul(self.s_, self.keypair[0], self.m_, self.n, self.ctx) + OpenSSL.BN_mod_mul(self.s_, self.d, self.m_, self.n, self.ctx) OpenSSL.BN_mod_add(self.s_, self.s_, self.k, self.n, self.ctx) - return self.s_ + OpenSSL.BN_free(self.k) + return self._bn_serialize(self.s_) def unblind(self, s_): """ Requester unblinds the signature """ - self.s_ = s_ + self.s_ = self._bn_deserialize(s_) s = OpenSSL.BN_new() OpenSSL.BN_mod_mul(s, self.binv, self.s_, self.n, self.ctx) OpenSSL.BN_mod_add(s, s, self.c, self.n, self.ctx) + OpenSSL.BN_free(self.a) + OpenSSL.BN_free(self.b) + OpenSSL.BN_free(self.c) self.signature = (s, self.F) - return self.signature + return self._bn_serialize(s) + self._ec_point_serialize(self.F) - def verify(self, msg, signature, value=0): + def verify(self, msg, signature, value=1): """ Verify signature with certifier's pubkey """ # convert msg to BIGNUM self.m = OpenSSL.BN_new() - msghash = sha256(msg) + msghash = sha256(msg).digest() OpenSSL.BN_bin2bn(msghash, len(msghash), self.m) # init - s, self.F = signature + s, self.F = (self._bn_deserialize(signature[0:32]), + self._ec_point_deserialize(signature[32:])) if self.r is None: - self.r = ECCBlind.ec_Ftor(self.F, self.group, self.ctx) + self.r = self.ec_Ftor(self.F) lhs = OpenSSL.EC_POINT_new(self.group) rhs = OpenSSL.EC_POINT_new(self.group) @@ -273,8 +365,10 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes retval = OpenSSL.EC_POINT_cmp(self.group, lhs, rhs, self.ctx) if retval == -1: raise RuntimeError("EC_POINT_cmp returned an error") + elif not self.value.verify(value): + return False + elif not self.expiration.verify(): + return False elif retval != 0: return False - elif self.metadata: - return self.metadata.verify(value) return True diff --git a/src/pyelliptic/eccblindchain.py b/src/pyelliptic/eccblindchain.py new file mode 100644 index 00000000..56e8ce2a --- /dev/null +++ b/src/pyelliptic/eccblindchain.py @@ -0,0 +1,52 @@ +""" +Blind signature chain with a top level CA +""" + +from .eccblind import ECCBlind + + +class ECCBlindChain(object): # pylint: disable=too-few-public-methods + """ + # Class for ECC Blind Chain signature functionality + """ + + def __init__(self, ca=None, chain=None): + self.chain = [] + self.ca = [] + if ca: + for i in range(0, len(ca), 35): + self.ca.append(ca[i:i + 35]) + if chain: + self.chain.append(chain[0:35]) + for i in range(35, len(chain), 100): + if len(chain[i:]) == 65: + self.chain.append(chain[i:i + 65]) + else: + self.chain.append(chain[i:i + 100]) + + def verify(self, msg, value): + """Verify a chain provides supplied message and value""" + parent = None + l_ = 0 + for level in self.chain: + l_ += 1 + pubkey = None + signature = None + if len(level) == 100: + pubkey, signature = (level[0:35], level[35:]) + elif len(level) == 35: + if level not in self.ca: + return False + parent = level + continue + else: + signature = level + verifier_obj = ECCBlind(pubkey=parent) + if pubkey: + if not verifier_obj.verify(pubkey, signature, value): + return False + parent = pubkey + else: + return verifier_obj.verify(msg=msg, signature=signature, + value=value) + return None diff --git a/src/pyelliptic/openssl.py b/src/pyelliptic/openssl.py index 4933ac44..17a8d6d1 100644 --- a/src/pyelliptic/openssl.py +++ b/src/pyelliptic/openssl.py @@ -8,6 +8,7 @@ needed openssl functionality in class _OpenSSL. """ import ctypes import sys + # pylint: disable=protected-access OpenSSL = None @@ -97,6 +98,10 @@ class _OpenSSL(object): self.BN_free.restype = None self.BN_free.argtypes = [ctypes.c_void_p] + self.BN_clear_free = self._lib.BN_clear_free + self.BN_clear_free.restype = None + self.BN_clear_free.argtypes = [ctypes.c_void_p] + self.BN_num_bits = self._lib.BN_num_bits self.BN_num_bits.restype = ctypes.c_int self.BN_num_bits.argtypes = [ctypes.c_void_p] @@ -105,6 +110,15 @@ class _OpenSSL(object): self.BN_bn2bin.restype = ctypes.c_int self.BN_bn2bin.argtypes = [ctypes.c_void_p, ctypes.c_void_p] + try: + self.BN_bn2binpad = self._lib.BN_bn2binpad + self.BN_bn2binpad.restype = ctypes.c_int + self.BN_bn2binpad.argtypes = [ctypes.c_void_p, ctypes.c_void_p, + ctypes.c_int] + except AttributeError: + # optional, we have a workaround + pass + self.BN_bin2bn = self._lib.BN_bin2bn self.BN_bin2bn.restype = ctypes.c_void_p self.BN_bin2bn.argtypes = [ctypes.c_void_p, ctypes.c_int, @@ -147,6 +161,20 @@ class _OpenSSL(object): ctypes.c_void_p, ctypes.c_void_p] + try: + self.EC_POINT_get_affine_coordinates = \ + self._lib.EC_POINT_get_affine_coordinates + except AttributeError: + # OpenSSL docs say only use this for backwards compatibility + self.EC_POINT_get_affine_coordinates = \ + self._lib.EC_POINT_get_affine_coordinates_GF2m + self.EC_POINT_get_affine_coordinates.restype = ctypes.c_int + self.EC_POINT_get_affine_coordinates.argtypes = [ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p] + self.EC_KEY_set_private_key = self._lib.EC_KEY_set_private_key self.EC_KEY_set_private_key.restype = ctypes.c_int self.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, @@ -171,6 +199,34 @@ class _OpenSSL(object): ctypes.c_void_p, ctypes.c_void_p] + try: + self.EC_POINT_set_affine_coordinates = \ + self._lib.EC_POINT_set_affine_coordinates + except AttributeError: + # OpenSSL docs say only use this for backwards compatibility + self.EC_POINT_set_affine_coordinates = \ + self._lib.EC_POINT_set_affine_coordinates_GF2m + self.EC_POINT_set_affine_coordinates.restype = ctypes.c_int + self.EC_POINT_set_affine_coordinates.argtypes = [ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p] + + try: + self.EC_POINT_set_compressed_coordinates = \ + self._lib.EC_POINT_set_compressed_coordinates + except AttributeError: + # OpenSSL docs say only use this for backwards compatibility + self.EC_POINT_set_compressed_coordinates = \ + self._lib.EC_POINT_set_compressed_coordinates_GF2m + self.EC_POINT_set_compressed_coordinates.restype = ctypes.c_int + self.EC_POINT_set_compressed_coordinates.argtypes = [ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_int, + ctypes.c_void_p] + self.EC_POINT_new = self._lib.EC_POINT_new self.EC_POINT_new.restype = ctypes.c_void_p self.EC_POINT_new.argtypes = [ctypes.c_void_p] @@ -215,10 +271,6 @@ class _OpenSSL(object): self._lib.ECDH_set_method.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - self.BN_CTX_new = self._lib.BN_CTX_new - self._lib.BN_CTX_new.restype = ctypes.c_void_p - self._lib.BN_CTX_new.argtypes = [] - self.ECDH_compute_key = self._lib.ECDH_compute_key self.ECDH_compute_key.restype = ctypes.c_int self.ECDH_compute_key.argtypes = [ctypes.c_void_p, @@ -477,13 +529,19 @@ class _OpenSSL(object): self.BN_cmp.argtypes = [ctypes.c_void_p, ctypes.c_void_p] + try: + self.BN_is_odd = self._lib.BN_is_odd + self.BN_is_odd.restype = ctypes.c_int + self.BN_is_odd.argtypes = [ctypes.c_void_p] + except AttributeError: + # OpenSSL 1.1.0 implements this as a function, but earlier + # versions as macro, so we need to workaround + self.BN_is_odd = self.BN_is_odd_compatible + self.BN_bn2dec = self._lib.BN_bn2dec self.BN_bn2dec.restype = ctypes.c_char_p self.BN_bn2dec.argtypes = [ctypes.c_void_p] - self.BN_CTX_free = self._lib.BN_CTX_free - self.BN_CTX_free.argtypes = [ctypes.c_void_p] - self.EC_GROUP_new_by_curve_name = self._lib.EC_GROUP_new_by_curve_name self.EC_GROUP_new_by_curve_name.restype = ctypes.c_void_p self.EC_GROUP_new_by_curve_name.argtypes = [ctypes.c_int] @@ -600,6 +658,16 @@ class _OpenSSL(object): """ return int((self.BN_num_bits(x) + 7) / 8) + def BN_is_odd_compatible(self, x): + """ + returns if BN is odd + we assume big endianness, and that BN is initialised + """ + length = self.BN_num_bytes(x) + data = self.malloc(0, length) + OpenSSL.BN_bn2bin(x, data) + return ord(data[length - 1]) & 1 + def get_cipher(self, name): """ returns the OpenSSL cipher instance diff --git a/src/tests/test_blindsig.py b/src/tests/test_blindsig.py index 4760ad56..cae16191 100644 --- a/src/tests/test_blindsig.py +++ b/src/tests/test_blindsig.py @@ -2,14 +2,15 @@ Test for ECC blind signatures """ import os -import time import unittest -from ctypes import cast, c_char_p +from hashlib import sha256 -from pybitmessage.pyelliptic.eccblind import ECCBlind, Metadata +from pybitmessage.pyelliptic.eccblind import ECCBlind from pybitmessage.pyelliptic.eccblindchain import ECCBlindChain from pybitmessage.pyelliptic.openssl import OpenSSL +# pylint: disable=protected-access + class TestBlindSig(unittest.TestCase): """ @@ -21,78 +22,255 @@ class TestBlindSig(unittest.TestCase): # (1) Initialization signer_obj = ECCBlind() point_r = signer_obj.signer_init() + self.assertEqual(len(signer_obj.pubkey()), 35) # (2) Request - requester_obj = ECCBlind(pubkey=signer_obj.pubkey) + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) # only 64 byte messages are planned to be used in Bitmessage msg = os.urandom(64) msg_blinded = requester_obj.create_signing_request(point_r, msg) + self.assertEqual(len(msg_blinded), 32) # check - msg_blinded_str = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(msg_blinded)) - OpenSSL.BN_bn2bin(msg_blinded, msg_blinded_str) - self.assertNotEqual(msg, cast(msg_blinded_str, c_char_p).value) + self.assertNotEqual(sha256(msg).digest(), msg_blinded) # (3) Signature Generation signature_blinded = signer_obj.blind_sign(msg_blinded) + assert isinstance(signature_blinded, str) + self.assertEqual(len(signature_blinded), 32) # (4) Extraction signature = requester_obj.unblind(signature_blinded) + assert isinstance(signature, str) + self.assertEqual(len(signature), 65) - # check - signature_blinded_str = OpenSSL.malloc(0, - OpenSSL.BN_num_bytes( - signature_blinded)) - signature_str = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(signature[0])) - OpenSSL.BN_bn2bin(signature_blinded, signature_blinded_str) - OpenSSL.BN_bn2bin(signature[0], signature_str) - self.assertNotEqual(cast(signature_str, c_char_p).value, - cast(signature_blinded_str, c_char_p).value) + self.assertNotEqual(signature, signature_blinded) # (5) Verification - verifier_obj = ECCBlind(pubkey=signer_obj.pubkey) + verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) self.assertTrue(verifier_obj.verify(msg, signature)) - # Serialization and deserialisation - pk = signer_obj.serialize() - pko = ECCBlind.deserialize(pk) - self.assertTrue(pko.verify(msg, signature)) + def test_is_odd(self): + """Test our implementation of BN_is_odd""" + for _ in range(1024): + obj = ECCBlind() + x = OpenSSL.BN_new() + y = OpenSSL.BN_new() + OpenSSL.EC_POINT_get_affine_coordinates( + obj.group, obj.Q, x, y, 0) + self.assertEqual(OpenSSL.BN_is_odd(y), + OpenSSL.BN_is_odd_compatible(y)) - def test_blind_sig_chain(self): + def test_serialize_ec_point(self): + """Test EC point serialization/deserialization""" + for _ in range(1024): + try: + obj = ECCBlind() + obj2 = ECCBlind() + randompoint = obj.Q + serialized = obj._ec_point_serialize(randompoint) + secondpoint = obj2._ec_point_deserialize(serialized) + x0 = OpenSSL.BN_new() + y0 = OpenSSL.BN_new() + OpenSSL.EC_POINT_get_affine_coordinates(obj.group, + randompoint, x0, + y0, obj.ctx) + x1 = OpenSSL.BN_new() + y1 = OpenSSL.BN_new() + OpenSSL.EC_POINT_get_affine_coordinates(obj2.group, + secondpoint, x1, + y1, obj2.ctx) + + self.assertEqual(OpenSSL.BN_cmp(y0, y1), 0) + self.assertEqual(OpenSSL.BN_cmp(x0, x1), 0) + self.assertEqual(OpenSSL.EC_POINT_cmp(obj.group, randompoint, + secondpoint, 0), 0) + finally: + OpenSSL.BN_free(x0) + OpenSSL.BN_free(x1) + OpenSSL.BN_free(y0) + OpenSSL.BN_free(y1) + del obj + del obj2 + + def test_serialize_bn(self): + """Test Bignum serialization/deserialization""" + for _ in range(1024): + obj = ECCBlind() + obj2 = ECCBlind() + randomnum = obj.d + serialized = obj._bn_serialize(randomnum) + secondnum = obj2._bn_deserialize(serialized) + self.assertEqual(OpenSSL.BN_cmp(randomnum, secondnum), 0) + + def test_blind_sig_many(self): + """Test a lot of blind signatures""" + for _ in range(1024): + self.test_blind_sig() + + def test_blind_sig_value(self): + """Test blind signature value checking""" + signer_obj = ECCBlind(value=5) + point_r = signer_obj.signer_init() + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + msg = os.urandom(64) + msg_blinded = requester_obj.create_signing_request(point_r, msg) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) + self.assertFalse(verifier_obj.verify(msg, signature, value=8)) + + def test_blind_sig_expiration(self): + """Test blind signature expiration checking""" + signer_obj = ECCBlind(year=2020, month=1) + point_r = signer_obj.signer_init() + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + msg = os.urandom(64) + msg_blinded = requester_obj.create_signing_request(point_r, msg) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) + self.assertFalse(verifier_obj.verify(msg, signature)) + + def test_blind_sig_chain(self): # pylint: disable=too-many-locals """Test blind signature chain using a random certifier key and a random message""" - test_levels = 5 - value = 1 + test_levels = 4 msg = os.urandom(1024) - chain = ECCBlindChain() ca = ECCBlind() signer_obj = ca - signer_pubkey = signer_obj.serialize() + + output = bytearray() for level in range(test_levels): - if level == 0: - metadata = Metadata(exp=int(time.time()) + 100, - value=value).serialize() - requester_obj = ECCBlind(pubkey=signer_obj.pubkey, - metadata=metadata) - else: - requester_obj = ECCBlind(pubkey=signer_obj.pubkey) + if not level: + output.extend(ca.pubkey()) + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + child_obj = ECCBlind() point_r = signer_obj.signer_init() + pubkey = child_obj.pubkey() if level == test_levels - 1: msg_blinded = requester_obj.create_signing_request(point_r, msg) else: - msg_blinded = requester.obj.create_signing_request(point_r, - signer_pubkey) + msg_blinded = requester_obj.create_signing_request(point_r, + pubkey) signature_blinded = signer_obj.blind_sign(msg_blinded) signature = requester_obj.unblind(signature_blinded) - chain.add_level(signer_obj.pubkey, - signer_obj.metadata.serialize, - signature) - signer_obj = requester_obj - signer_pubkey = requester_obj.serialize() - sigchain = chain.serialize() - verifychain = ECCBlindChain.deserialize(sigchain) - self.assertTrue(verifychain.verify(msg, value)) + if level != test_levels - 1: + output.extend(pubkey) + output.extend(signature) + signer_obj = child_obj + verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) + self.assertTrue(verifychain.verify(msg=msg, value=1)) + + def test_blind_sig_chain_wrong_ca(self): # pylint: disable=too-many-locals + """Test blind signature chain with an unlisted ca""" + + test_levels = 4 + msg = os.urandom(1024) + + ca = ECCBlind() + fake_ca = ECCBlind() + signer_obj = fake_ca + + output = bytearray() + + for level in range(test_levels): + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + child_obj = ECCBlind() + if not level: + # unlisted CA, but a syntactically valid pubkey + output.extend(fake_ca.pubkey()) + point_r = signer_obj.signer_init() + pubkey = child_obj.pubkey() + + if level == test_levels - 1: + msg_blinded = requester_obj.create_signing_request(point_r, + msg) + else: + msg_blinded = requester_obj.create_signing_request(point_r, + pubkey) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + if level != test_levels - 1: + output.extend(pubkey) + output.extend(signature) + signer_obj = child_obj + verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) + self.assertFalse(verifychain.verify(msg, 1)) + + def test_blind_sig_chain_wrong_msg(self): # pylint: disable=too-many-locals + """Test blind signature chain with a fake message""" + + test_levels = 4 + msg = os.urandom(1024) + fake_msg = os.urandom(1024) + + ca = ECCBlind() + signer_obj = ca + + output = bytearray() + + for level in range(test_levels): + if not level: + output.extend(ca.pubkey()) + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + child_obj = ECCBlind() + point_r = signer_obj.signer_init() + pubkey = child_obj.pubkey() + + if level == test_levels - 1: + msg_blinded = requester_obj.create_signing_request(point_r, + msg) + else: + msg_blinded = requester_obj.create_signing_request(point_r, + pubkey) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + if level != test_levels - 1: + output.extend(pubkey) + output.extend(signature) + signer_obj = child_obj + verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) + self.assertFalse(verifychain.verify(fake_msg, 1)) + + def test_blind_sig_chain_wrong_intermediary(self): # pylint: disable=too-many-locals + """Test blind signature chain using a fake intermediary pubkey""" + + test_levels = 4 + msg = os.urandom(1024) + wrong_level = 2 + + ca = ECCBlind() + signer_obj = ca + fake_intermediary = ECCBlind() + + output = bytearray() + + for level in range(test_levels): + if not level: + output.extend(ca.pubkey()) + requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) + child_obj = ECCBlind() + point_r = signer_obj.signer_init() + pubkey = child_obj.pubkey() + + if level == test_levels - 1: + msg_blinded = requester_obj.create_signing_request(point_r, + msg) + else: + msg_blinded = requester_obj.create_signing_request(point_r, + pubkey) + signature_blinded = signer_obj.blind_sign(msg_blinded) + signature = requester_obj.unblind(signature_blinded) + if level == wrong_level: + output.extend(fake_intermediary.pubkey()) + elif level != test_levels - 1: + output.extend(pubkey) + output.extend(signature) + signer_obj = child_obj + verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) + self.assertFalse(verifychain.verify(msg, 1)) diff --git a/src/tests/test_openssl.py b/src/tests/test_openssl.py new file mode 100644 index 00000000..e947fff3 --- /dev/null +++ b/src/tests/test_openssl.py @@ -0,0 +1,54 @@ +""" +Test if OpenSSL is working correctly +""" +import unittest + +from pybitmessage.pyelliptic.openssl import OpenSSL + +try: + OpenSSL.BN_bn2binpad + have_pad = True +except AttributeError: + have_pad = None + + +class TestOpenSSL(unittest.TestCase): + """ + Test cases for OpenSSL + """ + def test_is_odd(self): + """Test BN_is_odd implementation""" + ctx = OpenSSL.BN_CTX_new() + a = OpenSSL.BN_new() + group = OpenSSL.EC_GROUP_new_by_curve_name( + OpenSSL.get_curve("secp256k1")) + OpenSSL.EC_GROUP_get_order(group, a, ctx) + + bad = 0 + for _ in range(1024): + OpenSSL.BN_rand(a, OpenSSL.BN_num_bits(a), 0, 0) + if not OpenSSL.BN_is_odd(a) == OpenSSL.BN_is_odd_compatible(a): + bad += 1 + self.assertEqual(bad, 0) + + @unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test') + def test_padding(self): + """Test an alternatie implementation of bn2binpad""" + + ctx = OpenSSL.BN_CTX_new() + a = OpenSSL.BN_new() + n = OpenSSL.BN_new() + group = OpenSSL.EC_GROUP_new_by_curve_name( + OpenSSL.get_curve("secp256k1")) + OpenSSL.EC_GROUP_get_order(group, n, ctx) + + bad = 0 + for _ in range(1024): + OpenSSL.BN_rand(a, OpenSSL.BN_num_bits(n), 0, 0) + b = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(n)) + c = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(a)) + OpenSSL.BN_bn2binpad(a, b, OpenSSL.BN_num_bytes(n)) + OpenSSL.BN_bn2bin(a, c) + if b.raw != c.raw.rjust(OpenSSL.BN_num_bytes(n), chr(0)): + bad += 1 + self.assertEqual(bad, 0) -- 2.45.1