Blind signature updates

- added serializing and deserializing
- added a signature chain class `ECCBlindSigChain`
- added more tests
This commit is contained in:
Peter Šurda 2020-03-29 20:51:55 +08:00
parent 213519bd93
commit ff1f451691
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
7 changed files with 607 additions and 236 deletions

View File

@ -12,6 +12,7 @@ This is an abandoned package maintained inside of the PyBitmessage.
from .cipher import Cipher from .cipher import Cipher
from .ecc import ECC from .ecc import ECC
from .eccblind import ECCBlind from .eccblind import ECCBlind
from .eccblindchain import ECCBlindChain
from .hash import hmac_sha256, hmac_sha512, pbkdf2 from .hash import hmac_sha256, hmac_sha512, pbkdf2
from .openssl import OpenSSL from .openssl import OpenSSL
@ -21,6 +22,7 @@ __all__ = [
'OpenSSL', 'OpenSSL',
'ECC', 'ECC',
'ECCBlind', 'ECCBlind',
'ECCBlindChain',
'Cipher', 'Cipher',
'hmac_sha256', 'hmac_sha256',
'hmac_sha512', 'hmac_sha512',

View File

@ -1,77 +0,0 @@
"""
Blind signature chain with a top level CA
"""
from eccblind import ECCBlind
try:
import msgpack
except ImportError:
try:
import umsgpack as msgpack
except ImportError:
import fallback.umsgpack.umsgpack as msgpack
from eccblind import ECCBlind
def encode_datetime(obj):
"""
Method to format time
"""
return {'Time': int(obj.strftime("%s"))}
class ECCBlindChain(object): # pylint: disable=too-many-instance-attributes
"""
# Class for ECC Blind Chain signature functionality
"""
chain = []
ca = []
def __init__(self, chain=None):
if chain is not None:
self.chain = chain
def serialize(self):
data = msgpack.packb(self.chain)
return data
@staticmethod
def deserialize(self, chain):
"""
Deserialize the data using msgpack
"""
data = msgpack.unpackb(chain)
return ECCBlindChain(data)
def add_level(self, pubkey, metadata, signature):
self.chain.append((pubkey, metadata, signature))
def add_ca(self, ca):
pubkey, metadata = ECCBlind.deserialize(ca)
self.ca.append(pubkey)
def verify(self, msg, value):
lastpubkey = None
retval = False
for level in reversed(self.chain):
pubkey, metadata, signature = level
verifier_obj = ECCBlind(pubkey=pubkey, metadata)
if not lastpubkey:
retval = verifier_obj.verify(msg, signature, value)
else:
retval = verifier_obj.verify(lastpubkey, signature, value)
if not reval:
break
lastpubkey = pubkey
if retval:
retval = False
for ca in self.ca:
match = True
for i in range(4):
if lastpubkey[i] != ca[i]:
match = False
break
if match:
return True
return retval

View File

@ -10,50 +10,69 @@ http://www.isecure-journal.com/article_39171_47f9ec605dd3918c2793565ec21fcd7a.pd
# variable names are based on the math in the paper, so they don't conform # variable names are based on the math in the paper, so they don't conform
# to PEP8 # to PEP8
from hashlib import sha256
import time import time
from hashlib import sha256
try: from struct import pack, unpack
import msgpack
except ImportError:
try:
import umsgpack as msgpack
except ImportError:
import fallback.umsgpack.umsgpack as msgpack
from .openssl import OpenSSL from .openssl import OpenSSL
# first byte in serialisation can contain data
Y_BIT = 0x01
COMPRESSED_BIT = 0x02
class Metadata(object): # formats
""" BIGNUM = '!32s'
Pubkey metadata EC = '!B32s'
""" PUBKEY = '!BB33s'
def __init__(self, exp=0, value=0):
self.exp = 0
self.value = 0 class Expiration(object):
if exp: """Expiration of pubkey"""
self.exp = exp @staticmethod
if value: def deserialize(val):
self.value = value """Create an object out of int"""
year = ((val & 0xF0) >> 4) + 2020
month = val & 0x0F
assert month < 12
return Expiration(year, month)
def __init__(self, year, month):
assert isinstance(year, int)
assert year > 2019 and year < 2036
assert isinstance(month, int)
assert month < 12
self.year = year
self.month = month
self.exp = year + month / 12.0
def serialize(self): def serialize(self):
if self.exp or self.value: """Make int out of object"""
return [self.exp, self.value] return ((self.year - 2020) << 4) + self.month
else:
return []
def verify(self):
"""Check if the pubkey has expired"""
now = time.gmtime()
return self.exp >= now.tm_year + (now.tm_mon - 1) / 12.0
class Value(object):
"""Value of a pubkey"""
@staticmethod @staticmethod
def deserialize(self, data): def deserialize(val):
exp, value = data """Make object out of int"""
return Medatadata(exp, value) return Value(val)
def __init__(self, value=0xFF):
assert isinstance(value, int)
self.value = value
def serialize(self):
"""Make int out of object"""
return self.value & 0xFF
def verify(self, value): def verify(self, value):
if self.value and value > self.value: """Verify against supplied value"""
return False return value <= self.value
if self.exp:
if time.time() > self.exp:
return False
return True
class ECCBlind(object): # pylint: disable=too-many-instance-attributes class ECCBlind(object): # pylint: disable=too-many-instance-attributes
@ -64,8 +83,8 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
# init # init
k = None k = None
R = None R = None
keypair = None
F = None F = None
d = None
Q = None Q = None
a = None a = None
b = None b = None
@ -76,115 +95,183 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
m_ = None m_ = None
s_ = None s_ = None
signature = None signature = None
exp = None
val = None
@staticmethod def ec_get_random(self):
def ec_get_random(group, ctx):
""" """
Random point from finite field Random integer within the EC order
""" """
order = OpenSSL.BN_new() randomnum = OpenSSL.BN_new()
OpenSSL.EC_GROUP_get_order(group, order, ctx) OpenSSL.BN_rand(randomnum, OpenSSL.BN_num_bits(self.n), 0, 0)
OpenSSL.BN_rand(order, OpenSSL.BN_num_bits(order), 0, 0) return randomnum
return order
@staticmethod def ec_invert(self, a):
def ec_invert(group, a, ctx):
""" """
ECC inversion ECC inversion
""" """
order = OpenSSL.BN_new() inverse = OpenSSL.BN_mod_inverse(0, a, self.n, self.ctx)
OpenSSL.EC_GROUP_get_order(group, order, ctx)
inverse = OpenSSL.BN_mod_inverse(0, a, order, ctx)
return inverse return inverse
@staticmethod def ec_gen_keypair(self):
def ec_gen_keypair(group, ctx):
""" """
Generate an ECC keypair Generate an ECC keypair
We're using compressed keys
""" """
d = ECCBlind.ec_get_random(group, ctx) d = self.ec_get_random()
Q = OpenSSL.EC_POINT_new(group) Q = OpenSSL.EC_POINT_new(self.group)
OpenSSL.EC_POINT_mul(group, Q, d, 0, 0, 0) OpenSSL.EC_POINT_mul(self.group, Q, d, 0, 0, 0)
return (d, Q) return (d, Q)
@staticmethod def ec_Ftor(self, F):
def ec_Ftor(F, group, ctx):
""" """
x0 coordinate of F x0 coordinate of F
""" """
# F = (x0, y0) # F = (x0, y0)
x0 = OpenSSL.BN_new() x0 = OpenSSL.BN_new()
y0 = OpenSSL.BN_new() y0 = OpenSSL.BN_new()
OpenSSL.EC_POINT_get_affine_coordinates_GFp(group, F, x0, y0, ctx) OpenSSL.EC_POINT_get_affine_coordinates(self.group, F, x0, y0, self.ctx)
OpenSSL.BN_free(y0)
return x0 return x0
@staticmethod def _ec_point_serialize(self, point):
def deserialize(self, data): """Make an EC point into a string"""
pubkey_deserialized, meta = msgpack.unpackb(data) try:
if meta: x = OpenSSL.BN_new()
obj = ECCBlind(pubkey=pubkey_deserialized, metadata=meta) y = OpenSSL.BN_new()
else: OpenSSL.EC_POINT_get_affine_coordinates(
obj = ECCBlind(pubkey=pubkey_deserialized) self.group, point, x, y, 0)
return obj y_byte = (OpenSSL.BN_is_odd(y) & Y_BIT) | COMPRESSED_BIT
l_ = OpenSSL.BN_num_bytes(self.n)
try:
bx = OpenSSL.malloc(0, l_)
OpenSSL.BN_bn2binpad(x, bx, l_)
out = bx.raw
except AttributeError:
# padding manually
bx = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(x))
OpenSSL.BN_bn2bin(x, bx)
out = bx.raw.rjust(l_, chr(0))
return pack(EC, y_byte, out)
def serialize(self): finally:
data = (self.pubkey, self.metadata.serialize) OpenSSL.BN_clear_free(x)
retval = msgpack.packb(data) OpenSSL.BN_clear_free(y)
def _ec_point_deserialize(self, data):
"""Make a string into an EC point"""
y_bit, x_raw = unpack(EC, data)
x = OpenSSL.BN_bin2bn(x_raw, OpenSSL.BN_num_bytes(self.n), 0)
y_bit &= Y_BIT
retval = OpenSSL.EC_POINT_new(self.group)
OpenSSL.EC_POINT_set_compressed_coordinates(self.group,
retval,
x,
y_bit,
self.ctx)
return retval return retval
def __init__(self, curve="secp256k1", pubkey=None, metadata=None): def _bn_serialize(self, bn):
"""Make a string out of BigNum"""
l_ = OpenSSL.BN_num_bytes(self.n)
try:
o = OpenSSL.malloc(0, l_)
OpenSSL.BN_bn2binpad(bn, o, l_)
return o.raw
except AttributeError:
o = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(bn))
OpenSSL.BN_bn2bin(bn, o)
return o.raw.rjust(l_, chr(0))
def _bn_deserialize(self, data):
"""Make a BigNum out of string"""
x = OpenSSL.BN_bin2bn(data, OpenSSL.BN_num_bytes(self.n), 0)
return x
def _init_privkey(self, privkey):
"""Initialise private key out of string/bytes"""
self.d = self._bn_deserialize(privkey)
def privkey(self):
"""Make a private key into a string"""
return pack(BIGNUM, self.d)
def _init_pubkey(self, pubkey):
"""Initialise pubkey out of string/bytes"""
unpacked = unpack(PUBKEY, pubkey)
self.expiration = Expiration.deserialize(unpacked[0])
self.value = Value.deserialize(unpacked[1])
self.Q = self._ec_point_deserialize(unpacked[2])
def pubkey(self):
"""Make a pubkey into a string"""
return pack(PUBKEY, self.expiration.serialize(),
self.value.serialize(),
self._ec_point_serialize(self.Q))
def __init__(self, curve="secp256k1", pubkey=None, privkey=None, # pylint: disable=too-many-arguments
year=2025, month=11, value=0xFF):
self.ctx = OpenSSL.BN_CTX_new() self.ctx = OpenSSL.BN_CTX_new()
if pubkey: # ECC group
self.group, self.G, self.n, self.Q = pubkey self.group = OpenSSL.EC_GROUP_new_by_curve_name(
else: OpenSSL.get_curve(curve))
self.group = OpenSSL.EC_GROUP_new_by_curve_name(
OpenSSL.get_curve(curve))
# Order n
self.n = OpenSSL.BN_new()
OpenSSL.EC_GROUP_get_order(self.group, self.n, self.ctx)
# Generator G # Order n
self.G = OpenSSL.EC_GROUP_get0_generator(self.group) self.n = OpenSSL.BN_new()
OpenSSL.EC_GROUP_get_order(self.group, self.n, self.ctx)
# new keypair # Generator G
self.keypair = ECCBlind.ec_gen_keypair(self.group, self.ctx) self.G = OpenSSL.EC_GROUP_get0_generator(self.group)
self.Q = self.keypair[1]
self.pubkey = (self.group, self.G, self.n, self.Q)
# Identity O (infinity) # Identity O (infinity)
self.iO = OpenSSL.EC_POINT_new(self.group) self.iO = OpenSSL.EC_POINT_new(self.group)
OpenSSL.EC_POINT_set_to_infinity(self.group, self.iO) OpenSSL.EC_POINT_set_to_infinity(self.group, self.iO)
if metadata: if privkey:
self._set_metadata(self, metadata) assert pubkey
else: # load both pubkey and privkey from bytes
self.metadata = Metadata() self._init_privkey(privkey)
self._init_pubkey(pubkey)
elif pubkey:
# load pubkey from bytes
self._init_pubkey(pubkey)
else:
# new keypair
self.d, self.Q = self.ec_gen_keypair()
if not year or not month:
now = time.gmtime()
if now.tm_mon == 12:
self.expiration = Expiration(now.tm_year + 1, 1)
else:
self.expiration = Expiration(now.tm_year, now.tm_mon + 1)
else:
self.expiration = Expiration(year, month)
self.value = Value(value)
def _set_metadata(self, metadata): def __del__(self):
self.metadata = Metadata.deserialise(metadata) OpenSSL.BN_free(self.n)
OpenSSL.BN_CTX_free(self.ctx)
def signer_init(self): def signer_init(self):
""" """
Init signer Init signer
""" """
# Signer: Random integer k # Signer: Random integer k
self.k = ECCBlind.ec_get_random(self.group, self.ctx) self.k = self.ec_get_random()
# R = kG # R = kG
self.R = OpenSSL.EC_POINT_new(self.group) self.R = OpenSSL.EC_POINT_new(self.group)
OpenSSL.EC_POINT_mul(self.group, self.R, self.k, 0, 0, 0) OpenSSL.EC_POINT_mul(self.group, self.R, self.k, 0, 0, 0)
return self.R return self._ec_point_serialize(self.R)
def create_signing_request(self, R, msg): def create_signing_request(self, R, msg):
""" """
Requester creates a new signing request Requester creates a new signing request
""" """
self.R = R self.R = self._ec_point_deserialize(R)
msghash = sha256(msg) msghash = sha256(msg).digest()
# Requester: 3 random blinding factors # Requester: 3 random blinding factors
self.F = OpenSSL.EC_POINT_new(self.group) self.F = OpenSSL.EC_POINT_new(self.group)
@ -194,12 +281,12 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
# F != O # F != O
while OpenSSL.EC_POINT_cmp(self.group, self.F, self.iO, self.ctx) == 0: while OpenSSL.EC_POINT_cmp(self.group, self.F, self.iO, self.ctx) == 0:
self.a = ECCBlind.ec_get_random(self.group, self.ctx) self.a = self.ec_get_random()
self.b = ECCBlind.ec_get_random(self.group, self.ctx) self.b = self.ec_get_random()
self.c = ECCBlind.ec_get_random(self.group, self.ctx) self.c = self.ec_get_random()
# F = b^-1 * R... # F = b^-1 * R...
self.binv = ECCBlind.ec_invert(self.group, self.b, self.ctx) self.binv = self.ec_invert(self.b)
OpenSSL.EC_POINT_mul(self.group, temp, 0, self.R, self.binv, 0) OpenSSL.EC_POINT_mul(self.group, temp, 0, self.R, self.binv, 0)
OpenSSL.EC_POINT_copy(self.F, temp) OpenSSL.EC_POINT_copy(self.F, temp)
@ -213,7 +300,7 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
OpenSSL.EC_POINT_add(self.group, self.F, self.F, temp, 0) OpenSSL.EC_POINT_add(self.group, self.F, self.F, temp, 0)
# F = (x0, y0) # F = (x0, y0)
self.r = ECCBlind.ec_Ftor(self.F, self.group, self.ctx) self.r = self.ec_Ftor(self.F)
# Requester: Blinding (m' = br(m) + a) # Requester: Blinding (m' = br(m) + a)
self.m = OpenSSL.BN_new() self.m = OpenSSL.BN_new()
@ -223,43 +310,48 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
OpenSSL.BN_mod_mul(self.m_, self.b, self.r, self.n, self.ctx) OpenSSL.BN_mod_mul(self.m_, self.b, self.r, self.n, self.ctx)
OpenSSL.BN_mod_mul(self.m_, self.m_, self.m, self.n, self.ctx) OpenSSL.BN_mod_mul(self.m_, self.m_, self.m, self.n, self.ctx)
OpenSSL.BN_mod_add(self.m_, self.m_, self.a, self.n, self.ctx) OpenSSL.BN_mod_add(self.m_, self.m_, self.a, self.n, self.ctx)
return self.m_ return self._bn_serialize(self.m_)
def blind_sign(self, m_): def blind_sign(self, m_):
""" """
Signer blind-signs the request Signer blind-signs the request
""" """
self.m_ = m_ self.m_ = self._bn_deserialize(m_)
self.s_ = OpenSSL.BN_new() self.s_ = OpenSSL.BN_new()
OpenSSL.BN_mod_mul(self.s_, self.keypair[0], self.m_, self.n, self.ctx) OpenSSL.BN_mod_mul(self.s_, self.d, self.m_, self.n, self.ctx)
OpenSSL.BN_mod_add(self.s_, self.s_, self.k, self.n, self.ctx) OpenSSL.BN_mod_add(self.s_, self.s_, self.k, self.n, self.ctx)
return self.s_ OpenSSL.BN_free(self.k)
return self._bn_serialize(self.s_)
def unblind(self, s_): def unblind(self, s_):
""" """
Requester unblinds the signature Requester unblinds the signature
""" """
self.s_ = s_ self.s_ = self._bn_deserialize(s_)
s = OpenSSL.BN_new() s = OpenSSL.BN_new()
OpenSSL.BN_mod_mul(s, self.binv, self.s_, self.n, self.ctx) OpenSSL.BN_mod_mul(s, self.binv, self.s_, self.n, self.ctx)
OpenSSL.BN_mod_add(s, s, self.c, self.n, self.ctx) OpenSSL.BN_mod_add(s, s, self.c, self.n, self.ctx)
OpenSSL.BN_free(self.a)
OpenSSL.BN_free(self.b)
OpenSSL.BN_free(self.c)
self.signature = (s, self.F) self.signature = (s, self.F)
return self.signature return self._bn_serialize(s) + self._ec_point_serialize(self.F)
def verify(self, msg, signature, value=0): def verify(self, msg, signature, value=1):
""" """
Verify signature with certifier's pubkey Verify signature with certifier's pubkey
""" """
# convert msg to BIGNUM # convert msg to BIGNUM
self.m = OpenSSL.BN_new() self.m = OpenSSL.BN_new()
msghash = sha256(msg) msghash = sha256(msg).digest()
OpenSSL.BN_bin2bn(msghash, len(msghash), self.m) OpenSSL.BN_bin2bn(msghash, len(msghash), self.m)
# init # init
s, self.F = signature s, self.F = (self._bn_deserialize(signature[0:32]),
self._ec_point_deserialize(signature[32:]))
if self.r is None: if self.r is None:
self.r = ECCBlind.ec_Ftor(self.F, self.group, self.ctx) self.r = self.ec_Ftor(self.F)
lhs = OpenSSL.EC_POINT_new(self.group) lhs = OpenSSL.EC_POINT_new(self.group)
rhs = OpenSSL.EC_POINT_new(self.group) rhs = OpenSSL.EC_POINT_new(self.group)
@ -273,8 +365,10 @@ class ECCBlind(object): # pylint: disable=too-many-instance-attributes
retval = OpenSSL.EC_POINT_cmp(self.group, lhs, rhs, self.ctx) retval = OpenSSL.EC_POINT_cmp(self.group, lhs, rhs, self.ctx)
if retval == -1: if retval == -1:
raise RuntimeError("EC_POINT_cmp returned an error") raise RuntimeError("EC_POINT_cmp returned an error")
elif not self.value.verify(value):
return False
elif not self.expiration.verify():
return False
elif retval != 0: elif retval != 0:
return False return False
elif self.metadata:
return self.metadata.verify(value)
return True return True

View File

@ -0,0 +1,52 @@
"""
Blind signature chain with a top level CA
"""
from .eccblind import ECCBlind
class ECCBlindChain(object): # pylint: disable=too-few-public-methods
"""
# Class for ECC Blind Chain signature functionality
"""
def __init__(self, ca=None, chain=None):
self.chain = []
self.ca = []
if ca:
for i in range(0, len(ca), 35):
self.ca.append(ca[i:i + 35])
if chain:
self.chain.append(chain[0:35])
for i in range(35, len(chain), 100):
if len(chain[i:]) == 65:
self.chain.append(chain[i:i + 65])
else:
self.chain.append(chain[i:i + 100])
def verify(self, msg, value):
"""Verify a chain provides supplied message and value"""
parent = None
l_ = 0
for level in self.chain:
l_ += 1
pubkey = None
signature = None
if len(level) == 100:
pubkey, signature = (level[0:35], level[35:])
elif len(level) == 35:
if level not in self.ca:
return False
parent = level
continue
else:
signature = level
verifier_obj = ECCBlind(pubkey=parent)
if pubkey:
if not verifier_obj.verify(pubkey, signature, value):
return False
parent = pubkey
else:
return verifier_obj.verify(msg=msg, signature=signature,
value=value)
return None

View File

@ -8,6 +8,7 @@ needed openssl functionality in class _OpenSSL.
""" """
import ctypes import ctypes
import sys import sys
# pylint: disable=protected-access # pylint: disable=protected-access
OpenSSL = None OpenSSL = None
@ -97,6 +98,10 @@ class _OpenSSL(object):
self.BN_free.restype = None self.BN_free.restype = None
self.BN_free.argtypes = [ctypes.c_void_p] self.BN_free.argtypes = [ctypes.c_void_p]
self.BN_clear_free = self._lib.BN_clear_free
self.BN_clear_free.restype = None
self.BN_clear_free.argtypes = [ctypes.c_void_p]
self.BN_num_bits = self._lib.BN_num_bits self.BN_num_bits = self._lib.BN_num_bits
self.BN_num_bits.restype = ctypes.c_int self.BN_num_bits.restype = ctypes.c_int
self.BN_num_bits.argtypes = [ctypes.c_void_p] self.BN_num_bits.argtypes = [ctypes.c_void_p]
@ -105,6 +110,15 @@ class _OpenSSL(object):
self.BN_bn2bin.restype = ctypes.c_int self.BN_bn2bin.restype = ctypes.c_int
self.BN_bn2bin.argtypes = [ctypes.c_void_p, ctypes.c_void_p] self.BN_bn2bin.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
try:
self.BN_bn2binpad = self._lib.BN_bn2binpad
self.BN_bn2binpad.restype = ctypes.c_int
self.BN_bn2binpad.argtypes = [ctypes.c_void_p, ctypes.c_void_p,
ctypes.c_int]
except AttributeError:
# optional, we have a workaround
pass
self.BN_bin2bn = self._lib.BN_bin2bn self.BN_bin2bn = self._lib.BN_bin2bn
self.BN_bin2bn.restype = ctypes.c_void_p self.BN_bin2bn.restype = ctypes.c_void_p
self.BN_bin2bn.argtypes = [ctypes.c_void_p, ctypes.c_int, self.BN_bin2bn.argtypes = [ctypes.c_void_p, ctypes.c_int,
@ -147,6 +161,20 @@ class _OpenSSL(object):
ctypes.c_void_p, ctypes.c_void_p,
ctypes.c_void_p] ctypes.c_void_p]
try:
self.EC_POINT_get_affine_coordinates = \
self._lib.EC_POINT_get_affine_coordinates
except AttributeError:
# OpenSSL docs say only use this for backwards compatibility
self.EC_POINT_get_affine_coordinates = \
self._lib.EC_POINT_get_affine_coordinates_GF2m
self.EC_POINT_get_affine_coordinates.restype = ctypes.c_int
self.EC_POINT_get_affine_coordinates.argtypes = [ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p]
self.EC_KEY_set_private_key = self._lib.EC_KEY_set_private_key self.EC_KEY_set_private_key = self._lib.EC_KEY_set_private_key
self.EC_KEY_set_private_key.restype = ctypes.c_int self.EC_KEY_set_private_key.restype = ctypes.c_int
self.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, self.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p,
@ -171,6 +199,34 @@ class _OpenSSL(object):
ctypes.c_void_p, ctypes.c_void_p,
ctypes.c_void_p] ctypes.c_void_p]
try:
self.EC_POINT_set_affine_coordinates = \
self._lib.EC_POINT_set_affine_coordinates
except AttributeError:
# OpenSSL docs say only use this for backwards compatibility
self.EC_POINT_set_affine_coordinates = \
self._lib.EC_POINT_set_affine_coordinates_GF2m
self.EC_POINT_set_affine_coordinates.restype = ctypes.c_int
self.EC_POINT_set_affine_coordinates.argtypes = [ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p]
try:
self.EC_POINT_set_compressed_coordinates = \
self._lib.EC_POINT_set_compressed_coordinates
except AttributeError:
# OpenSSL docs say only use this for backwards compatibility
self.EC_POINT_set_compressed_coordinates = \
self._lib.EC_POINT_set_compressed_coordinates_GF2m
self.EC_POINT_set_compressed_coordinates.restype = ctypes.c_int
self.EC_POINT_set_compressed_coordinates.argtypes = [ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_int,
ctypes.c_void_p]
self.EC_POINT_new = self._lib.EC_POINT_new self.EC_POINT_new = self._lib.EC_POINT_new
self.EC_POINT_new.restype = ctypes.c_void_p self.EC_POINT_new.restype = ctypes.c_void_p
self.EC_POINT_new.argtypes = [ctypes.c_void_p] self.EC_POINT_new.argtypes = [ctypes.c_void_p]
@ -215,10 +271,6 @@ class _OpenSSL(object):
self._lib.ECDH_set_method.argtypes = [ctypes.c_void_p, self._lib.ECDH_set_method.argtypes = [ctypes.c_void_p,
ctypes.c_void_p] ctypes.c_void_p]
self.BN_CTX_new = self._lib.BN_CTX_new
self._lib.BN_CTX_new.restype = ctypes.c_void_p
self._lib.BN_CTX_new.argtypes = []
self.ECDH_compute_key = self._lib.ECDH_compute_key self.ECDH_compute_key = self._lib.ECDH_compute_key
self.ECDH_compute_key.restype = ctypes.c_int self.ECDH_compute_key.restype = ctypes.c_int
self.ECDH_compute_key.argtypes = [ctypes.c_void_p, self.ECDH_compute_key.argtypes = [ctypes.c_void_p,
@ -477,13 +529,19 @@ class _OpenSSL(object):
self.BN_cmp.argtypes = [ctypes.c_void_p, self.BN_cmp.argtypes = [ctypes.c_void_p,
ctypes.c_void_p] ctypes.c_void_p]
try:
self.BN_is_odd = self._lib.BN_is_odd
self.BN_is_odd.restype = ctypes.c_int
self.BN_is_odd.argtypes = [ctypes.c_void_p]
except AttributeError:
# OpenSSL 1.1.0 implements this as a function, but earlier
# versions as macro, so we need to workaround
self.BN_is_odd = self.BN_is_odd_compatible
self.BN_bn2dec = self._lib.BN_bn2dec self.BN_bn2dec = self._lib.BN_bn2dec
self.BN_bn2dec.restype = ctypes.c_char_p self.BN_bn2dec.restype = ctypes.c_char_p
self.BN_bn2dec.argtypes = [ctypes.c_void_p] self.BN_bn2dec.argtypes = [ctypes.c_void_p]
self.BN_CTX_free = self._lib.BN_CTX_free
self.BN_CTX_free.argtypes = [ctypes.c_void_p]
self.EC_GROUP_new_by_curve_name = self._lib.EC_GROUP_new_by_curve_name self.EC_GROUP_new_by_curve_name = self._lib.EC_GROUP_new_by_curve_name
self.EC_GROUP_new_by_curve_name.restype = ctypes.c_void_p self.EC_GROUP_new_by_curve_name.restype = ctypes.c_void_p
self.EC_GROUP_new_by_curve_name.argtypes = [ctypes.c_int] self.EC_GROUP_new_by_curve_name.argtypes = [ctypes.c_int]
@ -600,6 +658,16 @@ class _OpenSSL(object):
""" """
return int((self.BN_num_bits(x) + 7) / 8) return int((self.BN_num_bits(x) + 7) / 8)
def BN_is_odd_compatible(self, x):
"""
returns if BN is odd
we assume big endianness, and that BN is initialised
"""
length = self.BN_num_bytes(x)
data = self.malloc(0, length)
OpenSSL.BN_bn2bin(x, data)
return ord(data[length - 1]) & 1
def get_cipher(self, name): def get_cipher(self, name):
""" """
returns the OpenSSL cipher instance returns the OpenSSL cipher instance

View File

@ -2,14 +2,15 @@
Test for ECC blind signatures Test for ECC blind signatures
""" """
import os import os
import time
import unittest import unittest
from ctypes import cast, c_char_p from hashlib import sha256
from pybitmessage.pyelliptic.eccblind import ECCBlind, Metadata from pybitmessage.pyelliptic.eccblind import ECCBlind
from pybitmessage.pyelliptic.eccblindchain import ECCBlindChain from pybitmessage.pyelliptic.eccblindchain import ECCBlindChain
from pybitmessage.pyelliptic.openssl import OpenSSL from pybitmessage.pyelliptic.openssl import OpenSSL
# pylint: disable=protected-access
class TestBlindSig(unittest.TestCase): class TestBlindSig(unittest.TestCase):
""" """
@ -21,78 +22,255 @@ class TestBlindSig(unittest.TestCase):
# (1) Initialization # (1) Initialization
signer_obj = ECCBlind() signer_obj = ECCBlind()
point_r = signer_obj.signer_init() point_r = signer_obj.signer_init()
self.assertEqual(len(signer_obj.pubkey()), 35)
# (2) Request # (2) Request
requester_obj = ECCBlind(pubkey=signer_obj.pubkey) requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
# only 64 byte messages are planned to be used in Bitmessage # only 64 byte messages are planned to be used in Bitmessage
msg = os.urandom(64) msg = os.urandom(64)
msg_blinded = requester_obj.create_signing_request(point_r, msg) msg_blinded = requester_obj.create_signing_request(point_r, msg)
self.assertEqual(len(msg_blinded), 32)
# check # check
msg_blinded_str = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(msg_blinded)) self.assertNotEqual(sha256(msg).digest(), msg_blinded)
OpenSSL.BN_bn2bin(msg_blinded, msg_blinded_str)
self.assertNotEqual(msg, cast(msg_blinded_str, c_char_p).value)
# (3) Signature Generation # (3) Signature Generation
signature_blinded = signer_obj.blind_sign(msg_blinded) signature_blinded = signer_obj.blind_sign(msg_blinded)
assert isinstance(signature_blinded, str)
self.assertEqual(len(signature_blinded), 32)
# (4) Extraction # (4) Extraction
signature = requester_obj.unblind(signature_blinded) signature = requester_obj.unblind(signature_blinded)
assert isinstance(signature, str)
self.assertEqual(len(signature), 65)
# check self.assertNotEqual(signature, signature_blinded)
signature_blinded_str = OpenSSL.malloc(0,
OpenSSL.BN_num_bytes(
signature_blinded))
signature_str = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(signature[0]))
OpenSSL.BN_bn2bin(signature_blinded, signature_blinded_str)
OpenSSL.BN_bn2bin(signature[0], signature_str)
self.assertNotEqual(cast(signature_str, c_char_p).value,
cast(signature_blinded_str, c_char_p).value)
# (5) Verification # (5) Verification
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey) verifier_obj = ECCBlind(pubkey=signer_obj.pubkey())
self.assertTrue(verifier_obj.verify(msg, signature)) self.assertTrue(verifier_obj.verify(msg, signature))
# Serialization and deserialisation def test_is_odd(self):
pk = signer_obj.serialize() """Test our implementation of BN_is_odd"""
pko = ECCBlind.deserialize(pk) for _ in range(1024):
self.assertTrue(pko.verify(msg, signature)) obj = ECCBlind()
x = OpenSSL.BN_new()
y = OpenSSL.BN_new()
OpenSSL.EC_POINT_get_affine_coordinates(
obj.group, obj.Q, x, y, 0)
self.assertEqual(OpenSSL.BN_is_odd(y),
OpenSSL.BN_is_odd_compatible(y))
def test_blind_sig_chain(self): def test_serialize_ec_point(self):
"""Test EC point serialization/deserialization"""
for _ in range(1024):
try:
obj = ECCBlind()
obj2 = ECCBlind()
randompoint = obj.Q
serialized = obj._ec_point_serialize(randompoint)
secondpoint = obj2._ec_point_deserialize(serialized)
x0 = OpenSSL.BN_new()
y0 = OpenSSL.BN_new()
OpenSSL.EC_POINT_get_affine_coordinates(obj.group,
randompoint, x0,
y0, obj.ctx)
x1 = OpenSSL.BN_new()
y1 = OpenSSL.BN_new()
OpenSSL.EC_POINT_get_affine_coordinates(obj2.group,
secondpoint, x1,
y1, obj2.ctx)
self.assertEqual(OpenSSL.BN_cmp(y0, y1), 0)
self.assertEqual(OpenSSL.BN_cmp(x0, x1), 0)
self.assertEqual(OpenSSL.EC_POINT_cmp(obj.group, randompoint,
secondpoint, 0), 0)
finally:
OpenSSL.BN_free(x0)
OpenSSL.BN_free(x1)
OpenSSL.BN_free(y0)
OpenSSL.BN_free(y1)
del obj
del obj2
def test_serialize_bn(self):
"""Test Bignum serialization/deserialization"""
for _ in range(1024):
obj = ECCBlind()
obj2 = ECCBlind()
randomnum = obj.d
serialized = obj._bn_serialize(randomnum)
secondnum = obj2._bn_deserialize(serialized)
self.assertEqual(OpenSSL.BN_cmp(randomnum, secondnum), 0)
def test_blind_sig_many(self):
"""Test a lot of blind signatures"""
for _ in range(1024):
self.test_blind_sig()
def test_blind_sig_value(self):
"""Test blind signature value checking"""
signer_obj = ECCBlind(value=5)
point_r = signer_obj.signer_init()
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
msg = os.urandom(64)
msg_blinded = requester_obj.create_signing_request(point_r, msg)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey())
self.assertFalse(verifier_obj.verify(msg, signature, value=8))
def test_blind_sig_expiration(self):
"""Test blind signature expiration checking"""
signer_obj = ECCBlind(year=2020, month=1)
point_r = signer_obj.signer_init()
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
msg = os.urandom(64)
msg_blinded = requester_obj.create_signing_request(point_r, msg)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey())
self.assertFalse(verifier_obj.verify(msg, signature))
def test_blind_sig_chain(self): # pylint: disable=too-many-locals
"""Test blind signature chain using a random certifier key and a random message""" """Test blind signature chain using a random certifier key and a random message"""
test_levels = 5 test_levels = 4
value = 1
msg = os.urandom(1024) msg = os.urandom(1024)
chain = ECCBlindChain()
ca = ECCBlind() ca = ECCBlind()
signer_obj = ca signer_obj = ca
signer_pubkey = signer_obj.serialize()
output = bytearray()
for level in range(test_levels): for level in range(test_levels):
if level == 0: if not level:
metadata = Metadata(exp=int(time.time()) + 100, output.extend(ca.pubkey())
value=value).serialize() requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey, child_obj = ECCBlind()
metadata=metadata)
else:
requester_obj = ECCBlind(pubkey=signer_obj.pubkey)
point_r = signer_obj.signer_init() point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1: if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r, msg_blinded = requester_obj.create_signing_request(point_r,
msg) msg)
else: else:
msg_blinded = requester.obj.create_signing_request(point_r, msg_blinded = requester_obj.create_signing_request(point_r,
signer_pubkey) pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded) signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded) signature = requester_obj.unblind(signature_blinded)
chain.add_level(signer_obj.pubkey, if level != test_levels - 1:
signer_obj.metadata.serialize, output.extend(pubkey)
signature) output.extend(signature)
signer_obj = requester_obj signer_obj = child_obj
signer_pubkey = requester_obj.serialize() verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
sigchain = chain.serialize() self.assertTrue(verifychain.verify(msg=msg, value=1))
verifychain = ECCBlindChain.deserialize(sigchain)
self.assertTrue(verifychain.verify(msg, value)) def test_blind_sig_chain_wrong_ca(self): # pylint: disable=too-many-locals
"""Test blind signature chain with an unlisted ca"""
test_levels = 4
msg = os.urandom(1024)
ca = ECCBlind()
fake_ca = ECCBlind()
signer_obj = fake_ca
output = bytearray()
for level in range(test_levels):
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
if not level:
# unlisted CA, but a syntactically valid pubkey
output.extend(fake_ca.pubkey())
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(msg, 1))
def test_blind_sig_chain_wrong_msg(self): # pylint: disable=too-many-locals
"""Test blind signature chain with a fake message"""
test_levels = 4
msg = os.urandom(1024)
fake_msg = os.urandom(1024)
ca = ECCBlind()
signer_obj = ca
output = bytearray()
for level in range(test_levels):
if not level:
output.extend(ca.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(fake_msg, 1))
def test_blind_sig_chain_wrong_intermediary(self): # pylint: disable=too-many-locals
"""Test blind signature chain using a fake intermediary pubkey"""
test_levels = 4
msg = os.urandom(1024)
wrong_level = 2
ca = ECCBlind()
signer_obj = ca
fake_intermediary = ECCBlind()
output = bytearray()
for level in range(test_levels):
if not level:
output.extend(ca.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level == wrong_level:
output.extend(fake_intermediary.pubkey())
elif level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(msg, 1))

54
src/tests/test_openssl.py Normal file
View File

@ -0,0 +1,54 @@
"""
Test if OpenSSL is working correctly
"""
import unittest
from pybitmessage.pyelliptic.openssl import OpenSSL
try:
OpenSSL.BN_bn2binpad
have_pad = True
except AttributeError:
have_pad = None
class TestOpenSSL(unittest.TestCase):
"""
Test cases for OpenSSL
"""
def test_is_odd(self):
"""Test BN_is_odd implementation"""
ctx = OpenSSL.BN_CTX_new()
a = OpenSSL.BN_new()
group = OpenSSL.EC_GROUP_new_by_curve_name(
OpenSSL.get_curve("secp256k1"))
OpenSSL.EC_GROUP_get_order(group, a, ctx)
bad = 0
for _ in range(1024):
OpenSSL.BN_rand(a, OpenSSL.BN_num_bits(a), 0, 0)
if not OpenSSL.BN_is_odd(a) == OpenSSL.BN_is_odd_compatible(a):
bad += 1
self.assertEqual(bad, 0)
@unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test')
def test_padding(self):
"""Test an alternatie implementation of bn2binpad"""
ctx = OpenSSL.BN_CTX_new()
a = OpenSSL.BN_new()
n = OpenSSL.BN_new()
group = OpenSSL.EC_GROUP_new_by_curve_name(
OpenSSL.get_curve("secp256k1"))
OpenSSL.EC_GROUP_get_order(group, n, ctx)
bad = 0
for _ in range(1024):
OpenSSL.BN_rand(a, OpenSSL.BN_num_bits(n), 0, 0)
b = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(n))
c = OpenSSL.malloc(0, OpenSSL.BN_num_bytes(a))
OpenSSL.BN_bn2binpad(a, b, OpenSSL.BN_num_bytes(n))
OpenSSL.BN_bn2bin(a, c)
if b.raw != c.raw.rjust(OpenSSL.BN_num_bytes(n), chr(0)):
bad += 1
self.assertEqual(bad, 0)