From aed71c11abb2a3571bb1c24d780e61142c26c0dc Mon Sep 17 00:00:00 2001 From: Muzahid Date: Mon, 22 Feb 2021 22:09:25 +0530 Subject: [PATCH] test case for sqlthread version 1 --- src/class_sqlThread.py | 3 +- src/tests/test_sqlthread.py | 350 ++++++++++-------------------------- 2 files changed, 96 insertions(+), 257 deletions(-) diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 56b85de7..db6db07b 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -17,7 +17,7 @@ import state import tr from bmconfigparser import BMConfigParser from debug import logger -pylint: disable=attribute-defined-outside-init,protected-access +# pylint: disable=attribute-defined-outside-init,protected-access class UpgradeDB(): @@ -70,7 +70,6 @@ class UpgradeDB(): For version 1 and 3 Add a new column to the inventory table to store tags. """ - logger.debug( 'In messages.dat database, adding tag field to' ' the inventory table.') diff --git a/src/tests/test_sqlthread.py b/src/tests/test_sqlthread.py index 4994d87c..f45e7139 100644 --- a/src/tests/test_sqlthread.py +++ b/src/tests/test_sqlthread.py @@ -3,272 +3,112 @@ Test for ECC blind signatures """ import os import unittest -from hashlib import sha256 +import shutil # used for moving the messages.dat file +import sqlite3 +import sys +import threading +import time +# print sys.path +# import state +from ..state import appdata +from ..helper_sql import sqlStoredProcedure -from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL +# import threading +# from hashlib import sha256 +# from ..helper_sql import sqlQuery +# from ..version import softwareVersion +# from common import cleanup +from ..class_sqlThread import (sqlThread, UpgradeDB) -# pylint: disable=protected-access - - -class TestBlindSig(unittest.TestCase): +class TestSqlThread(unittest.TestCase): """ - Test case for ECC blind signature + Test case for SQLThread """ - def test_blind_sig(self): - """Test full sequence using a random certifier key and a random message""" - # See page 127 of the paper - # (1) Initialization - signer_obj = ECCBlind() - point_r = signer_obj.signer_init() - self.assertEqual(len(signer_obj.pubkey()), 35) - # (2) Request - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - # only 64 byte messages are planned to be used in Bitmessage - msg = os.urandom(64) - msg_blinded = requester_obj.create_signing_request(point_r, msg) - self.assertEqual(len(msg_blinded), 32) + # def __init__(self): + # initialise Db connection + conn = sqlite3.connect(appdata + 'messages.dat') + conn.text_factory = str + cur = conn.cursor() - # check - self.assertNotEqual(sha256(msg).digest(), msg_blinded) + @classmethod + def setUpClass(cls): + # Start SQL thread + sqlLookup = sqlThread() + sqlLookup.daemon = False + sqlLookup.start() - # (3) Signature Generation - signature_blinded = signer_obj.blind_sign(msg_blinded) - assert isinstance(signature_blinded, bytes) - self.assertEqual(len(signature_blinded), 32) + @classmethod + def tearDownClass(cls): + # Stop sql thread + sqlStoredProcedure('exit') + print "...TearDown" - # (4) Extraction - signature = requester_obj.unblind(signature_blinded) - assert isinstance(signature, bytes) - self.assertEqual(len(signature), 65) + def db_connection(self): + conn = sqlite3.connect(appdata + 'messages.dat') + conn.text_factory = str + return conn.cursor() - self.assertNotEqual(signature, signature_blinded) - - # (5) Verification - verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) - self.assertTrue(verifier_obj.verify(msg, signature)) - - def test_is_odd(self): - """Test our implementation of BN_is_odd""" - for _ in range(1024): - obj = ECCBlind() - x = OpenSSL.BN_new() - y = OpenSSL.BN_new() - OpenSSL.EC_POINT_get_affine_coordinates( - obj.group, obj.Q, x, y, 0) - self.assertEqual(OpenSSL.BN_is_odd(y), - OpenSSL.BN_is_odd_compatible(y)) - - def test_serialize_ec_point(self): - """Test EC point serialization/deserialization""" - for _ in range(1024): - try: - obj = ECCBlind() - obj2 = ECCBlind() - randompoint = obj.Q - serialized = obj._ec_point_serialize(randompoint) - secondpoint = obj2._ec_point_deserialize(serialized) - x0 = OpenSSL.BN_new() - y0 = OpenSSL.BN_new() - OpenSSL.EC_POINT_get_affine_coordinates(obj.group, - randompoint, x0, - y0, obj.ctx) - x1 = OpenSSL.BN_new() - y1 = OpenSSL.BN_new() - OpenSSL.EC_POINT_get_affine_coordinates(obj2.group, - secondpoint, x1, - y1, obj2.ctx) - - self.assertEqual(OpenSSL.BN_cmp(y0, y1), 0) - self.assertEqual(OpenSSL.BN_cmp(x0, x1), 0) - self.assertEqual(OpenSSL.EC_POINT_cmp(obj.group, randompoint, - secondpoint, 0), 0) - finally: - OpenSSL.BN_free(x0) - OpenSSL.BN_free(x1) - OpenSSL.BN_free(y0) - OpenSSL.BN_free(y1) - del obj - del obj2 - - def test_serialize_bn(self): - """Test Bignum serialization/deserialization""" - for _ in range(1024): - obj = ECCBlind() - obj2 = ECCBlind() - randomnum = obj.d - serialized = obj._bn_serialize(randomnum) - secondnum = obj2._bn_deserialize(serialized) - self.assertEqual(OpenSSL.BN_cmp(randomnum, secondnum), 0) - - def test_blind_sig_many(self): - """Test a lot of blind signatures""" - for _ in range(1024): - self.test_blind_sig() - - def test_blind_sig_value(self): - """Test blind signature value checking""" - signer_obj = ECCBlind(value=5) - point_r = signer_obj.signer_init() - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - msg = os.urandom(64) - msg_blinded = requester_obj.create_signing_request(point_r, msg) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) - self.assertFalse(verifier_obj.verify(msg, signature, value=8)) - - def test_blind_sig_expiration(self): - """Test blind signature expiration checking""" - signer_obj = ECCBlind(year=2020, month=1) - point_r = signer_obj.signer_init() - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - msg = os.urandom(64) - msg_blinded = requester_obj.create_signing_request(point_r, msg) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - verifier_obj = ECCBlind(pubkey=signer_obj.pubkey()) - self.assertFalse(verifier_obj.verify(msg, signature)) - - def test_blind_sig_chain(self): # pylint: disable=too-many-locals - """Test blind signature chain using a random certifier key and a random message""" - - test_levels = 4 - msg = os.urandom(1024) - - ca = ECCBlind() - signer_obj = ca - - output = bytearray() - - for level in range(test_levels): - if not level: - output.extend(ca.pubkey()) - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - child_obj = ECCBlind() - point_r = signer_obj.signer_init() - pubkey = child_obj.pubkey() - - if level == test_levels - 1: - msg_blinded = requester_obj.create_signing_request(point_r, - msg) + def normalize_version(self): + try: + self.cur.execute( + '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,''' + ''' received text, message text, folder text, encodingtype int, read bool, sighash blob,''' + ''' UNIQUE(msgid) ON CONFLICT REPLACE)''') + self.cur.execute( + '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,''' + ''' message text, ackdata blob, senttime integer, lastactiontime integer,''' + ''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''') + self.cur.execute( + '''CREATE TABLE subscriptions (label text, address text, enabled bool)''') + self.cur.execute( + '''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''') + self.cur.execute( + '''CREATE TABLE blacklist (label text, address text, enabled bool)''') + self.cur.execute( + '''CREATE TABLE whitelist (label text, address text, enabled bool)''') + self.cur.execute( + '''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,''' + ''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''') + # self.cur.execute( + # '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,''' + # ''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''') + self.cur.execute( + '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,''' + ''' expirestime integer, UNIQUE(hash) ON CONFLICT REPLACE)''') + # self.cur.execute( + # '''INSERT INTO subscriptions VALUES''' + # '''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') + self.cur.execute('''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''') + # self.cur.execute('''INSERT INTO settings VALUES('version','11')''') + # self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( + # int(time.time()),)) + self.cur.execute( + '''CREATE TABLE objectprocessorqueue''' + ''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''') + self.conn.commit() + except Exception as err: + print err.message + if str(err) == 'table inbox already exists': + print "table inbox already exists" else: - msg_blinded = requester_obj.create_signing_request(point_r, - pubkey) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - if level != test_levels - 1: - output.extend(pubkey) - output.extend(signature) - signer_obj = child_obj - verifychain = ECCBlindChain(ca=ca.pubkey(), chain=bytes(output)) - self.assertTrue(verifychain.verify(msg=msg, value=1)) + sys.stderr.write( + 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) + os._exit(0) - def test_blind_sig_chain_wrong_ca(self): # pylint: disable=too-many-locals - """Test blind signature chain with an unlisted ca""" + def clean_db(self): + tables = list(self.cur.execute("select name from sqlite_master where type is 'table'")) + return self.cur.executescript(';'.join(["drop table if exists %s" % i for i in tables])) - test_levels = 4 - msg = os.urandom(1024) + def test_sql_thread_version_one(self): + """ + Test with version 1 + """ - ca = ECCBlind() - fake_ca = ECCBlind() - signer_obj = fake_ca + self.clean_db() + self.normalize_version() + self.cur.execute('''INSERT INTO settings VALUES('version','1')''') + upgrade_db = UpgradeDB() + upgrade_db.cur = self.cur + upgrade_db.upgrade_schema_data_1() - output = bytearray() - - for level in range(test_levels): - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - child_obj = ECCBlind() - if not level: - # unlisted CA, but a syntactically valid pubkey - output.extend(fake_ca.pubkey()) - point_r = signer_obj.signer_init() - pubkey = child_obj.pubkey() - - if level == test_levels - 1: - msg_blinded = requester_obj.create_signing_request(point_r, - msg) - else: - msg_blinded = requester_obj.create_signing_request(point_r, - pubkey) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - if level != test_levels - 1: - output.extend(pubkey) - output.extend(signature) - signer_obj = child_obj - verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) - self.assertFalse(verifychain.verify(msg, 1)) - - def test_blind_sig_chain_wrong_msg(self): # pylint: disable=too-many-locals - """Test blind signature chain with a fake message""" - - test_levels = 4 - msg = os.urandom(1024) - fake_msg = os.urandom(1024) - - ca = ECCBlind() - signer_obj = ca - - output = bytearray() - - for level in range(test_levels): - if not level: - output.extend(ca.pubkey()) - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - child_obj = ECCBlind() - point_r = signer_obj.signer_init() - pubkey = child_obj.pubkey() - - if level == test_levels - 1: - msg_blinded = requester_obj.create_signing_request(point_r, - msg) - else: - msg_blinded = requester_obj.create_signing_request(point_r, - pubkey) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - if level != test_levels - 1: - output.extend(pubkey) - output.extend(signature) - signer_obj = child_obj - verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) - self.assertFalse(verifychain.verify(fake_msg, 1)) - - def test_blind_sig_chain_wrong_intermediary(self): # pylint: disable=too-many-locals - """Test blind signature chain using a fake intermediary pubkey""" - - test_levels = 4 - msg = os.urandom(1024) - wrong_level = 2 - - ca = ECCBlind() - signer_obj = ca - fake_intermediary = ECCBlind() - - output = bytearray() - - for level in range(test_levels): - if not level: - output.extend(ca.pubkey()) - requester_obj = ECCBlind(pubkey=signer_obj.pubkey()) - child_obj = ECCBlind() - point_r = signer_obj.signer_init() - pubkey = child_obj.pubkey() - - if level == test_levels - 1: - msg_blinded = requester_obj.create_signing_request(point_r, - msg) - else: - msg_blinded = requester_obj.create_signing_request(point_r, - pubkey) - signature_blinded = signer_obj.blind_sign(msg_blinded) - signature = requester_obj.unblind(signature_blinded) - if level == wrong_level: - output.extend(fake_intermediary.pubkey()) - elif level != test_levels - 1: - output.extend(pubkey) - output.extend(signature) - signer_obj = child_obj - verifychain = ECCBlindChain(ca=ca.pubkey(), chain=str(output)) - self.assertFalse(verifychain.verify(msg, 1))