diff --git a/src/addresses.py b/src/addresses.py index 1ac5ea40..79d4c075 100644 --- a/src/addresses.py +++ b/src/addresses.py @@ -1,8 +1,7 @@ """ Operations with addresses """ -# pylint: disable=redefined-outer-name,inconsistent-return-statements -import sys +# pylint: disable=inconsistent-return-statements import hashlib import logging from binascii import hexlify, unhexlify @@ -151,30 +150,18 @@ def encodeAddress(version, stream, ripe): ' a given ripe hash was not 20.' ) - if isinstance(ripe, str): - if ripe[:2] == '\x00\x00': - ripe = ripe[2:] - elif ripe[:1] == '\x00': - ripe = ripe[1:] - else: - if ripe[:2] == b'\x00\x00': - ripe = ripe[2:] - elif ripe[:1] == b'\x00': - ripe = ripe[1:] + if ripe[:2] == b'\x00\x00': + ripe = ripe[2:] + elif ripe[:1] == b'\x00': + ripe = ripe[1:] elif version == 4: if len(ripe) != 20: raise Exception( 'Programming error in encodeAddress: The length of' ' a given ripe hash was not 20.') - ripe = ripe.lstrip('\x00') + ripe = ripe.lstrip(b'\x00') - if sys.version_info[0] == 3: - if isinstance(ripe, str): - storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe.encode('utf-8') - else: - storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe - else: - storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe + storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe # Generate the checksum sha = hashlib.new('sha512') @@ -184,7 +171,10 @@ def encodeAddress(version, stream, ripe): sha.update(currentHash) checksum = sha.digest()[0:4] + # FIXME: encodeBase58 should take binary data, to reduce conversions + # encodeBase58(storedBinaryData + checksum) asInt = int(hexlify(storedBinaryData) + hexlify(checksum), 16) + # should it be str? If yes, it should be everywhere in the code return 'BM-' + encodeBase58(asInt) diff --git a/src/highlevelcrypto.py b/src/highlevelcrypto.py index f89a31c8..82743acf 100644 --- a/src/highlevelcrypto.py +++ b/src/highlevelcrypto.py @@ -10,17 +10,22 @@ High level cryptographic functions based on `.pyelliptic` OpenSSL bindings. from binascii import hexlify import pyelliptic -from bmconfigparser import BMConfigParser from pyelliptic import OpenSSL from pyelliptic import arithmetic as a +from bmconfigparser import BMConfigParser + +__all__ = ['encrypt', 'makeCryptor', 'pointMult', 'privToPub', 'sign', 'verify'] + def makeCryptor(privkey): """Return a private `.pyelliptic.ECC` instance""" private_key = a.changebase(privkey, 16, 256, minlen=32) public_key = pointMult(private_key) - privkey_bin = '\x02\xca\x00\x20' + private_key - pubkey_bin = '\x02\xca\x00\x20' + public_key[1:-32] + '\x00\x20' + public_key[-32:] + privkey_bin = b'\x02\xca\x00\x20' + private_key + pubkey_bin = ( + b'\x02\xca\x00\x20' + public_key[1:-32] + b'\x00\x20' + public_key[-32:] + ) cryptor = pyelliptic.ECC( curve='secp256k1', privkey=privkey_bin, pubkey=pubkey_bin) return cryptor @@ -29,7 +34,7 @@ def makeCryptor(privkey): def hexToPubkey(pubkey): """Convert a pubkey from hex to binary""" pubkey_raw = a.changebase(pubkey[2:], 16, 256, minlen=64) - pubkey_bin = '\x02\xca\x00 ' + pubkey_raw[:32] + '\x00 ' + pubkey_raw[32:] + pubkey_bin = b'\x02\xca\x00 ' + pubkey_raw[:32] + b'\x00 ' + pubkey_raw[32:] return pubkey_bin diff --git a/src/protocol.py b/src/protocol.py index 4f2d0856..e2d80512 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -91,16 +91,16 @@ def isBitSetWithinBitfield(fourByteString, n): return x & 2**n != 0 -# ip addresses +# IP addresses def encodeHost(host): """Encode a given host to be used in low-level socket operations""" if host.find('.onion') > -1: - return '\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode( + return b'\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode( host.split(".")[0], True) elif host.find(':') == -1: - return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ + return b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ socket.inet_aton(host) return socket.inet_pton(socket.AF_INET6, host) @@ -147,10 +147,10 @@ def checkIPAddress(host, private=False): Returns hostStandardFormat if it is a valid IP address, otherwise returns False """ - if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': + if host[0:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) return checkIPv4Address(host[12:], hostStandardFormat, private) - elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': + elif host[0:6] == b'\xfd\x87\xd8\x7e\xeb\x43': # Onion, based on BMD/bitcoind hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" if private: @@ -161,7 +161,7 @@ def checkIPAddress(host, private=False): hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) except ValueError: return False - if hostStandardFormat == "": + if len(hostStandardFormat) == 0: # This can happen on Windows systems which are # not 64-bit compatible so let us drop the IPv6 address. return False @@ -173,23 +173,23 @@ def checkIPv4Address(host, hostStandardFormat, private=False): Returns hostStandardFormat if it is an IPv4 address, otherwise returns False """ - if host[0] == '\x7F': # 127/8 + if host[0:1] == b'\x7F': # 127/8 if not private: logger.debug( 'Ignoring IP address in loopback range: %s', hostStandardFormat) return hostStandardFormat if private else False - if host[0] == '\x0A': # 10/8 + if host[0:1] == b'\x0A': # 10/8 if not private: logger.debug( 'Ignoring IP address in private range: %s', hostStandardFormat) return hostStandardFormat if private else False - if host[0:2] == '\xC0\xA8': # 192.168/16 + if host[0:2] == b'\xC0\xA8': # 192.168/16 if not private: logger.debug( 'Ignoring IP address in private range: %s', hostStandardFormat) return hostStandardFormat if private else False - if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 + if host[0:2] >= b'\xAC\x10' and host[0:2] < b'\xAC\x20': # 172.16/12 if not private: logger.debug( 'Ignoring IP address in private range: %s', hostStandardFormat) @@ -202,15 +202,19 @@ def checkIPv6Address(host, hostStandardFormat, private=False): Returns hostStandardFormat if it is an IPv6 address, otherwise returns False """ - if host == ('\x00' * 15) + '\x01': + if host == b'\x00' * 15 + b'\x01': if not private: logger.debug('Ignoring loopback address: %s', hostStandardFormat) return False - if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: + try: + host = [ord(c) for c in host[:2]] + except TypeError: # python3 has ints already + pass + if host[0] == 0xfe and host[1] & 0xc0 == 0x80: if not private: logger.debug('Ignoring local address: %s', hostStandardFormat) return hostStandardFormat if private else False - if (ord(host[0]) & 0xfe) == 0xfc: + if host[0] & 0xfe == 0xfc: if not private: logger.debug( 'Ignoring unique local address: %s', hostStandardFormat) diff --git a/src/pyelliptic/arithmetic.py b/src/pyelliptic/arithmetic.py index cb3049c0..23c24b5e 100644 --- a/src/pyelliptic/arithmetic.py +++ b/src/pyelliptic/arithmetic.py @@ -25,28 +25,31 @@ def inv(a, n): def get_code_string(base): """Returns string according to base value""" if base == 2: - return '01' - elif base == 10: - return '0123456789' - elif base == 16: - return "0123456789abcdef" - elif base == 58: - return "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" - elif base == 256: - return ''.join([chr(x) for x in range(256)]) - else: - raise ValueError("Invalid base!") + return b'01' + if base == 10: + return b'0123456789' + if base == 16: + return b'0123456789abcdef' + if base == 58: + return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' + if base == 256: + try: + return b''.join([chr(x) for x in range(256)]) + except TypeError: + return bytes([x for x in range(256)]) + + raise ValueError("Invalid base!") def encode(val, base, minlen=0): """Returns the encoded string""" code_string = get_code_string(base) - result = "" + result = b'' while val > 0: val, i = divmod(val, base) - result = code_string[i] + result + result = code_string[i:i + 1] + result if len(result) < minlen: - result = code_string[0] * (minlen - len(result)) + result + result = code_string[0:1] * (minlen - len(result)) + result return result @@ -116,7 +119,7 @@ def hex_to_point(h): def point_to_hex(p): """Converting point value to hexadecimal""" - return '04' + encode(p[0], 16, 64) + encode(p[1], 16, 64) + return b'04' + encode(p[0], 16, 64) + encode(p[1], 16, 64) def multiply(privkey, pubkey): diff --git a/src/pyelliptic/tests/__init__.py b/src/pyelliptic/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/pyelliptic/tests/test_arithmetic.py b/src/pyelliptic/tests/test_arithmetic.py new file mode 100644 index 00000000..7b5c59b1 --- /dev/null +++ b/src/pyelliptic/tests/test_arithmetic.py @@ -0,0 +1,84 @@ +""" +Test the arithmetic functions +""" + +from binascii import unhexlify +import unittest + +try: + from pyelliptic import arithmetic +except ImportError: + from pybitmessage.pyelliptic import arithmetic + + +# These keys are from addresses test script +sample_pubsigningkey = ( + b'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d' + b'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce') +sample_pubencryptionkey = ( + b'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c' + b'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9') +sample_privsigningkey = \ + b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665' +sample_privencryptionkey = \ + b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a' + +sample_factor = \ + 66858749573256452658262553961707680376751171096153613379801854825275240965733 +# G * sample_factor +sample_point = ( + 33567437183004486938355437500683826356288335339807546987348409590129959362313, + 94730058721143827257669456336351159718085716196507891067256111928318063085006 +) + + +class TestArithmetic(unittest.TestCase): + """Test arithmetic functions""" + def test_base10_multiply(self): + """Test arithmetic.base10_multiply""" + self.assertEqual( + sample_point, + arithmetic.base10_multiply(arithmetic.G, sample_factor)) + + def test_decode(self): + """Decode sample privsigningkey from hex to int and compare to factor""" + self.assertEqual( + arithmetic.decode(sample_privsigningkey, 16), sample_factor) + + def test_encode(self): + """Encode sample factor into hex and compare to privsigningkey""" + self.assertEqual( + arithmetic.encode(sample_factor, 16), sample_privsigningkey) + + def test_changebase(self): + """Check the results of changebase()""" + self.assertEqual( + arithmetic.changebase(sample_privsigningkey, 16, 256, minlen=32), + unhexlify(sample_privsigningkey)) + self.assertEqual( + arithmetic.changebase(sample_pubsigningkey, 16, 256, minlen=64), + unhexlify(sample_pubsigningkey)) + self.assertEqual( + 32, # padding + len(arithmetic.changebase(sample_privsigningkey[:5], 16, 256, 32))) + + def test_hex_to_point(self): + """Check that sample_pubsigningkey is sample_point encoded in hex""" + self.assertEqual( + arithmetic.hex_to_point(sample_pubsigningkey), sample_point) + + def test_point_to_hex(self): + """Check that sample_point is sample_pubsigningkey decoded from hex""" + self.assertEqual( + arithmetic.point_to_hex(sample_point), sample_pubsigningkey) + + def test_privtopub(self): + """Generate public keys and check the result""" + self.assertEqual( + arithmetic.privtopub(sample_privsigningkey), + sample_pubsigningkey + ) + self.assertEqual( + arithmetic.privtopub(sample_privencryptionkey), + sample_pubencryptionkey + ) diff --git a/src/tests/test_blindsig.py b/src/pyelliptic/tests/test_blindsig.py similarity index 98% rename from src/tests/test_blindsig.py rename to src/pyelliptic/tests/test_blindsig.py index ad6673e7..9ed72081 100644 --- a/src/tests/test_blindsig.py +++ b/src/pyelliptic/tests/test_blindsig.py @@ -5,7 +5,10 @@ import os import unittest from hashlib import sha256 -from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL +try: + from pyelliptic import ECCBlind, ECCBlindChain, OpenSSL +except ImportError: + from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL # pylint: disable=protected-access diff --git a/src/tests/test_openssl.py b/src/pyelliptic/tests/test_openssl.py similarity index 89% rename from src/tests/test_openssl.py rename to src/pyelliptic/tests/test_openssl.py index c62bb8b3..cb789277 100644 --- a/src/tests/test_openssl.py +++ b/src/pyelliptic/tests/test_openssl.py @@ -3,7 +3,10 @@ Test if OpenSSL is working correctly """ import unittest -from pybitmessage.pyelliptic.openssl import OpenSSL +try: + from pyelliptic.openssl import OpenSSL +except ImportError: + from pybitmessage.pyelliptic import OpenSSL try: OpenSSL.BN_bn2binpad @@ -33,7 +36,7 @@ class TestOpenSSL(unittest.TestCase): @unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test') def test_padding(self): - """Test an alternatie implementation of bn2binpad""" + """Test an alternative implementation of bn2binpad""" ctx = OpenSSL.BN_CTX_new() a = OpenSSL.BN_new() diff --git a/src/tests/samples.py b/src/tests/samples.py new file mode 100644 index 00000000..c6cef927 --- /dev/null +++ b/src/tests/samples.py @@ -0,0 +1,31 @@ +"""Various sample data""" + +from binascii import unhexlify + + +# These keys are from addresses test script +sample_pubsigningkey = unhexlify( + '044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d' + '16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce') +sample_pubencryptionkey = unhexlify( + '044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c' + 'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9') +sample_privsigningkey = \ + b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665' +sample_privencryptionkey = \ + b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a' +sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f' +# stream: 1, version: 2 +sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79' + +sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733 +# G * sample_factor +sample_point = ( + 33567437183004486938355437500683826356288335339807546987348409590129959362313, + 94730058721143827257669456336351159718085716196507891067256111928318063085006 +) + +sample_seed = 'TIGER, tiger, burning bright. In the forests of the night' +# Deterministic addresses with stream 1 and versions 3, 4 +sample_deterministic_addr3 = 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN' +sample_deterministic_addr4 = 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK' diff --git a/src/tests/test_addresses.py b/src/tests/test_addresses.py new file mode 100644 index 00000000..e37e2854 --- /dev/null +++ b/src/tests/test_addresses.py @@ -0,0 +1,35 @@ + +import unittest +from binascii import unhexlify + +from pybitmessage import addresses + +from .samples import sample_address, sample_ripe + + +class TestAddresses(unittest.TestCase): + """Test addresses manipulations""" + + def test_decode(self): + """Decode some well known addresses and check the result""" + self.assertEqual( + addresses.decodeAddress(sample_address), + ('success', 2, 1, unhexlify(sample_ripe))) + + status, version, stream, ripe1 = addresses.decodeAddress( + '2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') + self.assertEqual(status, 'success') + self.assertEqual(stream, 1) + self.assertEqual(version, 4) + status, version, stream, ripe2 = addresses.decodeAddress( + '2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN') + self.assertEqual(status, 'success') + self.assertEqual(stream, 1) + self.assertEqual(version, 3) + self.assertEqual(ripe1, ripe2) + + def test_encode(self): + """Encode sample ripe and compare the result to sample address""" + self.assertEqual( + sample_address, + addresses.encodeAddress(2, 1, unhexlify(sample_ripe))) diff --git a/src/tests/test_api.py b/src/tests/test_api.py index fffbbfc0..6e71c90b 100644 --- a/src/tests/test_api.py +++ b/src/tests/test_api.py @@ -10,6 +10,9 @@ from six.moves import xmlrpc_client # nosec import psutil +from .samples import ( + sample_seed, sample_deterministic_addr3, sample_deterministic_addr4) + from .test_process import TestProcessProto @@ -58,9 +61,7 @@ class TestAPIShutdown(TestAPIProto): class TestAPI(TestAPIProto): """Main API test case""" - _seed = base64.encodestring( - 'TIGER, tiger, burning bright. In the forests of the night' - ) + _seed = base64.encodestring(sample_seed) def _add_random_address(self, label): return self.api.createRandomAddress(base64.encodestring(label)) @@ -108,7 +109,7 @@ class TestAPI(TestAPIProto): def test_decode_address(self): """Checking the return of API command 'decodeAddress'""" result = json.loads( - self.api.decodeAddress('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')) + self.api.decodeAddress(sample_deterministic_addr4)) self.assertEqual(result.get('status'), 'success') self.assertEqual(result['addressVersion'], 4) self.assertEqual(result['streamNumber'], 1) @@ -117,10 +118,10 @@ class TestAPI(TestAPIProto): """Test creation of deterministic addresses""" self.assertEqual( self.api.getDeterministicAddress(self._seed, 4, 1), - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') + sample_deterministic_addr4) self.assertEqual( self.api.getDeterministicAddress(self._seed, 3, 1), - 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN') + sample_deterministic_addr3) self.assertRegexpMatches( self.api.getDeterministicAddress(self._seed, 2, 1), r'^API Error 0002:') @@ -150,7 +151,7 @@ class TestAPI(TestAPIProto): self.api.createDeterministicAddresses(self._seed, 2, 4) )['addresses'] self.assertEqual(len(addresses), 2) - self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') + self.assertEqual(addresses[0], sample_deterministic_addr4) for addr in addresses: self.assertEqual(self.api.deleteAddress(addr), 'success') @@ -172,19 +173,18 @@ class TestAPI(TestAPIProto): ) # Add known address self.api.addAddressBookEntry( - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', + sample_deterministic_addr4, base64.encodestring('tiger_4') ) # Check addressbook entry entries = json.loads( self.api.listAddressBookEntries()).get('addresses')[0] self.assertEqual( - entries['address'], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') + entries['address'], sample_deterministic_addr4) self.assertEqual( base64.decodestring(entries['label']), 'tiger_4') # Remove known address - self.api.deleteAddressBookEntry( - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') + self.api.deleteAddressBookEntry(sample_deterministic_addr4) # Addressbook should be empty again self.assertEqual( json.loads(self.api.listAddressBookEntries()).get('addresses'), @@ -218,7 +218,7 @@ class TestAPI(TestAPIProto): msg = base64.encodestring('test message') msg_subject = base64.encodestring('test_subject') ackdata = self.api.sendMessage( - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', addr, msg_subject, msg) + sample_deterministic_addr4, addr, msg_subject, msg) try: # Check ackdata and message status int(ackdata, 16) @@ -332,19 +332,12 @@ class TestAPI(TestAPIProto): """Testing chan creation/joining""" # Create chan with known address self.assertEqual( - self.api.createChan(self._seed), - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK' - ) + self.api.createChan(self._seed), sample_deterministic_addr4) # cleanup self.assertEqual( - self.api.leaveChan('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'), - 'success' - ) + self.api.leaveChan(sample_deterministic_addr4), 'success') # Join chan with addresses of version 3 or 4 - for addr in ( - 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', - 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN' - ): + for addr in (sample_deterministic_addr4, sample_deterministic_addr3): self.assertEqual(self.api.joinChan(self._seed, addr), 'success') self.assertEqual(self.api.leaveChan(addr), 'success') # Joining with wrong address should fail diff --git a/src/tests/test_crypto.py b/src/tests/test_crypto.py index b94f626e..38410359 100644 --- a/src/tests/test_crypto.py +++ b/src/tests/test_crypto.py @@ -5,9 +5,9 @@ Test the alternatives for crypto primitives import hashlib import unittest from abc import ABCMeta, abstractmethod -from binascii import hexlify, unhexlify +from binascii import hexlify -from pybitmessage.pyelliptic import arithmetic +from pybitmessage import highlevelcrypto try: @@ -15,29 +15,12 @@ try: except ImportError: RIPEMD = None - -# These keys are from addresses test script -sample_pubsigningkey = unhexlify( - '044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d' - '16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce') -sample_pubencryptionkey = unhexlify( - '044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c' - 'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9') -sample_privsigningkey = \ - '93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665' -sample_privencryptionkey = \ - '4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a' -sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f' -# stream: 1, version: 2 -sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79' - -sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733 -# G * sample_factor -sample_point = ( - 33567437183004486938355437500683826356288335339807546987348409590129959362313, - 94730058721143827257669456336351159718085716196507891067256111928318063085006 +from .samples import ( + sample_pubsigningkey, sample_pubencryptionkey, + sample_privsigningkey, sample_privencryptionkey, sample_ripe ) + _sha = hashlib.new('sha512') _sha.update(sample_pubsigningkey + sample_pubencryptionkey) @@ -76,38 +59,16 @@ class TestCrypto(RIPEMD160TestCase, unittest.TestCase): return RIPEMD.RIPEMD160Hash(data).digest() -class TestAddresses(unittest.TestCase): - """Test addresses manipulations""" - def test_base10_multiply(self): - """Test arithmetic.base10_multiply""" - self.assertEqual( - sample_point, - arithmetic.base10_multiply(arithmetic.G, sample_factor)) +class TestHighlevelcrypto(unittest.TestCase): + """Test highlevelcrypto public functions""" def test_privtopub(self): """Generate public keys and check the result""" self.assertEqual( - arithmetic.privtopub(sample_privsigningkey).encode(), + highlevelcrypto.privToPub(sample_privsigningkey), hexlify(sample_pubsigningkey) ) self.assertEqual( - arithmetic.privtopub(sample_privencryptionkey).encode(), + highlevelcrypto.privToPub(sample_privencryptionkey), hexlify(sample_pubencryptionkey) ) - - def test_address(self): - """Create address and check the result""" - from pybitmessage import addresses - from pybitmessage.fallback import RIPEMD160Hash - - sha = hashlib.new('sha512') - sha.update(sample_pubsigningkey + sample_pubencryptionkey) - ripe_hash = RIPEMD160Hash(sha.digest()).digest() - self.assertEqual(ripe_hash, unhexlify(sample_ripe)) - - self.assertEqual( - addresses.encodeAddress(2, 1, ripe_hash), sample_address) - - self.assertEqual( - addresses.decodeAddress(sample_address), - ('success', 2, 1, ripe_hash)) diff --git a/src/tests/test_networkgroup.py b/src/tests/test_networkgroup.py deleted file mode 100644 index 79163402..00000000 --- a/src/tests/test_networkgroup.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -Test for network group -""" -import unittest - -from .common import skip_python3 - -skip_python3() - - -class TestNetworkGroup(unittest.TestCase): - """ - Test case for network group - """ - def test_network_group(self): - """Test various types of network groups""" - from pybitmessage.protocol import network_group - - test_ip = '1.2.3.4' - self.assertEqual('\x01\x02', network_group(test_ip)) - - test_ip = '127.0.0.1' - self.assertEqual('IPv4', network_group(test_ip)) - - test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10' - self.assertEqual( - '\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C', - network_group(test_ip)) - - test_ip = 'bootstrap8444.bitmessage.org' - self.assertEqual( - 'bootstrap8444.bitmessage.org', - network_group(test_ip)) - - test_ip = 'quzwelsuziwqgpt2.onion' - self.assertEqual( - test_ip, - network_group(test_ip)) - - test_ip = None - self.assertEqual( - None, - network_group(test_ip)) diff --git a/src/tests/test_protocol.py b/src/tests/test_protocol.py index a3c73a73..ee649481 100644 --- a/src/tests/test_protocol.py +++ b/src/tests/test_protocol.py @@ -2,25 +2,102 @@ Tests for common protocol functions """ +import sys import unittest -from .common import skip_python3 - -skip_python3() +from pybitmessage import protocol, state class TestProtocol(unittest.TestCase): """Main protocol test case""" + def test_checkIPv4Address(self): + """Check the results of protocol.checkIPv4Address()""" + token = 'HELLO' + # checking protocol.encodeHost()[12:] + self.assertEqual( # 127.0.0.1 + token, protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token, True)) + self.assertFalse( + protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token)) + self.assertEqual( # 10.42.43.1 + token, protocol.checkIPv4Address(b'\n*+\x01', token, True)) + self.assertFalse( + protocol.checkIPv4Address(b'\n*+\x01', token, False)) + self.assertEqual( # 192.168.0.254 + token, protocol.checkIPv4Address(b'\xc0\xa8\x00\xfe', token, True)) + self.assertEqual( # 172.31.255.254 + token, protocol.checkIPv4Address(b'\xac\x1f\xff\xfe', token, True)) + # self.assertEqual( # 169.254.1.1 + # token, protocol.checkIPv4Address(b'\xa9\xfe\x01\x01', token, True)) + # self.assertEqual( # 254.128.1.1 + # token, protocol.checkIPv4Address(b'\xfe\x80\x01\x01', token, True)) + self.assertFalse( # 8.8.8.8 + protocol.checkIPv4Address(b'\x08\x08\x08\x08', token, True)) + + def test_checkIPv6Address(self): + """Check the results of protocol.checkIPv6Address()""" + test_ip = '2001:db8::ff00:42:8329' + self.assertEqual( + 'test', protocol.checkIPv6Address( + protocol.encodeHost(test_ip), 'test')) + self.assertFalse( + protocol.checkIPv6Address( + protocol.encodeHost(test_ip), 'test', True)) + for test_ip in ('fe80::200:5aee:feaa:20a2', 'fdf8:f53b:82e4::53'): + self.assertEqual( + 'test', protocol.checkIPv6Address( + protocol.encodeHost(test_ip), 'test', True)) + self.assertFalse( + protocol.checkIPv6Address( + protocol.encodeHost(test_ip), 'test')) + def test_check_local(self): """Check the logic of TCPConnection.local""" - from pybitmessage import protocol, state - self.assertTrue( protocol.checkIPAddress(protocol.encodeHost('127.0.0.1'), True)) self.assertTrue( protocol.checkIPAddress(protocol.encodeHost('192.168.0.1'), True)) + self.assertTrue( + protocol.checkIPAddress(protocol.encodeHost('10.42.43.1'), True)) + self.assertTrue( + protocol.checkIPAddress(protocol.encodeHost('172.31.255.2'), True)) + self.assertFalse(protocol.checkIPAddress( + protocol.encodeHost('2001:db8::ff00:42:8329'), True)) + globalhost = protocol.encodeHost('8.8.8.8') + self.assertFalse(protocol.checkIPAddress(globalhost, True)) + self.assertEqual(protocol.checkIPAddress(globalhost), '8.8.8.8') + + @unittest.skipIf( + sys.hexversion >= 0x3000000, 'this is still not working with python3') + def test_check_local_socks(self): + """The SOCKS part of the local check""" self.assertTrue( not protocol.checkSocksIP('127.0.0.1') or state.socksIP) + + def test_network_group(self): + """Test various types of network groups""" + + test_ip = '1.2.3.4' + self.assertEqual(b'\x01\x02', protocol.network_group(test_ip)) + + test_ip = '127.0.0.1' + self.assertEqual('IPv4', protocol.network_group(test_ip)) + + self.assertEqual( + protocol.network_group('8.8.8.8'), + protocol.network_group('8.8.4.4')) + self.assertNotEqual( + protocol.network_group('1.1.1.1'), + protocol.network_group('8.8.8.8')) + + test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10' + self.assertEqual( + b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C', + protocol.network_group(test_ip)) + + for test_ip in ( + 'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None): + self.assertEqual( + test_ip, protocol.network_group(test_ip)) diff --git a/tests.py b/tests.py index 0e88ba63..b933b212 100644 --- a/tests.py +++ b/tests.py @@ -14,7 +14,10 @@ def unittest_discover(): # randomize the order of tests in test cases loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1) # pybitmessage symlink may disappear on Windows - return loader.discover('src.tests') + testsuite = loader.discover('src.tests') + testsuite.addTests([loader.discover('src.pyelliptic')]) + + return testsuite if __name__ == "__main__":