Refactoring crypto base changes #1806
|
@ -1,8 +1,7 @@
|
|||
"""
|
||||
Operations with addresses
|
||||
"""
|
||||
# pylint: disable=redefined-outer-name,inconsistent-return-statements
|
||||
import sys
|
||||
# pylint: disable=inconsistent-return-statements
|
||||
import hashlib
|
||||
import logging
|
||||
from binascii import hexlify, unhexlify
|
||||
|
@ -151,12 +150,6 @@ def encodeAddress(version, stream, ripe):
|
|||
' a given ripe hash was not 20.'
|
||||
)
|
||||
|
||||
if isinstance(ripe, str):
|
||||
if ripe[:2] == '\x00\x00':
|
||||
ripe = ripe[2:]
|
||||
elif ripe[:1] == '\x00':
|
||||
ripe = ripe[1:]
|
||||
else:
|
||||
if ripe[:2] == b'\x00\x00':
|
||||
ripe = ripe[2:]
|
||||
elif ripe[:1] == b'\x00':
|
||||
|
@ -166,14 +159,8 @@ def encodeAddress(version, stream, ripe):
|
|||
raise Exception(
|
||||
'Programming error in encodeAddress: The length of'
|
||||
' a given ripe hash was not 20.')
|
||||
ripe = ripe.lstrip('\x00')
|
||||
ripe = ripe.lstrip(b'\x00')
|
||||
|
||||
if sys.version_info[0] == 3:
|
||||
if isinstance(ripe, str):
|
||||
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe.encode('utf-8')
|
||||
else:
|
||||
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
|
||||
else:
|
||||
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
|
||||
|
||||
# Generate the checksum
|
||||
|
@ -184,7 +171,10 @@ def encodeAddress(version, stream, ripe):
|
|||
sha.update(currentHash)
|
||||
checksum = sha.digest()[0:4]
|
||||
|
||||
# FIXME: encodeBase58 should take binary data, to reduce conversions
|
||||
# encodeBase58(storedBinaryData + checksum)
|
||||
asInt = int(hexlify(storedBinaryData) + hexlify(checksum), 16)
|
||||
# should it be str? If yes, it should be everywhere in the code
|
||||
return 'BM-' + encodeBase58(asInt)
|
||||
|
||||
|
||||
|
|
|
@ -10,17 +10,22 @@ High level cryptographic functions based on `.pyelliptic` OpenSSL bindings.
|
|||
from binascii import hexlify
|
||||
|
||||
import pyelliptic
|
||||
from bmconfigparser import BMConfigParser
|
||||
from pyelliptic import OpenSSL
|
||||
from pyelliptic import arithmetic as a
|
||||
|
||||
from bmconfigparser import BMConfigParser
|
||||
|
||||
__all__ = ['encrypt', 'makeCryptor', 'pointMult', 'privToPub', 'sign', 'verify']
|
||||
|
||||
|
||||
def makeCryptor(privkey):
|
||||
"""Return a private `.pyelliptic.ECC` instance"""
|
||||
private_key = a.changebase(privkey, 16, 256, minlen=32)
|
||||
public_key = pointMult(private_key)
|
||||
privkey_bin = '\x02\xca\x00\x20' + private_key
|
||||
pubkey_bin = '\x02\xca\x00\x20' + public_key[1:-32] + '\x00\x20' + public_key[-32:]
|
||||
privkey_bin = b'\x02\xca\x00\x20' + private_key
|
||||
pubkey_bin = (
|
||||
b'\x02\xca\x00\x20' + public_key[1:-32] + b'\x00\x20' + public_key[-32:]
|
||||
)
|
||||
cryptor = pyelliptic.ECC(
|
||||
curve='secp256k1', privkey=privkey_bin, pubkey=pubkey_bin)
|
||||
return cryptor
|
||||
|
@ -29,7 +34,7 @@ def makeCryptor(privkey):
|
|||
def hexToPubkey(pubkey):
|
||||
"""Convert a pubkey from hex to binary"""
|
||||
pubkey_raw = a.changebase(pubkey[2:], 16, 256, minlen=64)
|
||||
pubkey_bin = '\x02\xca\x00 ' + pubkey_raw[:32] + '\x00 ' + pubkey_raw[32:]
|
||||
pubkey_bin = b'\x02\xca\x00 ' + pubkey_raw[:32] + b'\x00 ' + pubkey_raw[32:]
|
||||
return pubkey_bin
|
||||
|
||||
|
||||
|
|
|
@ -91,16 +91,16 @@ def isBitSetWithinBitfield(fourByteString, n):
|
|||
return x & 2**n != 0
|
||||
|
||||
|
||||
# ip addresses
|
||||
# IP addresses
|
||||
|
||||
|
||||
def encodeHost(host):
|
||||
"""Encode a given host to be used in low-level socket operations"""
|
||||
if host.find('.onion') > -1:
|
||||
return '\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode(
|
||||
return b'\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode(
|
||||
host.split(".")[0], True)
|
||||
elif host.find(':') == -1:
|
||||
return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||
return b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||
socket.inet_aton(host)
|
||||
return socket.inet_pton(socket.AF_INET6, host)
|
||||
|
||||
|
@ -147,10 +147,10 @@ def checkIPAddress(host, private=False):
|
|||
Returns hostStandardFormat if it is a valid IP address,
|
||||
otherwise returns False
|
||||
"""
|
||||
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||
if host[0:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||
hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:])
|
||||
return checkIPv4Address(host[12:], hostStandardFormat, private)
|
||||
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
|
||||
elif host[0:6] == b'\xfd\x87\xd8\x7e\xeb\x43':
|
||||
# Onion, based on BMD/bitcoind
|
||||
hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion"
|
||||
if private:
|
||||
|
@ -161,7 +161,7 @@ def checkIPAddress(host, private=False):
|
|||
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host)
|
||||
except ValueError:
|
||||
return False
|
||||
if hostStandardFormat == "":
|
||||
if len(hostStandardFormat) == 0:
|
||||
# This can happen on Windows systems which are
|
||||
# not 64-bit compatible so let us drop the IPv6 address.
|
||||
return False
|
||||
|
@ -173,23 +173,23 @@ def checkIPv4Address(host, hostStandardFormat, private=False):
|
|||
Returns hostStandardFormat if it is an IPv4 address,
|
||||
otherwise returns False
|
||||
"""
|
||||
if host[0] == '\x7F': # 127/8
|
||||
if host[0:1] == b'\x7F': # 127/8
|
||||
if not private:
|
||||
logger.debug(
|
||||
'Ignoring IP address in loopback range: %s',
|
||||
hostStandardFormat)
|
||||
return hostStandardFormat if private else False
|
||||
if host[0] == '\x0A': # 10/8
|
||||
if host[0:1] == b'\x0A': # 10/8
|
||||
if not private:
|
||||
logger.debug(
|
||||
'Ignoring IP address in private range: %s', hostStandardFormat)
|
||||
return hostStandardFormat if private else False
|
||||
if host[0:2] == '\xC0\xA8': # 192.168/16
|
||||
if host[0:2] == b'\xC0\xA8': # 192.168/16
|
||||
if not private:
|
||||
logger.debug(
|
||||
'Ignoring IP address in private range: %s', hostStandardFormat)
|
||||
return hostStandardFormat if private else False
|
||||
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
|
||||
if host[0:2] >= b'\xAC\x10' and host[0:2] < b'\xAC\x20': # 172.16/12
|
||||
if not private:
|
||||
logger.debug(
|
||||
'Ignoring IP address in private range: %s', hostStandardFormat)
|
||||
|
@ -202,15 +202,19 @@ def checkIPv6Address(host, hostStandardFormat, private=False):
|
|||
Returns hostStandardFormat if it is an IPv6 address,
|
||||
otherwise returns False
|
||||
"""
|
||||
if host == ('\x00' * 15) + '\x01':
|
||||
if host == b'\x00' * 15 + b'\x01':
|
||||
if not private:
|
||||
logger.debug('Ignoring loopback address: %s', hostStandardFormat)
|
||||
return False
|
||||
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
||||
try:
|
||||
host = [ord(c) for c in host[:2]]
|
||||
except TypeError: # python3 has ints already
|
||||
pass
|
||||
if host[0] == 0xfe and host[1] & 0xc0 == 0x80:
|
||||
if not private:
|
||||
logger.debug('Ignoring local address: %s', hostStandardFormat)
|
||||
return hostStandardFormat if private else False
|
||||
if (ord(host[0]) & 0xfe) == 0xfc:
|
||||
if host[0] & 0xfe == 0xfc:
|
||||
if not private:
|
||||
logger.debug(
|
||||
'Ignoring unique local address: %s', hostStandardFormat)
|
||||
|
|
|
@ -25,28 +25,31 @@ def inv(a, n):
|
|||
def get_code_string(base):
|
||||
"""Returns string according to base value"""
|
||||
if base == 2:
|
||||
return '01'
|
||||
elif base == 10:
|
||||
return '0123456789'
|
||||
elif base == 16:
|
||||
return "0123456789abcdef"
|
||||
elif base == 58:
|
||||
return "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
|
||||
elif base == 256:
|
||||
return ''.join([chr(x) for x in range(256)])
|
||||
else:
|
||||
return b'01'
|
||||
if base == 10:
|
||||
return b'0123456789'
|
||||
if base == 16:
|
||||
return b'0123456789abcdef'
|
||||
if base == 58:
|
||||
return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
|
||||
if base == 256:
|
||||
try:
|
||||
return b''.join([chr(x) for x in range(256)])
|
||||
except TypeError:
|
||||
return bytes([x for x in range(256)])
|
||||
|
||||
raise ValueError("Invalid base!")
|
||||
|
||||
|
||||
def encode(val, base, minlen=0):
|
||||
"""Returns the encoded string"""
|
||||
code_string = get_code_string(base)
|
||||
result = ""
|
||||
result = b''
|
||||
while val > 0:
|
||||
val, i = divmod(val, base)
|
||||
result = code_string[i] + result
|
||||
result = code_string[i:i + 1] + result
|
||||
if len(result) < minlen:
|
||||
result = code_string[0] * (minlen - len(result)) + result
|
||||
result = code_string[0:1] * (minlen - len(result)) + result
|
||||
return result
|
||||
|
||||
|
||||
|
@ -116,7 +119,7 @@ def hex_to_point(h):
|
|||
|
||||
def point_to_hex(p):
|
||||
"""Converting point value to hexadecimal"""
|
||||
return '04' + encode(p[0], 16, 64) + encode(p[1], 16, 64)
|
||||
return b'04' + encode(p[0], 16, 64) + encode(p[1], 16, 64)
|
||||
|
||||
|
||||
def multiply(privkey, pubkey):
|
||||
|
|
0
src/pyelliptic/tests/__init__.py
Normal file
0
src/pyelliptic/tests/__init__.py
Normal file
84
src/pyelliptic/tests/test_arithmetic.py
Normal file
84
src/pyelliptic/tests/test_arithmetic.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
"""
|
||||
Test the arithmetic functions
|
||||
"""
|
||||
|
||||
from binascii import unhexlify
|
||||
import unittest
|
||||
|
||||
try:
|
||||
from pyelliptic import arithmetic
|
||||
except ImportError:
|
||||
from pybitmessage.pyelliptic import arithmetic
|
||||
|
||||
|
||||
# These keys are from addresses test script
|
||||
sample_pubsigningkey = (
|
||||
b'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
|
||||
b'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
|
||||
sample_pubencryptionkey = (
|
||||
b'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
|
||||
b'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
|
||||
sample_privsigningkey = \
|
||||
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
|
||||
sample_privencryptionkey = \
|
||||
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
|
||||
|
||||
sample_factor = \
|
||||
66858749573256452658262553961707680376751171096153613379801854825275240965733
|
||||
# G * sample_factor
|
||||
sample_point = (
|
||||
33567437183004486938355437500683826356288335339807546987348409590129959362313,
|
||||
94730058721143827257669456336351159718085716196507891067256111928318063085006
|
||||
)
|
||||
|
||||
|
||||
class TestArithmetic(unittest.TestCase):
|
||||
"""Test arithmetic functions"""
|
||||
def test_base10_multiply(self):
|
||||
"""Test arithmetic.base10_multiply"""
|
||||
self.assertEqual(
|
||||
sample_point,
|
||||
arithmetic.base10_multiply(arithmetic.G, sample_factor))
|
||||
|
||||
def test_decode(self):
|
||||
"""Decode sample privsigningkey from hex to int and compare to factor"""
|
||||
self.assertEqual(
|
||||
arithmetic.decode(sample_privsigningkey, 16), sample_factor)
|
||||
|
||||
def test_encode(self):
|
||||
"""Encode sample factor into hex and compare to privsigningkey"""
|
||||
self.assertEqual(
|
||||
arithmetic.encode(sample_factor, 16), sample_privsigningkey)
|
||||
|
||||
def test_changebase(self):
|
||||
"""Check the results of changebase()"""
|
||||
self.assertEqual(
|
||||
arithmetic.changebase(sample_privsigningkey, 16, 256, minlen=32),
|
||||
unhexlify(sample_privsigningkey))
|
||||
self.assertEqual(
|
||||
arithmetic.changebase(sample_pubsigningkey, 16, 256, minlen=64),
|
||||
unhexlify(sample_pubsigningkey))
|
||||
self.assertEqual(
|
||||
32, # padding
|
||||
len(arithmetic.changebase(sample_privsigningkey[:5], 16, 256, 32)))
|
||||
|
||||
def test_hex_to_point(self):
|
||||
"""Check that sample_pubsigningkey is sample_point encoded in hex"""
|
||||
self.assertEqual(
|
||||
arithmetic.hex_to_point(sample_pubsigningkey), sample_point)
|
||||
|
||||
def test_point_to_hex(self):
|
||||
"""Check that sample_point is sample_pubsigningkey decoded from hex"""
|
||||
self.assertEqual(
|
||||
arithmetic.point_to_hex(sample_point), sample_pubsigningkey)
|
||||
|
||||
def test_privtopub(self):
|
||||
"""Generate public keys and check the result"""
|
||||
self.assertEqual(
|
||||
arithmetic.privtopub(sample_privsigningkey),
|
||||
sample_pubsigningkey
|
||||
)
|
||||
self.assertEqual(
|
||||
arithmetic.privtopub(sample_privencryptionkey),
|
||||
sample_pubencryptionkey
|
||||
)
|
|
@ -5,7 +5,10 @@ import os
|
|||
import unittest
|
||||
from hashlib import sha256
|
||||
|
||||
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
|
||||
try:
|
||||
from pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
|
||||
except ImportError:
|
||||
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
|
@ -3,7 +3,10 @@ Test if OpenSSL is working correctly
|
|||
"""
|
||||
import unittest
|
||||
|
||||
from pybitmessage.pyelliptic.openssl import OpenSSL
|
||||
try:
|
||||
from pyelliptic.openssl import OpenSSL
|
||||
except ImportError:
|
||||
from pybitmessage.pyelliptic import OpenSSL
|
||||
|
||||
try:
|
||||
OpenSSL.BN_bn2binpad
|
||||
|
@ -33,7 +36,7 @@ class TestOpenSSL(unittest.TestCase):
|
|||
|
||||
@unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test')
|
||||
def test_padding(self):
|
||||
"""Test an alternatie implementation of bn2binpad"""
|
||||
"""Test an alternative implementation of bn2binpad"""
|
||||
|
||||
ctx = OpenSSL.BN_CTX_new()
|
||||
a = OpenSSL.BN_new()
|
31
src/tests/samples.py
Normal file
31
src/tests/samples.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
"""Various sample data"""
|
||||
|
||||
from binascii import unhexlify
|
||||
|
||||
|
||||
# These keys are from addresses test script
|
||||
sample_pubsigningkey = unhexlify(
|
||||
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
|
||||
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
|
||||
sample_pubencryptionkey = unhexlify(
|
||||
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
|
||||
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
|
||||
sample_privsigningkey = \
|
||||
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
|
||||
sample_privencryptionkey = \
|
||||
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
|
||||
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
|
||||
# stream: 1, version: 2
|
||||
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
|
||||
|
||||
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
|
||||
# G * sample_factor
|
||||
sample_point = (
|
||||
33567437183004486938355437500683826356288335339807546987348409590129959362313,
|
||||
94730058721143827257669456336351159718085716196507891067256111928318063085006
|
||||
)
|
||||
|
||||
sample_seed = 'TIGER, tiger, burning bright. In the forests of the night'
|
||||
# Deterministic addresses with stream 1 and versions 3, 4
|
||||
sample_deterministic_addr3 = 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
|
||||
sample_deterministic_addr4 = 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'
|
35
src/tests/test_addresses.py
Normal file
35
src/tests/test_addresses.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
|
||||
import unittest
|
||||
from binascii import unhexlify
|
||||
|
||||
from pybitmessage import addresses
|
||||
|
||||
from .samples import sample_address, sample_ripe
|
||||
|
||||
|
||||
class TestAddresses(unittest.TestCase):
|
||||
"""Test addresses manipulations"""
|
||||
|
||||
def test_decode(self):
|
||||
"""Decode some well known addresses and check the result"""
|
||||
self.assertEqual(
|
||||
addresses.decodeAddress(sample_address),
|
||||
('success', 2, 1, unhexlify(sample_ripe)))
|
||||
|
||||
status, version, stream, ripe1 = addresses.decodeAddress(
|
||||
'2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
|
||||
self.assertEqual(status, 'success')
|
||||
self.assertEqual(stream, 1)
|
||||
self.assertEqual(version, 4)
|
||||
status, version, stream, ripe2 = addresses.decodeAddress(
|
||||
'2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
|
||||
self.assertEqual(status, 'success')
|
||||
self.assertEqual(stream, 1)
|
||||
self.assertEqual(version, 3)
|
||||
self.assertEqual(ripe1, ripe2)
|
||||
|
||||
def test_encode(self):
|
||||
"""Encode sample ripe and compare the result to sample address"""
|
||||
self.assertEqual(
|
||||
sample_address,
|
||||
addresses.encodeAddress(2, 1, unhexlify(sample_ripe)))
|
|
@ -10,6 +10,9 @@ from six.moves import xmlrpc_client # nosec
|
|||
|
||||
import psutil
|
||||
|
||||
from .samples import (
|
||||
sample_seed, sample_deterministic_addr3, sample_deterministic_addr4)
|
||||
|
||||
from .test_process import TestProcessProto
|
||||
|
||||
|
||||
|
@ -58,9 +61,7 @@ class TestAPIShutdown(TestAPIProto):
|
|||
|
||||
class TestAPI(TestAPIProto):
|
||||
"""Main API test case"""
|
||||
_seed = base64.encodestring(
|
||||
'TIGER, tiger, burning bright. In the forests of the night'
|
||||
)
|
||||
_seed = base64.encodestring(sample_seed)
|
||||
|
||||
def _add_random_address(self, label):
|
||||
return self.api.createRandomAddress(base64.encodestring(label))
|
||||
|
@ -108,7 +109,7 @@ class TestAPI(TestAPIProto):
|
|||
def test_decode_address(self):
|
||||
"""Checking the return of API command 'decodeAddress'"""
|
||||
result = json.loads(
|
||||
self.api.decodeAddress('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'))
|
||||
self.api.decodeAddress(sample_deterministic_addr4))
|
||||
self.assertEqual(result.get('status'), 'success')
|
||||
self.assertEqual(result['addressVersion'], 4)
|
||||
self.assertEqual(result['streamNumber'], 1)
|
||||
|
@ -117,10 +118,10 @@ class TestAPI(TestAPIProto):
|
|||
"""Test creation of deterministic addresses"""
|
||||
self.assertEqual(
|
||||
self.api.getDeterministicAddress(self._seed, 4, 1),
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
|
||||
sample_deterministic_addr4)
|
||||
self.assertEqual(
|
||||
self.api.getDeterministicAddress(self._seed, 3, 1),
|
||||
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
|
||||
sample_deterministic_addr3)
|
||||
self.assertRegexpMatches(
|
||||
self.api.getDeterministicAddress(self._seed, 2, 1),
|
||||
r'^API Error 0002:')
|
||||
|
@ -150,7 +151,7 @@ class TestAPI(TestAPIProto):
|
|||
self.api.createDeterministicAddresses(self._seed, 2, 4)
|
||||
)['addresses']
|
||||
self.assertEqual(len(addresses), 2)
|
||||
self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
|
||||
self.assertEqual(addresses[0], sample_deterministic_addr4)
|
||||
for addr in addresses:
|
||||
self.assertEqual(self.api.deleteAddress(addr), 'success')
|
||||
|
||||
|
@ -172,19 +173,18 @@ class TestAPI(TestAPIProto):
|
|||
)
|
||||
# Add known address
|
||||
self.api.addAddressBookEntry(
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
|
||||
sample_deterministic_addr4,
|
||||
base64.encodestring('tiger_4')
|
||||
)
|
||||
# Check addressbook entry
|
||||
entries = json.loads(
|
||||
self.api.listAddressBookEntries()).get('addresses')[0]
|
||||
self.assertEqual(
|
||||
entries['address'], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
|
||||
entries['address'], sample_deterministic_addr4)
|
||||
self.assertEqual(
|
||||
base64.decodestring(entries['label']), 'tiger_4')
|
||||
# Remove known address
|
||||
self.api.deleteAddressBookEntry(
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
|
||||
self.api.deleteAddressBookEntry(sample_deterministic_addr4)
|
||||
# Addressbook should be empty again
|
||||
self.assertEqual(
|
||||
json.loads(self.api.listAddressBookEntries()).get('addresses'),
|
||||
|
@ -218,7 +218,7 @@ class TestAPI(TestAPIProto):
|
|||
msg = base64.encodestring('test message')
|
||||
msg_subject = base64.encodestring('test_subject')
|
||||
ackdata = self.api.sendMessage(
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', addr, msg_subject, msg)
|
||||
sample_deterministic_addr4, addr, msg_subject, msg)
|
||||
try:
|
||||
# Check ackdata and message status
|
||||
int(ackdata, 16)
|
||||
|
@ -332,19 +332,12 @@ class TestAPI(TestAPIProto):
|
|||
"""Testing chan creation/joining"""
|
||||
# Create chan with known address
|
||||
self.assertEqual(
|
||||
self.api.createChan(self._seed),
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'
|
||||
)
|
||||
self.api.createChan(self._seed), sample_deterministic_addr4)
|
||||
# cleanup
|
||||
self.assertEqual(
|
||||
self.api.leaveChan('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'),
|
||||
'success'
|
||||
)
|
||||
self.api.leaveChan(sample_deterministic_addr4), 'success')
|
||||
# Join chan with addresses of version 3 or 4
|
||||
for addr in (
|
||||
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
|
||||
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
|
||||
):
|
||||
for addr in (sample_deterministic_addr4, sample_deterministic_addr3):
|
||||
self.assertEqual(self.api.joinChan(self._seed, addr), 'success')
|
||||
self.assertEqual(self.api.leaveChan(addr), 'success')
|
||||
# Joining with wrong address should fail
|
||||
|
|
|
@ -5,9 +5,9 @@ Test the alternatives for crypto primitives
|
|||
import hashlib
|
||||
import unittest
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from binascii import hexlify, unhexlify
|
||||
from binascii import hexlify
|
||||
|
||||
from pybitmessage.pyelliptic import arithmetic
|
||||
from pybitmessage import highlevelcrypto
|
||||
|
||||
|
||||
try:
|
||||
|
@ -15,29 +15,12 @@ try:
|
|||
except ImportError:
|
||||
RIPEMD = None
|
||||
|
||||
|
||||
# These keys are from addresses test script
|
||||
sample_pubsigningkey = unhexlify(
|
||||
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
|
||||
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
|
||||
sample_pubencryptionkey = unhexlify(
|
||||
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
|
||||
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
|
||||
sample_privsigningkey = \
|
||||
'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
|
||||
sample_privencryptionkey = \
|
||||
'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
|
||||
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
|
||||
# stream: 1, version: 2
|
||||
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
|
||||
|
||||
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
|
||||
# G * sample_factor
|
||||
sample_point = (
|
||||
33567437183004486938355437500683826356288335339807546987348409590129959362313,
|
||||
94730058721143827257669456336351159718085716196507891067256111928318063085006
|
||||
from .samples import (
|
||||
sample_pubsigningkey, sample_pubencryptionkey,
|
||||
sample_privsigningkey, sample_privencryptionkey, sample_ripe
|
||||
)
|
||||
|
||||
|
||||
_sha = hashlib.new('sha512')
|
||||
_sha.update(sample_pubsigningkey + sample_pubencryptionkey)
|
||||
|
||||
|
@ -76,38 +59,16 @@ class TestCrypto(RIPEMD160TestCase, unittest.TestCase):
|
|||
return RIPEMD.RIPEMD160Hash(data).digest()
|
||||
|
||||
|
||||
class TestAddresses(unittest.TestCase):
|
||||
"""Test addresses manipulations"""
|
||||
def test_base10_multiply(self):
|
||||
"""Test arithmetic.base10_multiply"""
|
||||
self.assertEqual(
|
||||
sample_point,
|
||||
arithmetic.base10_multiply(arithmetic.G, sample_factor))
|
||||
class TestHighlevelcrypto(unittest.TestCase):
|
||||
"""Test highlevelcrypto public functions"""
|
||||
|
||||
def test_privtopub(self):
|
||||
"""Generate public keys and check the result"""
|
||||
self.assertEqual(
|
||||
arithmetic.privtopub(sample_privsigningkey).encode(),
|
||||
highlevelcrypto.privToPub(sample_privsigningkey),
|
||||
hexlify(sample_pubsigningkey)
|
||||
)
|
||||
self.assertEqual(
|
||||
arithmetic.privtopub(sample_privencryptionkey).encode(),
|
||||
highlevelcrypto.privToPub(sample_privencryptionkey),
|
||||
hexlify(sample_pubencryptionkey)
|
||||
)
|
||||
|
||||
def test_address(self):
|
||||
"""Create address and check the result"""
|
||||
from pybitmessage import addresses
|
||||
from pybitmessage.fallback import RIPEMD160Hash
|
||||
|
||||
sha = hashlib.new('sha512')
|
||||
sha.update(sample_pubsigningkey + sample_pubencryptionkey)
|
||||
ripe_hash = RIPEMD160Hash(sha.digest()).digest()
|
||||
self.assertEqual(ripe_hash, unhexlify(sample_ripe))
|
||||
|
||||
self.assertEqual(
|
||||
addresses.encodeAddress(2, 1, ripe_hash), sample_address)
|
||||
|
||||
self.assertEqual(
|
||||
addresses.decodeAddress(sample_address),
|
||||
('success', 2, 1, ripe_hash))
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
"""
|
||||
Test for network group
|
||||
"""
|
||||
import unittest
|
||||
|
||||
from .common import skip_python3
|
||||
|
||||
skip_python3()
|
||||
|
||||
|
||||
class TestNetworkGroup(unittest.TestCase):
|
||||
"""
|
||||
Test case for network group
|
||||
"""
|
||||
def test_network_group(self):
|
||||
"""Test various types of network groups"""
|
||||
from pybitmessage.protocol import network_group
|
||||
|
||||
test_ip = '1.2.3.4'
|
||||
self.assertEqual('\x01\x02', network_group(test_ip))
|
||||
|
||||
test_ip = '127.0.0.1'
|
||||
self.assertEqual('IPv4', network_group(test_ip))
|
||||
|
||||
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
|
||||
self.assertEqual(
|
||||
'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
|
||||
network_group(test_ip))
|
||||
|
||||
test_ip = 'bootstrap8444.bitmessage.org'
|
||||
self.assertEqual(
|
||||
'bootstrap8444.bitmessage.org',
|
||||
network_group(test_ip))
|
||||
|
||||
test_ip = 'quzwelsuziwqgpt2.onion'
|
||||
self.assertEqual(
|
||||
test_ip,
|
||||
network_group(test_ip))
|
||||
|
||||
test_ip = None
|
||||
self.assertEqual(
|
||||
None,
|
||||
network_group(test_ip))
|
|
@ -2,25 +2,102 @@
|
|||
Tests for common protocol functions
|
||||
"""
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
from .common import skip_python3
|
||||
|
||||
skip_python3()
|
||||
from pybitmessage import protocol, state
|
||||
|
||||
|
||||
class TestProtocol(unittest.TestCase):
|
||||
"""Main protocol test case"""
|
||||
|
||||
def test_checkIPv4Address(self):
|
||||
"""Check the results of protocol.checkIPv4Address()"""
|
||||
token = 'HELLO'
|
||||
# checking protocol.encodeHost()[12:]
|
||||
self.assertEqual( # 127.0.0.1
|
||||
token, protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token, True))
|
||||
self.assertFalse(
|
||||
protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token))
|
||||
self.assertEqual( # 10.42.43.1
|
||||
token, protocol.checkIPv4Address(b'\n*+\x01', token, True))
|
||||
self.assertFalse(
|
||||
protocol.checkIPv4Address(b'\n*+\x01', token, False))
|
||||
self.assertEqual( # 192.168.0.254
|
||||
token, protocol.checkIPv4Address(b'\xc0\xa8\x00\xfe', token, True))
|
||||
self.assertEqual( # 172.31.255.254
|
||||
token, protocol.checkIPv4Address(b'\xac\x1f\xff\xfe', token, True))
|
||||
# self.assertEqual( # 169.254.1.1
|
||||
# token, protocol.checkIPv4Address(b'\xa9\xfe\x01\x01', token, True))
|
||||
# self.assertEqual( # 254.128.1.1
|
||||
# token, protocol.checkIPv4Address(b'\xfe\x80\x01\x01', token, True))
|
||||
self.assertFalse( # 8.8.8.8
|
||||
protocol.checkIPv4Address(b'\x08\x08\x08\x08', token, True))
|
||||
|
||||
def test_checkIPv6Address(self):
|
||||
"""Check the results of protocol.checkIPv6Address()"""
|
||||
test_ip = '2001:db8::ff00:42:8329'
|
||||
self.assertEqual(
|
||||
'test', protocol.checkIPv6Address(
|
||||
protocol.encodeHost(test_ip), 'test'))
|
||||
self.assertFalse(
|
||||
protocol.checkIPv6Address(
|
||||
protocol.encodeHost(test_ip), 'test', True))
|
||||
for test_ip in ('fe80::200:5aee:feaa:20a2', 'fdf8:f53b:82e4::53'):
|
||||
self.assertEqual(
|
||||
'test', protocol.checkIPv6Address(
|
||||
protocol.encodeHost(test_ip), 'test', True))
|
||||
self.assertFalse(
|
||||
protocol.checkIPv6Address(
|
||||
protocol.encodeHost(test_ip), 'test'))
|
||||
|
||||
def test_check_local(self):
|
||||
"""Check the logic of TCPConnection.local"""
|
||||
from pybitmessage import protocol, state
|
||||
|
||||
self.assertTrue(
|
||||
protocol.checkIPAddress(protocol.encodeHost('127.0.0.1'), True))
|
||||
self.assertTrue(
|
||||
protocol.checkIPAddress(protocol.encodeHost('192.168.0.1'), True))
|
||||
self.assertTrue(
|
||||
protocol.checkIPAddress(protocol.encodeHost('10.42.43.1'), True))
|
||||
self.assertTrue(
|
||||
protocol.checkIPAddress(protocol.encodeHost('172.31.255.2'), True))
|
||||
self.assertFalse(protocol.checkIPAddress(
|
||||
protocol.encodeHost('2001:db8::ff00:42:8329'), True))
|
||||
|
||||
globalhost = protocol.encodeHost('8.8.8.8')
|
||||
self.assertFalse(protocol.checkIPAddress(globalhost, True))
|
||||
self.assertEqual(protocol.checkIPAddress(globalhost), '8.8.8.8')
|
||||
|
||||
@unittest.skipIf(
|
||||
sys.hexversion >= 0x3000000, 'this is still not working with python3')
|
||||
def test_check_local_socks(self):
|
||||
"""The SOCKS part of the local check"""
|
||||
self.assertTrue(
|
||||
not protocol.checkSocksIP('127.0.0.1')
|
||||
or state.socksIP)
|
||||
|
||||
def test_network_group(self):
|
||||
"""Test various types of network groups"""
|
||||
|
||||
test_ip = '1.2.3.4'
|
||||
self.assertEqual(b'\x01\x02', protocol.network_group(test_ip))
|
||||
|
||||
test_ip = '127.0.0.1'
|
||||
self.assertEqual('IPv4', protocol.network_group(test_ip))
|
||||
|
||||
self.assertEqual(
|
||||
protocol.network_group('8.8.8.8'),
|
||||
protocol.network_group('8.8.4.4'))
|
||||
self.assertNotEqual(
|
||||
protocol.network_group('1.1.1.1'),
|
||||
protocol.network_group('8.8.8.8'))
|
||||
|
||||
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
|
||||
self.assertEqual(
|
||||
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
|
||||
protocol.network_group(test_ip))
|
||||
|
||||
for test_ip in (
|
||||
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None):
|
||||
self.assertEqual(
|
||||
test_ip, protocol.network_group(test_ip))
|
||||
|
|
5
tests.py
5
tests.py
|
@ -14,7 +14,10 @@ def unittest_discover():
|
|||
# randomize the order of tests in test cases
|
||||
loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1)
|
||||
# pybitmessage symlink may disappear on Windows
|
||||
return loader.discover('src.tests')
|
||||
testsuite = loader.discover('src.tests')
|
||||
testsuite.addTests([loader.discover('src.pyelliptic')])
|
||||
|
||||
return testsuite
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Reference in New Issue
Block a user