Refactoring crypto base changes #1806

Merged
g1itch merged 14 commits from crypto-sort into v0.6 2021-08-17 15:07:33 +02:00
15 changed files with 324 additions and 175 deletions

View File

@ -1,8 +1,7 @@
""" """
Operations with addresses Operations with addresses
""" """
# pylint: disable=redefined-outer-name,inconsistent-return-statements # pylint: disable=inconsistent-return-statements
import sys
import hashlib import hashlib
import logging import logging
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
@ -151,12 +150,6 @@ def encodeAddress(version, stream, ripe):
' a given ripe hash was not 20.' ' a given ripe hash was not 20.'
) )
if isinstance(ripe, str):
if ripe[:2] == '\x00\x00':
ripe = ripe[2:]
elif ripe[:1] == '\x00':
ripe = ripe[1:]
else:
if ripe[:2] == b'\x00\x00': if ripe[:2] == b'\x00\x00':
ripe = ripe[2:] ripe = ripe[2:]
elif ripe[:1] == b'\x00': elif ripe[:1] == b'\x00':
@ -166,14 +159,8 @@ def encodeAddress(version, stream, ripe):
raise Exception( raise Exception(
'Programming error in encodeAddress: The length of' 'Programming error in encodeAddress: The length of'
' a given ripe hash was not 20.') ' a given ripe hash was not 20.')
ripe = ripe.lstrip('\x00') ripe = ripe.lstrip(b'\x00')
if sys.version_info[0] == 3:
if isinstance(ripe, str):
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe.encode('utf-8')
else:
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
else:
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
# Generate the checksum # Generate the checksum
@ -184,7 +171,10 @@ def encodeAddress(version, stream, ripe):
sha.update(currentHash) sha.update(currentHash)
checksum = sha.digest()[0:4] checksum = sha.digest()[0:4]
# FIXME: encodeBase58 should take binary data, to reduce conversions
# encodeBase58(storedBinaryData + checksum)
asInt = int(hexlify(storedBinaryData) + hexlify(checksum), 16) asInt = int(hexlify(storedBinaryData) + hexlify(checksum), 16)
# should it be str? If yes, it should be everywhere in the code
return 'BM-' + encodeBase58(asInt) return 'BM-' + encodeBase58(asInt)

View File

@ -10,17 +10,22 @@ High level cryptographic functions based on `.pyelliptic` OpenSSL bindings.
from binascii import hexlify from binascii import hexlify
import pyelliptic import pyelliptic
from bmconfigparser import BMConfigParser
from pyelliptic import OpenSSL from pyelliptic import OpenSSL
from pyelliptic import arithmetic as a from pyelliptic import arithmetic as a
from bmconfigparser import BMConfigParser
__all__ = ['encrypt', 'makeCryptor', 'pointMult', 'privToPub', 'sign', 'verify']
def makeCryptor(privkey): def makeCryptor(privkey):
"""Return a private `.pyelliptic.ECC` instance""" """Return a private `.pyelliptic.ECC` instance"""
private_key = a.changebase(privkey, 16, 256, minlen=32) private_key = a.changebase(privkey, 16, 256, minlen=32)
public_key = pointMult(private_key) public_key = pointMult(private_key)
privkey_bin = '\x02\xca\x00\x20' + private_key privkey_bin = b'\x02\xca\x00\x20' + private_key
pubkey_bin = '\x02\xca\x00\x20' + public_key[1:-32] + '\x00\x20' + public_key[-32:] pubkey_bin = (
b'\x02\xca\x00\x20' + public_key[1:-32] + b'\x00\x20' + public_key[-32:]
)
cryptor = pyelliptic.ECC( cryptor = pyelliptic.ECC(
curve='secp256k1', privkey=privkey_bin, pubkey=pubkey_bin) curve='secp256k1', privkey=privkey_bin, pubkey=pubkey_bin)
return cryptor return cryptor
@ -29,7 +34,7 @@ def makeCryptor(privkey):
def hexToPubkey(pubkey): def hexToPubkey(pubkey):
"""Convert a pubkey from hex to binary""" """Convert a pubkey from hex to binary"""
pubkey_raw = a.changebase(pubkey[2:], 16, 256, minlen=64) pubkey_raw = a.changebase(pubkey[2:], 16, 256, minlen=64)
pubkey_bin = '\x02\xca\x00 ' + pubkey_raw[:32] + '\x00 ' + pubkey_raw[32:] pubkey_bin = b'\x02\xca\x00 ' + pubkey_raw[:32] + b'\x00 ' + pubkey_raw[32:]
return pubkey_bin return pubkey_bin

View File

@ -91,16 +91,16 @@ def isBitSetWithinBitfield(fourByteString, n):
return x & 2**n != 0 return x & 2**n != 0
# ip addresses # IP addresses
def encodeHost(host): def encodeHost(host):
"""Encode a given host to be used in low-level socket operations""" """Encode a given host to be used in low-level socket operations"""
if host.find('.onion') > -1: if host.find('.onion') > -1:
return '\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode( return b'\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode(
host.split(".")[0], True) host.split(".")[0], True)
elif host.find(':') == -1: elif host.find(':') == -1:
return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ return b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host) socket.inet_aton(host)
return socket.inet_pton(socket.AF_INET6, host) return socket.inet_pton(socket.AF_INET6, host)
@ -147,10 +147,10 @@ def checkIPAddress(host, private=False):
Returns hostStandardFormat if it is a valid IP address, Returns hostStandardFormat if it is a valid IP address,
otherwise returns False otherwise returns False
""" """
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': if host[0:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:])
return checkIPv4Address(host[12:], hostStandardFormat, private) return checkIPv4Address(host[12:], hostStandardFormat, private)
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': elif host[0:6] == b'\xfd\x87\xd8\x7e\xeb\x43':
# Onion, based on BMD/bitcoind # Onion, based on BMD/bitcoind
hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion"
if private: if private:
@ -161,7 +161,7 @@ def checkIPAddress(host, private=False):
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host)
except ValueError: except ValueError:
return False return False
if hostStandardFormat == "": if len(hostStandardFormat) == 0:
# This can happen on Windows systems which are # This can happen on Windows systems which are
# not 64-bit compatible so let us drop the IPv6 address. # not 64-bit compatible so let us drop the IPv6 address.
return False return False
@ -173,23 +173,23 @@ def checkIPv4Address(host, hostStandardFormat, private=False):
Returns hostStandardFormat if it is an IPv4 address, Returns hostStandardFormat if it is an IPv4 address,
otherwise returns False otherwise returns False
""" """
if host[0] == '\x7F': # 127/8 if host[0:1] == b'\x7F': # 127/8
if not private: if not private:
logger.debug( logger.debug(
'Ignoring IP address in loopback range: %s', 'Ignoring IP address in loopback range: %s',
hostStandardFormat) hostStandardFormat)
return hostStandardFormat if private else False return hostStandardFormat if private else False
if host[0] == '\x0A': # 10/8 if host[0:1] == b'\x0A': # 10/8
if not private: if not private:
logger.debug( logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat) 'Ignoring IP address in private range: %s', hostStandardFormat)
return hostStandardFormat if private else False return hostStandardFormat if private else False
if host[0:2] == '\xC0\xA8': # 192.168/16 if host[0:2] == b'\xC0\xA8': # 192.168/16
if not private: if not private:
logger.debug( logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat) 'Ignoring IP address in private range: %s', hostStandardFormat)
return hostStandardFormat if private else False return hostStandardFormat if private else False
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 if host[0:2] >= b'\xAC\x10' and host[0:2] < b'\xAC\x20': # 172.16/12
if not private: if not private:
logger.debug( logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat) 'Ignoring IP address in private range: %s', hostStandardFormat)
@ -202,15 +202,19 @@ def checkIPv6Address(host, hostStandardFormat, private=False):
Returns hostStandardFormat if it is an IPv6 address, Returns hostStandardFormat if it is an IPv6 address,
otherwise returns False otherwise returns False
""" """
if host == ('\x00' * 15) + '\x01': if host == b'\x00' * 15 + b'\x01':
if not private: if not private:
logger.debug('Ignoring loopback address: %s', hostStandardFormat) logger.debug('Ignoring loopback address: %s', hostStandardFormat)
return False return False
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: try:
host = [ord(c) for c in host[:2]]
except TypeError: # python3 has ints already
pass
if host[0] == 0xfe and host[1] & 0xc0 == 0x80:
if not private: if not private:
logger.debug('Ignoring local address: %s', hostStandardFormat) logger.debug('Ignoring local address: %s', hostStandardFormat)
return hostStandardFormat if private else False return hostStandardFormat if private else False
if (ord(host[0]) & 0xfe) == 0xfc: if host[0] & 0xfe == 0xfc:
if not private: if not private:
logger.debug( logger.debug(
'Ignoring unique local address: %s', hostStandardFormat) 'Ignoring unique local address: %s', hostStandardFormat)

View File

@ -25,28 +25,31 @@ def inv(a, n):
def get_code_string(base): def get_code_string(base):
"""Returns string according to base value""" """Returns string according to base value"""
if base == 2: if base == 2:
return '01' return b'01'
elif base == 10: if base == 10:
return '0123456789' return b'0123456789'
elif base == 16: if base == 16:
return "0123456789abcdef" return b'0123456789abcdef'
elif base == 58: if base == 58:
return "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
elif base == 256: if base == 256:
return ''.join([chr(x) for x in range(256)]) try:
else: return b''.join([chr(x) for x in range(256)])
except TypeError:
return bytes([x for x in range(256)])
raise ValueError("Invalid base!") raise ValueError("Invalid base!")
def encode(val, base, minlen=0): def encode(val, base, minlen=0):
"""Returns the encoded string""" """Returns the encoded string"""
code_string = get_code_string(base) code_string = get_code_string(base)
result = "" result = b''
while val > 0: while val > 0:
val, i = divmod(val, base) val, i = divmod(val, base)
result = code_string[i] + result result = code_string[i:i + 1] + result
if len(result) < minlen: if len(result) < minlen:
result = code_string[0] * (minlen - len(result)) + result result = code_string[0:1] * (minlen - len(result)) + result
return result return result
@ -116,7 +119,7 @@ def hex_to_point(h):
def point_to_hex(p): def point_to_hex(p):
"""Converting point value to hexadecimal""" """Converting point value to hexadecimal"""
return '04' + encode(p[0], 16, 64) + encode(p[1], 16, 64) return b'04' + encode(p[0], 16, 64) + encode(p[1], 16, 64)
def multiply(privkey, pubkey): def multiply(privkey, pubkey):

View File

View File

@ -0,0 +1,84 @@
"""
Test the arithmetic functions
"""
from binascii import unhexlify
import unittest
try:
from pyelliptic import arithmetic
except ImportError:
from pybitmessage.pyelliptic import arithmetic
# These keys are from addresses test script
sample_pubsigningkey = (
b'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
b'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = (
b'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
b'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_factor = \
66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
)
class TestArithmetic(unittest.TestCase):
"""Test arithmetic functions"""
def test_base10_multiply(self):
"""Test arithmetic.base10_multiply"""
self.assertEqual(
sample_point,
arithmetic.base10_multiply(arithmetic.G, sample_factor))
def test_decode(self):
"""Decode sample privsigningkey from hex to int and compare to factor"""
self.assertEqual(
arithmetic.decode(sample_privsigningkey, 16), sample_factor)
def test_encode(self):
"""Encode sample factor into hex and compare to privsigningkey"""
self.assertEqual(
arithmetic.encode(sample_factor, 16), sample_privsigningkey)
def test_changebase(self):
"""Check the results of changebase()"""
self.assertEqual(
arithmetic.changebase(sample_privsigningkey, 16, 256, minlen=32),
unhexlify(sample_privsigningkey))
self.assertEqual(
arithmetic.changebase(sample_pubsigningkey, 16, 256, minlen=64),
unhexlify(sample_pubsigningkey))
self.assertEqual(
32, # padding
len(arithmetic.changebase(sample_privsigningkey[:5], 16, 256, 32)))
def test_hex_to_point(self):
"""Check that sample_pubsigningkey is sample_point encoded in hex"""
self.assertEqual(
arithmetic.hex_to_point(sample_pubsigningkey), sample_point)
def test_point_to_hex(self):
"""Check that sample_point is sample_pubsigningkey decoded from hex"""
self.assertEqual(
arithmetic.point_to_hex(sample_point), sample_pubsigningkey)
def test_privtopub(self):
"""Generate public keys and check the result"""
self.assertEqual(
arithmetic.privtopub(sample_privsigningkey),
sample_pubsigningkey
)
self.assertEqual(
arithmetic.privtopub(sample_privencryptionkey),
sample_pubencryptionkey
)

View File

@ -5,6 +5,9 @@ import os
import unittest import unittest
from hashlib import sha256 from hashlib import sha256
try:
from pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
except ImportError:
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
# pylint: disable=protected-access # pylint: disable=protected-access

View File

@ -3,7 +3,10 @@ Test if OpenSSL is working correctly
""" """
import unittest import unittest
from pybitmessage.pyelliptic.openssl import OpenSSL try:
from pyelliptic.openssl import OpenSSL
except ImportError:
from pybitmessage.pyelliptic import OpenSSL
try: try:
OpenSSL.BN_bn2binpad OpenSSL.BN_bn2binpad
@ -33,7 +36,7 @@ class TestOpenSSL(unittest.TestCase):
@unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test') @unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test')
def test_padding(self): def test_padding(self):
"""Test an alternatie implementation of bn2binpad""" """Test an alternative implementation of bn2binpad"""
ctx = OpenSSL.BN_CTX_new() ctx = OpenSSL.BN_CTX_new()
a = OpenSSL.BN_new() a = OpenSSL.BN_new()

31
src/tests/samples.py Normal file
View File

@ -0,0 +1,31 @@
"""Various sample data"""
from binascii import unhexlify
# These keys are from addresses test script
sample_pubsigningkey = unhexlify(
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = unhexlify(
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
# stream: 1, version: 2
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
)
sample_seed = 'TIGER, tiger, burning bright. In the forests of the night'
# Deterministic addresses with stream 1 and versions 3, 4
sample_deterministic_addr3 = 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
sample_deterministic_addr4 = 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'

View File

@ -0,0 +1,35 @@
import unittest
from binascii import unhexlify
from pybitmessage import addresses
from .samples import sample_address, sample_ripe
class TestAddresses(unittest.TestCase):
"""Test addresses manipulations"""
def test_decode(self):
"""Decode some well known addresses and check the result"""
self.assertEqual(
addresses.decodeAddress(sample_address),
('success', 2, 1, unhexlify(sample_ripe)))
status, version, stream, ripe1 = addresses.decodeAddress(
'2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.assertEqual(status, 'success')
self.assertEqual(stream, 1)
self.assertEqual(version, 4)
status, version, stream, ripe2 = addresses.decodeAddress(
'2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
self.assertEqual(status, 'success')
self.assertEqual(stream, 1)
self.assertEqual(version, 3)
self.assertEqual(ripe1, ripe2)
def test_encode(self):
"""Encode sample ripe and compare the result to sample address"""
self.assertEqual(
sample_address,
addresses.encodeAddress(2, 1, unhexlify(sample_ripe)))

View File

@ -10,6 +10,9 @@ from six.moves import xmlrpc_client # nosec
import psutil import psutil
from .samples import (
sample_seed, sample_deterministic_addr3, sample_deterministic_addr4)
from .test_process import TestProcessProto from .test_process import TestProcessProto
@ -58,9 +61,7 @@ class TestAPIShutdown(TestAPIProto):
class TestAPI(TestAPIProto): class TestAPI(TestAPIProto):
"""Main API test case""" """Main API test case"""
_seed = base64.encodestring( _seed = base64.encodestring(sample_seed)
'TIGER, tiger, burning bright. In the forests of the night'
)
def _add_random_address(self, label): def _add_random_address(self, label):
return self.api.createRandomAddress(base64.encodestring(label)) return self.api.createRandomAddress(base64.encodestring(label))
@ -108,7 +109,7 @@ class TestAPI(TestAPIProto):
def test_decode_address(self): def test_decode_address(self):
"""Checking the return of API command 'decodeAddress'""" """Checking the return of API command 'decodeAddress'"""
result = json.loads( result = json.loads(
self.api.decodeAddress('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')) self.api.decodeAddress(sample_deterministic_addr4))
self.assertEqual(result.get('status'), 'success') self.assertEqual(result.get('status'), 'success')
self.assertEqual(result['addressVersion'], 4) self.assertEqual(result['addressVersion'], 4)
self.assertEqual(result['streamNumber'], 1) self.assertEqual(result['streamNumber'], 1)
@ -117,10 +118,10 @@ class TestAPI(TestAPIProto):
"""Test creation of deterministic addresses""" """Test creation of deterministic addresses"""
self.assertEqual( self.assertEqual(
self.api.getDeterministicAddress(self._seed, 4, 1), self.api.getDeterministicAddress(self._seed, 4, 1),
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') sample_deterministic_addr4)
self.assertEqual( self.assertEqual(
self.api.getDeterministicAddress(self._seed, 3, 1), self.api.getDeterministicAddress(self._seed, 3, 1),
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN') sample_deterministic_addr3)
self.assertRegexpMatches( self.assertRegexpMatches(
self.api.getDeterministicAddress(self._seed, 2, 1), self.api.getDeterministicAddress(self._seed, 2, 1),
r'^API Error 0002:') r'^API Error 0002:')
@ -150,7 +151,7 @@ class TestAPI(TestAPIProto):
self.api.createDeterministicAddresses(self._seed, 2, 4) self.api.createDeterministicAddresses(self._seed, 2, 4)
)['addresses'] )['addresses']
self.assertEqual(len(addresses), 2) self.assertEqual(len(addresses), 2)
self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') self.assertEqual(addresses[0], sample_deterministic_addr4)
for addr in addresses: for addr in addresses:
self.assertEqual(self.api.deleteAddress(addr), 'success') self.assertEqual(self.api.deleteAddress(addr), 'success')
@ -172,19 +173,18 @@ class TestAPI(TestAPIProto):
) )
# Add known address # Add known address
self.api.addAddressBookEntry( self.api.addAddressBookEntry(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', sample_deterministic_addr4,
base64.encodestring('tiger_4') base64.encodestring('tiger_4')
) )
# Check addressbook entry # Check addressbook entry
entries = json.loads( entries = json.loads(
self.api.listAddressBookEntries()).get('addresses')[0] self.api.listAddressBookEntries()).get('addresses')[0]
self.assertEqual( self.assertEqual(
entries['address'], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') entries['address'], sample_deterministic_addr4)
self.assertEqual( self.assertEqual(
base64.decodestring(entries['label']), 'tiger_4') base64.decodestring(entries['label']), 'tiger_4')
# Remove known address # Remove known address
self.api.deleteAddressBookEntry( self.api.deleteAddressBookEntry(sample_deterministic_addr4)
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
# Addressbook should be empty again # Addressbook should be empty again
self.assertEqual( self.assertEqual(
json.loads(self.api.listAddressBookEntries()).get('addresses'), json.loads(self.api.listAddressBookEntries()).get('addresses'),
@ -218,7 +218,7 @@ class TestAPI(TestAPIProto):
msg = base64.encodestring('test message') msg = base64.encodestring('test message')
msg_subject = base64.encodestring('test_subject') msg_subject = base64.encodestring('test_subject')
ackdata = self.api.sendMessage( ackdata = self.api.sendMessage(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', addr, msg_subject, msg) sample_deterministic_addr4, addr, msg_subject, msg)
try: try:
# Check ackdata and message status # Check ackdata and message status
int(ackdata, 16) int(ackdata, 16)
@ -332,19 +332,12 @@ class TestAPI(TestAPIProto):
"""Testing chan creation/joining""" """Testing chan creation/joining"""
# Create chan with known address # Create chan with known address
self.assertEqual( self.assertEqual(
self.api.createChan(self._seed), self.api.createChan(self._seed), sample_deterministic_addr4)
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'
)
# cleanup # cleanup
self.assertEqual( self.assertEqual(
self.api.leaveChan('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'), self.api.leaveChan(sample_deterministic_addr4), 'success')
'success'
)
# Join chan with addresses of version 3 or 4 # Join chan with addresses of version 3 or 4
for addr in ( for addr in (sample_deterministic_addr4, sample_deterministic_addr3):
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
):
self.assertEqual(self.api.joinChan(self._seed, addr), 'success') self.assertEqual(self.api.joinChan(self._seed, addr), 'success')
self.assertEqual(self.api.leaveChan(addr), 'success') self.assertEqual(self.api.leaveChan(addr), 'success')
# Joining with wrong address should fail # Joining with wrong address should fail

View File

@ -5,9 +5,9 @@ Test the alternatives for crypto primitives
import hashlib import hashlib
import unittest import unittest
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from binascii import hexlify, unhexlify from binascii import hexlify
from pybitmessage.pyelliptic import arithmetic from pybitmessage import highlevelcrypto
try: try:
@ -15,29 +15,12 @@ try:
except ImportError: except ImportError:
RIPEMD = None RIPEMD = None
from .samples import (
# These keys are from addresses test script sample_pubsigningkey, sample_pubencryptionkey,
sample_pubsigningkey = unhexlify( sample_privsigningkey, sample_privencryptionkey, sample_ripe
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = unhexlify(
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
# stream: 1, version: 2
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
) )
_sha = hashlib.new('sha512') _sha = hashlib.new('sha512')
_sha.update(sample_pubsigningkey + sample_pubencryptionkey) _sha.update(sample_pubsigningkey + sample_pubencryptionkey)
@ -76,38 +59,16 @@ class TestCrypto(RIPEMD160TestCase, unittest.TestCase):
return RIPEMD.RIPEMD160Hash(data).digest() return RIPEMD.RIPEMD160Hash(data).digest()
class TestAddresses(unittest.TestCase): class TestHighlevelcrypto(unittest.TestCase):
"""Test addresses manipulations""" """Test highlevelcrypto public functions"""
def test_base10_multiply(self):
"""Test arithmetic.base10_multiply"""
self.assertEqual(
sample_point,
arithmetic.base10_multiply(arithmetic.G, sample_factor))
def test_privtopub(self): def test_privtopub(self):
"""Generate public keys and check the result""" """Generate public keys and check the result"""
self.assertEqual( self.assertEqual(
arithmetic.privtopub(sample_privsigningkey).encode(), highlevelcrypto.privToPub(sample_privsigningkey),
hexlify(sample_pubsigningkey) hexlify(sample_pubsigningkey)
) )
self.assertEqual( self.assertEqual(
arithmetic.privtopub(sample_privencryptionkey).encode(), highlevelcrypto.privToPub(sample_privencryptionkey),
hexlify(sample_pubencryptionkey) hexlify(sample_pubencryptionkey)
) )
def test_address(self):
"""Create address and check the result"""
from pybitmessage import addresses
from pybitmessage.fallback import RIPEMD160Hash
sha = hashlib.new('sha512')
sha.update(sample_pubsigningkey + sample_pubencryptionkey)
ripe_hash = RIPEMD160Hash(sha.digest()).digest()
self.assertEqual(ripe_hash, unhexlify(sample_ripe))
self.assertEqual(
addresses.encodeAddress(2, 1, ripe_hash), sample_address)
self.assertEqual(
addresses.decodeAddress(sample_address),
('success', 2, 1, ripe_hash))

View File

@ -1,43 +0,0 @@
"""
Test for network group
"""
import unittest
from .common import skip_python3
skip_python3()
class TestNetworkGroup(unittest.TestCase):
"""
Test case for network group
"""
def test_network_group(self):
"""Test various types of network groups"""
from pybitmessage.protocol import network_group
test_ip = '1.2.3.4'
self.assertEqual('\x01\x02', network_group(test_ip))
test_ip = '127.0.0.1'
self.assertEqual('IPv4', network_group(test_ip))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
network_group(test_ip))
test_ip = 'bootstrap8444.bitmessage.org'
self.assertEqual(
'bootstrap8444.bitmessage.org',
network_group(test_ip))
test_ip = 'quzwelsuziwqgpt2.onion'
self.assertEqual(
test_ip,
network_group(test_ip))
test_ip = None
self.assertEqual(
None,
network_group(test_ip))

View File

@ -2,25 +2,102 @@
Tests for common protocol functions Tests for common protocol functions
""" """
import sys
import unittest import unittest
from .common import skip_python3 from pybitmessage import protocol, state
skip_python3()
class TestProtocol(unittest.TestCase): class TestProtocol(unittest.TestCase):
"""Main protocol test case""" """Main protocol test case"""
def test_checkIPv4Address(self):
"""Check the results of protocol.checkIPv4Address()"""
token = 'HELLO'
# checking protocol.encodeHost()[12:]
self.assertEqual( # 127.0.0.1
token, protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token, True))
self.assertFalse(
protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token))
self.assertEqual( # 10.42.43.1
token, protocol.checkIPv4Address(b'\n*+\x01', token, True))
self.assertFalse(
protocol.checkIPv4Address(b'\n*+\x01', token, False))
self.assertEqual( # 192.168.0.254
token, protocol.checkIPv4Address(b'\xc0\xa8\x00\xfe', token, True))
self.assertEqual( # 172.31.255.254
token, protocol.checkIPv4Address(b'\xac\x1f\xff\xfe', token, True))
# self.assertEqual( # 169.254.1.1
# token, protocol.checkIPv4Address(b'\xa9\xfe\x01\x01', token, True))
# self.assertEqual( # 254.128.1.1
# token, protocol.checkIPv4Address(b'\xfe\x80\x01\x01', token, True))
self.assertFalse( # 8.8.8.8
protocol.checkIPv4Address(b'\x08\x08\x08\x08', token, True))
def test_checkIPv6Address(self):
"""Check the results of protocol.checkIPv6Address()"""
test_ip = '2001:db8::ff00:42:8329'
self.assertEqual(
'test', protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test'))
self.assertFalse(
protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test', True))
for test_ip in ('fe80::200:5aee:feaa:20a2', 'fdf8:f53b:82e4::53'):
self.assertEqual(
'test', protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test', True))
self.assertFalse(
protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test'))
def test_check_local(self): def test_check_local(self):
"""Check the logic of TCPConnection.local""" """Check the logic of TCPConnection.local"""
from pybitmessage import protocol, state
self.assertTrue( self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('127.0.0.1'), True)) protocol.checkIPAddress(protocol.encodeHost('127.0.0.1'), True))
self.assertTrue( self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('192.168.0.1'), True)) protocol.checkIPAddress(protocol.encodeHost('192.168.0.1'), True))
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('10.42.43.1'), True))
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('172.31.255.2'), True))
self.assertFalse(protocol.checkIPAddress(
protocol.encodeHost('2001:db8::ff00:42:8329'), True))
globalhost = protocol.encodeHost('8.8.8.8')
self.assertFalse(protocol.checkIPAddress(globalhost, True))
self.assertEqual(protocol.checkIPAddress(globalhost), '8.8.8.8')
@unittest.skipIf(
sys.hexversion >= 0x3000000, 'this is still not working with python3')
def test_check_local_socks(self):
"""The SOCKS part of the local check"""
self.assertTrue( self.assertTrue(
not protocol.checkSocksIP('127.0.0.1') not protocol.checkSocksIP('127.0.0.1')
or state.socksIP) or state.socksIP)
def test_network_group(self):
"""Test various types of network groups"""
test_ip = '1.2.3.4'
self.assertEqual(b'\x01\x02', protocol.network_group(test_ip))
test_ip = '127.0.0.1'
self.assertEqual('IPv4', protocol.network_group(test_ip))
self.assertEqual(
protocol.network_group('8.8.8.8'),
protocol.network_group('8.8.4.4'))
self.assertNotEqual(
protocol.network_group('1.1.1.1'),
protocol.network_group('8.8.8.8'))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
protocol.network_group(test_ip))
for test_ip in (
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None):
self.assertEqual(
test_ip, protocol.network_group(test_ip))

View File

@ -14,7 +14,10 @@ def unittest_discover():
# randomize the order of tests in test cases # randomize the order of tests in test cases
loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1) loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1)
# pybitmessage symlink may disappear on Windows # pybitmessage symlink may disappear on Windows
return loader.discover('src.tests') testsuite = loader.discover('src.tests')
testsuite.addTests([loader.discover('src.pyelliptic')])
return testsuite
if __name__ == "__main__": if __name__ == "__main__":