Refactoring crypto base changes #1806

Merged
g1itch merged 14 commits from crypto-sort into v0.6 2021-08-17 15:07:33 +02:00
15 changed files with 324 additions and 175 deletions

View File

@ -1,8 +1,7 @@
"""
Operations with addresses
"""
# pylint: disable=redefined-outer-name,inconsistent-return-statements
import sys
# pylint: disable=inconsistent-return-statements
import hashlib
import logging
from binascii import hexlify, unhexlify
@ -151,30 +150,18 @@ def encodeAddress(version, stream, ripe):
' a given ripe hash was not 20.'
)
if isinstance(ripe, str):
if ripe[:2] == '\x00\x00':
ripe = ripe[2:]
elif ripe[:1] == '\x00':
ripe = ripe[1:]
else:
if ripe[:2] == b'\x00\x00':
ripe = ripe[2:]
elif ripe[:1] == b'\x00':
ripe = ripe[1:]
if ripe[:2] == b'\x00\x00':
ripe = ripe[2:]
elif ripe[:1] == b'\x00':
ripe = ripe[1:]
elif version == 4:
if len(ripe) != 20:
raise Exception(
'Programming error in encodeAddress: The length of'
' a given ripe hash was not 20.')
ripe = ripe.lstrip('\x00')
ripe = ripe.lstrip(b'\x00')
if sys.version_info[0] == 3:
if isinstance(ripe, str):
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe.encode('utf-8')
else:
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
else:
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
storedBinaryData = encodeVarint(version) + encodeVarint(stream) + ripe
# Generate the checksum
sha = hashlib.new('sha512')
@ -184,7 +171,10 @@ def encodeAddress(version, stream, ripe):
sha.update(currentHash)
checksum = sha.digest()[0:4]
# FIXME: encodeBase58 should take binary data, to reduce conversions
# encodeBase58(storedBinaryData + checksum)
asInt = int(hexlify(storedBinaryData) + hexlify(checksum), 16)
# should it be str? If yes, it should be everywhere in the code
return 'BM-' + encodeBase58(asInt)

View File

@ -10,17 +10,22 @@ High level cryptographic functions based on `.pyelliptic` OpenSSL bindings.
from binascii import hexlify
import pyelliptic
from bmconfigparser import BMConfigParser
from pyelliptic import OpenSSL
from pyelliptic import arithmetic as a
from bmconfigparser import BMConfigParser
__all__ = ['encrypt', 'makeCryptor', 'pointMult', 'privToPub', 'sign', 'verify']
def makeCryptor(privkey):
"""Return a private `.pyelliptic.ECC` instance"""
private_key = a.changebase(privkey, 16, 256, minlen=32)
public_key = pointMult(private_key)
privkey_bin = '\x02\xca\x00\x20' + private_key
pubkey_bin = '\x02\xca\x00\x20' + public_key[1:-32] + '\x00\x20' + public_key[-32:]
privkey_bin = b'\x02\xca\x00\x20' + private_key
pubkey_bin = (
b'\x02\xca\x00\x20' + public_key[1:-32] + b'\x00\x20' + public_key[-32:]
)
cryptor = pyelliptic.ECC(
curve='secp256k1', privkey=privkey_bin, pubkey=pubkey_bin)
return cryptor
@ -29,7 +34,7 @@ def makeCryptor(privkey):
def hexToPubkey(pubkey):
"""Convert a pubkey from hex to binary"""
pubkey_raw = a.changebase(pubkey[2:], 16, 256, minlen=64)
pubkey_bin = '\x02\xca\x00 ' + pubkey_raw[:32] + '\x00 ' + pubkey_raw[32:]
pubkey_bin = b'\x02\xca\x00 ' + pubkey_raw[:32] + b'\x00 ' + pubkey_raw[32:]
return pubkey_bin

View File

@ -91,16 +91,16 @@ def isBitSetWithinBitfield(fourByteString, n):
return x & 2**n != 0
# ip addresses
# IP addresses
def encodeHost(host):
"""Encode a given host to be used in low-level socket operations"""
if host.find('.onion') > -1:
return '\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode(
return b'\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode(
host.split(".")[0], True)
elif host.find(':') == -1:
return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
return b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host)
return socket.inet_pton(socket.AF_INET6, host)
@ -147,10 +147,10 @@ def checkIPAddress(host, private=False):
Returns hostStandardFormat if it is a valid IP address,
otherwise returns False
"""
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
if host[0:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:])
return checkIPv4Address(host[12:], hostStandardFormat, private)
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
elif host[0:6] == b'\xfd\x87\xd8\x7e\xeb\x43':
# Onion, based on BMD/bitcoind
hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion"
if private:
@ -161,7 +161,7 @@ def checkIPAddress(host, private=False):
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host)
except ValueError:
return False
if hostStandardFormat == "":
if len(hostStandardFormat) == 0:
# This can happen on Windows systems which are
# not 64-bit compatible so let us drop the IPv6 address.
return False
@ -173,23 +173,23 @@ def checkIPv4Address(host, hostStandardFormat, private=False):
Returns hostStandardFormat if it is an IPv4 address,
otherwise returns False
"""
if host[0] == '\x7F': # 127/8
if host[0:1] == b'\x7F': # 127/8
if not private:
logger.debug(
'Ignoring IP address in loopback range: %s',
hostStandardFormat)
return hostStandardFormat if private else False
if host[0] == '\x0A': # 10/8
if host[0:1] == b'\x0A': # 10/8
if not private:
logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat)
return hostStandardFormat if private else False
if host[0:2] == '\xC0\xA8': # 192.168/16
if host[0:2] == b'\xC0\xA8': # 192.168/16
if not private:
logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat)
return hostStandardFormat if private else False
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
if host[0:2] >= b'\xAC\x10' and host[0:2] < b'\xAC\x20': # 172.16/12
if not private:
logger.debug(
'Ignoring IP address in private range: %s', hostStandardFormat)
@ -202,15 +202,19 @@ def checkIPv6Address(host, hostStandardFormat, private=False):
Returns hostStandardFormat if it is an IPv6 address,
otherwise returns False
"""
if host == ('\x00' * 15) + '\x01':
if host == b'\x00' * 15 + b'\x01':
if not private:
logger.debug('Ignoring loopback address: %s', hostStandardFormat)
return False
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
try:
host = [ord(c) for c in host[:2]]
except TypeError: # python3 has ints already
pass
if host[0] == 0xfe and host[1] & 0xc0 == 0x80:
if not private:
logger.debug('Ignoring local address: %s', hostStandardFormat)
return hostStandardFormat if private else False
if (ord(host[0]) & 0xfe) == 0xfc:
if host[0] & 0xfe == 0xfc:
if not private:
logger.debug(
'Ignoring unique local address: %s', hostStandardFormat)

View File

@ -25,28 +25,31 @@ def inv(a, n):
def get_code_string(base):
"""Returns string according to base value"""
if base == 2:
return '01'
elif base == 10:
return '0123456789'
elif base == 16:
return "0123456789abcdef"
elif base == 58:
return "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
elif base == 256:
return ''.join([chr(x) for x in range(256)])
else:
raise ValueError("Invalid base!")
return b'01'
if base == 10:
return b'0123456789'
if base == 16:
return b'0123456789abcdef'
if base == 58:
return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
if base == 256:
try:
return b''.join([chr(x) for x in range(256)])
except TypeError:
return bytes([x for x in range(256)])
raise ValueError("Invalid base!")
def encode(val, base, minlen=0):
"""Returns the encoded string"""
code_string = get_code_string(base)
result = ""
result = b''
while val > 0:
val, i = divmod(val, base)
result = code_string[i] + result
result = code_string[i:i + 1] + result
if len(result) < minlen:
result = code_string[0] * (minlen - len(result)) + result
result = code_string[0:1] * (minlen - len(result)) + result
return result
@ -116,7 +119,7 @@ def hex_to_point(h):
def point_to_hex(p):
"""Converting point value to hexadecimal"""
return '04' + encode(p[0], 16, 64) + encode(p[1], 16, 64)
return b'04' + encode(p[0], 16, 64) + encode(p[1], 16, 64)
def multiply(privkey, pubkey):

View File

View File

@ -0,0 +1,84 @@
"""
Test the arithmetic functions
"""
from binascii import unhexlify
import unittest
try:
from pyelliptic import arithmetic
except ImportError:
from pybitmessage.pyelliptic import arithmetic
# These keys are from addresses test script
sample_pubsigningkey = (
b'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
b'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = (
b'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
b'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_factor = \
66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
)
class TestArithmetic(unittest.TestCase):
"""Test arithmetic functions"""
def test_base10_multiply(self):
"""Test arithmetic.base10_multiply"""
self.assertEqual(
sample_point,
arithmetic.base10_multiply(arithmetic.G, sample_factor))
def test_decode(self):
"""Decode sample privsigningkey from hex to int and compare to factor"""
self.assertEqual(
arithmetic.decode(sample_privsigningkey, 16), sample_factor)
def test_encode(self):
"""Encode sample factor into hex and compare to privsigningkey"""
self.assertEqual(
arithmetic.encode(sample_factor, 16), sample_privsigningkey)
def test_changebase(self):
"""Check the results of changebase()"""
self.assertEqual(
arithmetic.changebase(sample_privsigningkey, 16, 256, minlen=32),
unhexlify(sample_privsigningkey))
self.assertEqual(
arithmetic.changebase(sample_pubsigningkey, 16, 256, minlen=64),
unhexlify(sample_pubsigningkey))
self.assertEqual(
32, # padding
len(arithmetic.changebase(sample_privsigningkey[:5], 16, 256, 32)))
def test_hex_to_point(self):
"""Check that sample_pubsigningkey is sample_point encoded in hex"""
self.assertEqual(
arithmetic.hex_to_point(sample_pubsigningkey), sample_point)
def test_point_to_hex(self):
"""Check that sample_point is sample_pubsigningkey decoded from hex"""
self.assertEqual(
arithmetic.point_to_hex(sample_point), sample_pubsigningkey)
def test_privtopub(self):
"""Generate public keys and check the result"""
self.assertEqual(
arithmetic.privtopub(sample_privsigningkey),
sample_pubsigningkey
)
self.assertEqual(
arithmetic.privtopub(sample_privencryptionkey),
sample_pubencryptionkey
)

View File

@ -5,7 +5,10 @@ import os
import unittest
from hashlib import sha256
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
try:
from pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
except ImportError:
from pybitmessage.pyelliptic import ECCBlind, ECCBlindChain, OpenSSL
# pylint: disable=protected-access

View File

@ -3,7 +3,10 @@ Test if OpenSSL is working correctly
"""
import unittest
from pybitmessage.pyelliptic.openssl import OpenSSL
try:
from pyelliptic.openssl import OpenSSL
except ImportError:
from pybitmessage.pyelliptic import OpenSSL
try:
OpenSSL.BN_bn2binpad
@ -33,7 +36,7 @@ class TestOpenSSL(unittest.TestCase):
@unittest.skipUnless(have_pad, 'Skipping OpenSSL pad test')
def test_padding(self):
"""Test an alternatie implementation of bn2binpad"""
"""Test an alternative implementation of bn2binpad"""
ctx = OpenSSL.BN_CTX_new()
a = OpenSSL.BN_new()

31
src/tests/samples.py Normal file
View File

@ -0,0 +1,31 @@
"""Various sample data"""
from binascii import unhexlify
# These keys are from addresses test script
sample_pubsigningkey = unhexlify(
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = unhexlify(
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
b'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
b'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
# stream: 1, version: 2
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
)
sample_seed = 'TIGER, tiger, burning bright. In the forests of the night'
# Deterministic addresses with stream 1 and versions 3, 4
sample_deterministic_addr3 = 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
sample_deterministic_addr4 = 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'

View File

@ -0,0 +1,35 @@
import unittest
from binascii import unhexlify
from pybitmessage import addresses
from .samples import sample_address, sample_ripe
class TestAddresses(unittest.TestCase):
"""Test addresses manipulations"""
def test_decode(self):
"""Decode some well known addresses and check the result"""
self.assertEqual(
addresses.decodeAddress(sample_address),
('success', 2, 1, unhexlify(sample_ripe)))
status, version, stream, ripe1 = addresses.decodeAddress(
'2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.assertEqual(status, 'success')
self.assertEqual(stream, 1)
self.assertEqual(version, 4)
status, version, stream, ripe2 = addresses.decodeAddress(
'2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
self.assertEqual(status, 'success')
self.assertEqual(stream, 1)
self.assertEqual(version, 3)
self.assertEqual(ripe1, ripe2)
def test_encode(self):
"""Encode sample ripe and compare the result to sample address"""
self.assertEqual(
sample_address,
addresses.encodeAddress(2, 1, unhexlify(sample_ripe)))

View File

@ -10,6 +10,9 @@ from six.moves import xmlrpc_client # nosec
import psutil
from .samples import (
sample_seed, sample_deterministic_addr3, sample_deterministic_addr4)
from .test_process import TestProcessProto
@ -58,9 +61,7 @@ class TestAPIShutdown(TestAPIProto):
class TestAPI(TestAPIProto):
"""Main API test case"""
_seed = base64.encodestring(
'TIGER, tiger, burning bright. In the forests of the night'
)
_seed = base64.encodestring(sample_seed)
def _add_random_address(self, label):
return self.api.createRandomAddress(base64.encodestring(label))
@ -108,7 +109,7 @@ class TestAPI(TestAPIProto):
def test_decode_address(self):
"""Checking the return of API command 'decodeAddress'"""
result = json.loads(
self.api.decodeAddress('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'))
self.api.decodeAddress(sample_deterministic_addr4))
self.assertEqual(result.get('status'), 'success')
self.assertEqual(result['addressVersion'], 4)
self.assertEqual(result['streamNumber'], 1)
@ -117,10 +118,10 @@ class TestAPI(TestAPIProto):
"""Test creation of deterministic addresses"""
self.assertEqual(
self.api.getDeterministicAddress(self._seed, 4, 1),
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
sample_deterministic_addr4)
self.assertEqual(
self.api.getDeterministicAddress(self._seed, 3, 1),
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
sample_deterministic_addr3)
self.assertRegexpMatches(
self.api.getDeterministicAddress(self._seed, 2, 1),
r'^API Error 0002:')
@ -150,7 +151,7 @@ class TestAPI(TestAPIProto):
self.api.createDeterministicAddresses(self._seed, 2, 4)
)['addresses']
self.assertEqual(len(addresses), 2)
self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.assertEqual(addresses[0], sample_deterministic_addr4)
for addr in addresses:
self.assertEqual(self.api.deleteAddress(addr), 'success')
@ -172,19 +173,18 @@ class TestAPI(TestAPIProto):
)
# Add known address
self.api.addAddressBookEntry(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
sample_deterministic_addr4,
base64.encodestring('tiger_4')
)
# Check addressbook entry
entries = json.loads(
self.api.listAddressBookEntries()).get('addresses')[0]
self.assertEqual(
entries['address'], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
entries['address'], sample_deterministic_addr4)
self.assertEqual(
base64.decodestring(entries['label']), 'tiger_4')
# Remove known address
self.api.deleteAddressBookEntry(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.api.deleteAddressBookEntry(sample_deterministic_addr4)
# Addressbook should be empty again
self.assertEqual(
json.loads(self.api.listAddressBookEntries()).get('addresses'),
@ -218,7 +218,7 @@ class TestAPI(TestAPIProto):
msg = base64.encodestring('test message')
msg_subject = base64.encodestring('test_subject')
ackdata = self.api.sendMessage(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', addr, msg_subject, msg)
sample_deterministic_addr4, addr, msg_subject, msg)
try:
# Check ackdata and message status
int(ackdata, 16)
@ -332,19 +332,12 @@ class TestAPI(TestAPIProto):
"""Testing chan creation/joining"""
# Create chan with known address
self.assertEqual(
self.api.createChan(self._seed),
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'
)
self.api.createChan(self._seed), sample_deterministic_addr4)
# cleanup
self.assertEqual(
self.api.leaveChan('BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'),
'success'
)
self.api.leaveChan(sample_deterministic_addr4), 'success')
# Join chan with addresses of version 3 or 4
for addr in (
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
):
for addr in (sample_deterministic_addr4, sample_deterministic_addr3):
self.assertEqual(self.api.joinChan(self._seed, addr), 'success')
self.assertEqual(self.api.leaveChan(addr), 'success')
# Joining with wrong address should fail

View File

@ -5,9 +5,9 @@ Test the alternatives for crypto primitives
import hashlib
import unittest
from abc import ABCMeta, abstractmethod
from binascii import hexlify, unhexlify
from binascii import hexlify
from pybitmessage.pyelliptic import arithmetic
from pybitmessage import highlevelcrypto
try:
@ -15,29 +15,12 @@ try:
except ImportError:
RIPEMD = None
# These keys are from addresses test script
sample_pubsigningkey = unhexlify(
'044a367f049ec16cb6b6118eb734a9962d10b8db59c890cd08f210c43ff08bdf09d'
'16f502ca26cd0713f38988a1237f1fc8fa07b15653c996dc4013af6d15505ce')
sample_pubencryptionkey = unhexlify(
'044597d59177fc1d89555d38915f581b5ff2286b39d022ca0283d2bdd5c36be5d3c'
'e7b9b97792327851a562752e4b79475d1f51f5a71352482b241227f45ed36a9')
sample_privsigningkey = \
'93d0b61371a54b53df143b954035d612f8efa8a3ed1cf842c2186bfd8f876665'
sample_privencryptionkey = \
'4b0b73a54e19b059dc274ab69df095fe699f43b17397bca26fdf40f4d7400a3a'
sample_ripe = b'003cd097eb7f35c87b5dc8b4538c22cb55312a9f'
# stream: 1, version: 2
sample_address = 'BM-onkVu1KKL2UaUss5Upg9vXmqd3esTmV79'
sample_factor = 66858749573256452658262553961707680376751171096153613379801854825275240965733
# G * sample_factor
sample_point = (
33567437183004486938355437500683826356288335339807546987348409590129959362313,
94730058721143827257669456336351159718085716196507891067256111928318063085006
from .samples import (
sample_pubsigningkey, sample_pubencryptionkey,
sample_privsigningkey, sample_privencryptionkey, sample_ripe
)
_sha = hashlib.new('sha512')
_sha.update(sample_pubsigningkey + sample_pubencryptionkey)
@ -76,38 +59,16 @@ class TestCrypto(RIPEMD160TestCase, unittest.TestCase):
return RIPEMD.RIPEMD160Hash(data).digest()
class TestAddresses(unittest.TestCase):
"""Test addresses manipulations"""
def test_base10_multiply(self):
"""Test arithmetic.base10_multiply"""
self.assertEqual(
sample_point,
arithmetic.base10_multiply(arithmetic.G, sample_factor))
class TestHighlevelcrypto(unittest.TestCase):
"""Test highlevelcrypto public functions"""
def test_privtopub(self):
"""Generate public keys and check the result"""
self.assertEqual(
arithmetic.privtopub(sample_privsigningkey).encode(),
highlevelcrypto.privToPub(sample_privsigningkey),
hexlify(sample_pubsigningkey)
)
self.assertEqual(
arithmetic.privtopub(sample_privencryptionkey).encode(),
highlevelcrypto.privToPub(sample_privencryptionkey),
hexlify(sample_pubencryptionkey)
)
def test_address(self):
"""Create address and check the result"""
from pybitmessage import addresses
from pybitmessage.fallback import RIPEMD160Hash
sha = hashlib.new('sha512')
sha.update(sample_pubsigningkey + sample_pubencryptionkey)
ripe_hash = RIPEMD160Hash(sha.digest()).digest()
self.assertEqual(ripe_hash, unhexlify(sample_ripe))
self.assertEqual(
addresses.encodeAddress(2, 1, ripe_hash), sample_address)
self.assertEqual(
addresses.decodeAddress(sample_address),
('success', 2, 1, ripe_hash))

View File

@ -1,43 +0,0 @@
"""
Test for network group
"""
import unittest
from .common import skip_python3
skip_python3()
class TestNetworkGroup(unittest.TestCase):
"""
Test case for network group
"""
def test_network_group(self):
"""Test various types of network groups"""
from pybitmessage.protocol import network_group
test_ip = '1.2.3.4'
self.assertEqual('\x01\x02', network_group(test_ip))
test_ip = '127.0.0.1'
self.assertEqual('IPv4', network_group(test_ip))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
network_group(test_ip))
test_ip = 'bootstrap8444.bitmessage.org'
self.assertEqual(
'bootstrap8444.bitmessage.org',
network_group(test_ip))
test_ip = 'quzwelsuziwqgpt2.onion'
self.assertEqual(
test_ip,
network_group(test_ip))
test_ip = None
self.assertEqual(
None,
network_group(test_ip))

View File

@ -2,25 +2,102 @@
Tests for common protocol functions
"""
import sys
import unittest
from .common import skip_python3
skip_python3()
from pybitmessage import protocol, state
class TestProtocol(unittest.TestCase):
"""Main protocol test case"""
def test_checkIPv4Address(self):
"""Check the results of protocol.checkIPv4Address()"""
token = 'HELLO'
# checking protocol.encodeHost()[12:]
self.assertEqual( # 127.0.0.1
token, protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token, True))
self.assertFalse(
protocol.checkIPv4Address(b'\x7f\x00\x00\x01', token))
self.assertEqual( # 10.42.43.1
token, protocol.checkIPv4Address(b'\n*+\x01', token, True))
self.assertFalse(
protocol.checkIPv4Address(b'\n*+\x01', token, False))
self.assertEqual( # 192.168.0.254
token, protocol.checkIPv4Address(b'\xc0\xa8\x00\xfe', token, True))
self.assertEqual( # 172.31.255.254
token, protocol.checkIPv4Address(b'\xac\x1f\xff\xfe', token, True))
# self.assertEqual( # 169.254.1.1
# token, protocol.checkIPv4Address(b'\xa9\xfe\x01\x01', token, True))
# self.assertEqual( # 254.128.1.1
# token, protocol.checkIPv4Address(b'\xfe\x80\x01\x01', token, True))
self.assertFalse( # 8.8.8.8
protocol.checkIPv4Address(b'\x08\x08\x08\x08', token, True))
def test_checkIPv6Address(self):
"""Check the results of protocol.checkIPv6Address()"""
test_ip = '2001:db8::ff00:42:8329'
self.assertEqual(
'test', protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test'))
self.assertFalse(
protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test', True))
for test_ip in ('fe80::200:5aee:feaa:20a2', 'fdf8:f53b:82e4::53'):
self.assertEqual(
'test', protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test', True))
self.assertFalse(
protocol.checkIPv6Address(
protocol.encodeHost(test_ip), 'test'))
def test_check_local(self):
"""Check the logic of TCPConnection.local"""
from pybitmessage import protocol, state
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('127.0.0.1'), True))
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('192.168.0.1'), True))
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('10.42.43.1'), True))
self.assertTrue(
protocol.checkIPAddress(protocol.encodeHost('172.31.255.2'), True))
self.assertFalse(protocol.checkIPAddress(
protocol.encodeHost('2001:db8::ff00:42:8329'), True))
globalhost = protocol.encodeHost('8.8.8.8')
self.assertFalse(protocol.checkIPAddress(globalhost, True))
self.assertEqual(protocol.checkIPAddress(globalhost), '8.8.8.8')
@unittest.skipIf(
sys.hexversion >= 0x3000000, 'this is still not working with python3')
def test_check_local_socks(self):
"""The SOCKS part of the local check"""
self.assertTrue(
not protocol.checkSocksIP('127.0.0.1')
or state.socksIP)
def test_network_group(self):
"""Test various types of network groups"""
test_ip = '1.2.3.4'
self.assertEqual(b'\x01\x02', protocol.network_group(test_ip))
test_ip = '127.0.0.1'
self.assertEqual('IPv4', protocol.network_group(test_ip))
self.assertEqual(
protocol.network_group('8.8.8.8'),
protocol.network_group('8.8.4.4'))
self.assertNotEqual(
protocol.network_group('1.1.1.1'),
protocol.network_group('8.8.8.8'))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
protocol.network_group(test_ip))
for test_ip in (
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None):
self.assertEqual(
test_ip, protocol.network_group(test_ip))

View File

@ -14,7 +14,10 @@ def unittest_discover():
# randomize the order of tests in test cases
loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1)
# pybitmessage symlink may disappear on Windows
return loader.discover('src.tests')
testsuite = loader.discover('src.tests')
testsuite.addTests([loader.discover('src.pyelliptic')])
return testsuite
if __name__ == "__main__":