test case for sqlthread version 1

This commit is contained in:
Muzahid 2021-02-22 22:09:25 +05:30
parent b6155b1af3
commit aed71c11ab
Signed by untrusted user: cis-muzahid
GPG Key ID: 1DC85E7D3AB613EA
2 changed files with 96 additions and 257 deletions

View File

@ -17,7 +17,7 @@ import state
import tr import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from debug import logger from debug import logger
pylint: disable=attribute-defined-outside-init,protected-access # pylint: disable=attribute-defined-outside-init,protected-access
class UpgradeDB(): class UpgradeDB():
@ -70,7 +70,6 @@ class UpgradeDB():
For version 1 and 3 For version 1 and 3
Add a new column to the inventory table to store tags. Add a new column to the inventory table to store tags.
""" """
logger.debug( logger.debug(
'In messages.dat database, adding tag field to' 'In messages.dat database, adding tag field to'
' the inventory table.') ' the inventory table.')

View File

@ -3,272 +3,112 @@ Test for ECC blind signatures
""" """
import os import os
import unittest import unittest
from hashlib import sha256 import shutil # used for moving the messages.dat file
import sqlite3
import sys
import threading
import time
# print sys.path
# import state
from ..state import appdata
from ..helper_sql import sqlStoredProcedure
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL # import threading
# from hashlib import sha256
# from ..helper_sql import sqlQuery
# from ..version import softwareVersion
# from common import cleanup
from ..class_sqlThread import (sqlThread, UpgradeDB)
# pylint: disable=protected-access class TestSqlThread(unittest.TestCase):
class TestBlindSig(unittest.TestCase):
""" """
Test case for ECC blind signature Test case for SQLThread
""" """
def test_blind_sig(self):
"""Test full sequence using a random certifier key and a random message"""
# See page 127 of the paper
# (1) Initialization
signer_obj = ECCBlind()
point_r = signer_obj.signer_init()
self.assertEqual(len(signer_obj.pubkey()), 35)
# (2) Request # def __init__(self):
requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) # initialise Db connection
# only 64 byte messages are planned to be used in Bitmessage conn = sqlite3.connect(appdata + 'messages.dat')
msg = os.urandom(64) conn.text_factory = str
msg_blinded = requester_obj.create_signing_request(point_r, msg) cur = conn.cursor()
self.assertEqual(len(msg_blinded), 32)
# check @classmethod
self.assertNotEqual(sha256(msg).digest(), msg_blinded) def setUpClass(cls):
# Start SQL thread
sqlLookup = sqlThread()
sqlLookup.daemon = False
sqlLookup.start()
# (3) Signature Generation @classmethod
signature_blinded = signer_obj.blind_sign(msg_blinded) def tearDownClass(cls):
assert isinstance(signature_blinded, bytes) # Stop sql thread
self.assertEqual(len(signature_blinded), 32) sqlStoredProcedure('exit')
print "...TearDown"
# (4) Extraction def db_connection(self):
signature = requester_obj.unblind(signature_blinded) conn = sqlite3.connect(appdata + 'messages.dat')
assert isinstance(signature, bytes) conn.text_factory = str
self.assertEqual(len(signature), 65) return conn.cursor()
self.assertNotEqual(signature, signature_blinded) def normalize_version(self):
try:
# (5) Verification self.cur.execute(
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,'''
self.assertTrue(verifier_obj.verify(msg, signature)) ''' received text, message text, folder text, encodingtype int, read bool, sighash blob,'''
''' UNIQUE(msgid) ON CONFLICT REPLACE)''')
def test_is_odd(self): self.cur.execute(
"""Test our implementation of BN_is_odd""" '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,'''
for _ in range(1024): ''' message text, ackdata blob, senttime integer, lastactiontime integer,'''
obj = ECCBlind() ''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''')
x = OpenSSL.BN_new() self.cur.execute(
y = OpenSSL.BN_new() '''CREATE TABLE subscriptions (label text, address text, enabled bool)''')
OpenSSL.EC_POINT_get_affine_coordinates( self.cur.execute(
obj.group, obj.Q, x, y, 0) '''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
self.assertEqual(OpenSSL.BN_is_odd(y), self.cur.execute(
OpenSSL.BN_is_odd_compatible(y)) '''CREATE TABLE blacklist (label text, address text, enabled bool)''')
self.cur.execute(
def test_serialize_ec_point(self): '''CREATE TABLE whitelist (label text, address text, enabled bool)''')
"""Test EC point serialization/deserialization""" self.cur.execute(
for _ in range(1024): '''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,'''
try: ''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
obj = ECCBlind() # self.cur.execute(
obj2 = ECCBlind() # '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,'''
randompoint = obj.Q # ''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
serialized = obj._ec_point_serialize(randompoint) self.cur.execute(
secondpoint = obj2._ec_point_deserialize(serialized) '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,'''
x0 = OpenSSL.BN_new() ''' expirestime integer, UNIQUE(hash) ON CONFLICT REPLACE)''')
y0 = OpenSSL.BN_new() # self.cur.execute(
OpenSSL.EC_POINT_get_affine_coordinates(obj.group, # '''INSERT INTO subscriptions VALUES'''
randompoint, x0, # '''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
y0, obj.ctx) self.cur.execute('''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''')
x1 = OpenSSL.BN_new() # self.cur.execute('''INSERT INTO settings VALUES('version','11')''')
y1 = OpenSSL.BN_new() # self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
OpenSSL.EC_POINT_get_affine_coordinates(obj2.group, # int(time.time()),))
secondpoint, x1, self.cur.execute(
y1, obj2.ctx) '''CREATE TABLE objectprocessorqueue'''
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
self.assertEqual(OpenSSL.BN_cmp(y0, y1), 0) self.conn.commit()
self.assertEqual(OpenSSL.BN_cmp(x0, x1), 0) except Exception as err:
self.assertEqual(OpenSSL.EC_POINT_cmp(obj.group, randompoint, print err.message
secondpoint, 0), 0) if str(err) == 'table inbox already exists':
finally: print "table inbox already exists"
OpenSSL.BN_free(x0)
OpenSSL.BN_free(x1)
OpenSSL.BN_free(y0)
OpenSSL.BN_free(y1)
del obj
del obj2
def test_serialize_bn(self):
"""Test Bignum serialization/deserialization"""
for _ in range(1024):
obj = ECCBlind()
obj2 = ECCBlind()
randomnum = obj.d
serialized = obj._bn_serialize(randomnum)
secondnum = obj2._bn_deserialize(serialized)
self.assertEqual(OpenSSL.BN_cmp(randomnum, secondnum), 0)
def test_blind_sig_many(self):
"""Test a lot of blind signatures"""
for _ in range(1024):
self.test_blind_sig()
def test_blind_sig_value(self):
"""Test blind signature value checking"""
signer_obj = ECCBlind(value=5)
point_r = signer_obj.signer_init()
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
msg = os.urandom(64)
msg_blinded = requester_obj.create_signing_request(point_r, msg)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey())
self.assertFalse(verifier_obj.verify(msg, signature, value=8))
def test_blind_sig_expiration(self):
"""Test blind signature expiration checking"""
signer_obj = ECCBlind(year=2020, month=1)
point_r = signer_obj.signer_init()
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
msg = os.urandom(64)
msg_blinded = requester_obj.create_signing_request(point_r, msg)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
verifier_obj = ECCBlind(pubkey=signer_obj.pubkey())
self.assertFalse(verifier_obj.verify(msg, signature))
def test_blind_sig_chain(self): # pylint: disable=too-many-locals
"""Test blind signature chain using a random certifier key and a random message"""
test_levels = 4
msg = os.urandom(1024)
ca = ECCBlind()
signer_obj = ca
output = bytearray()
for level in range(test_levels):
if not level:
output.extend(ca.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else: else:
msg_blinded = requester_obj.create_signing_request(point_r, sys.stderr.write(
pubkey) 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
signature_blinded = signer_obj.blind_sign(msg_blinded) os._exit(0)
signature = requester_obj.unblind(signature_blinded)
if level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=bytes(output))
self.assertTrue(verifychain.verify(msg=msg, value=1))
def test_blind_sig_chain_wrong_ca(self): # pylint: disable=too-many-locals def clean_db(self):
"""Test blind signature chain with an unlisted ca""" tables = list(self.cur.execute("select name from sqlite_master where type is 'table'"))
return self.cur.executescript(';'.join(["drop table if exists %s" % i for i in tables]))
test_levels = 4 def test_sql_thread_version_one(self):
msg = os.urandom(1024) """
Test with version 1
"""
ca = ECCBlind() self.clean_db()
fake_ca = ECCBlind() self.normalize_version()
signer_obj = fake_ca self.cur.execute('''INSERT INTO settings VALUES('version','1')''')
upgrade_db = UpgradeDB()
upgrade_db.cur = self.cur
upgrade_db.upgrade_schema_data_1()
output = bytearray()
for level in range(test_levels):
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
if not level:
# unlisted CA, but a syntactically valid pubkey
output.extend(fake_ca.pubkey())
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(msg, 1))
def test_blind_sig_chain_wrong_msg(self): # pylint: disable=too-many-locals
"""Test blind signature chain with a fake message"""
test_levels = 4
msg = os.urandom(1024)
fake_msg = os.urandom(1024)
ca = ECCBlind()
signer_obj = ca
output = bytearray()
for level in range(test_levels):
if not level:
output.extend(ca.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(fake_msg, 1))
def test_blind_sig_chain_wrong_intermediary(self): # pylint: disable=too-many-locals
"""Test blind signature chain using a fake intermediary pubkey"""
test_levels = 4
msg = os.urandom(1024)
wrong_level = 2
ca = ECCBlind()
signer_obj = ca
fake_intermediary = ECCBlind()
output = bytearray()
for level in range(test_levels):
if not level:
output.extend(ca.pubkey())
requester_obj = ECCBlind(pubkey=signer_obj.pubkey())
child_obj = ECCBlind()
point_r = signer_obj.signer_init()
pubkey = child_obj.pubkey()
if level == test_levels - 1:
msg_blinded = requester_obj.create_signing_request(point_r,
msg)
else:
msg_blinded = requester_obj.create_signing_request(point_r,
pubkey)
signature_blinded = signer_obj.blind_sign(msg_blinded)
signature = requester_obj.unblind(signature_blinded)
if level == wrong_level:
output.extend(fake_intermediary.pubkey())
elif level != test_levels - 1:
output.extend(pubkey)
output.extend(signature)
signer_obj = child_obj
verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output))
self.assertFalse(verifychain.verify(msg, 1))