V0.6 test api #1781

Closed
kdcis wants to merge 8 commits from v0.6-test-api into v0.6
6 changed files with 167 additions and 97 deletions

View File

@ -2,7 +2,6 @@
import os import os
import random import random
from pyelliptic.openssl import OpenSSL from pyelliptic.openssl import OpenSSL
NoneType = type(None) NoneType = type(None)

View File

@ -3,6 +3,7 @@ A queue with multiple internal subqueues.
Elements are added into a random subqueue, and retrieval rotates Elements are added into a random subqueue, and retrieval rotates
""" """
import helper_random
import sys import sys
if sys.version_info[0] == 3: if sys.version_info[0] == 3:
import queue as Queue import queue as Queue
@ -11,11 +12,6 @@ else:
from collections import deque from collections import deque
if sys.version_info[0] == 3:
from . import helper_random
else:
import helper_random
class MultiQueue(Queue.Queue): class MultiQueue(Queue.Queue):
"""A base queue class""" """A base queue class"""

View File

@ -3,9 +3,13 @@
import os import os
import pkg_resources import pkg_resources
dist = pkg_resources.get_distribution('pybitmessage') dist = pkg_resources.get_distribution('pybitmessage')
script_file = os.path.join(dist.location, dist.key, 'bitmessagemain.py') script_file = os.path.join(dist.location, dist.key, 'bitmessagemain.py')
new_globals = globals() new_globals = globals()
new_globals.update(__file__=script_file) new_globals.update(__file__=script_file)
try:
execfile(script_file, new_globals) execfile(script_file, new_globals)
except NameError:
exec(compile(open(script_file, "rb").read(), script_file, 'exec'), new_globals)

View File

@ -1,3 +1,4 @@
# pylint: disable=E1101
""" """
Tests using API. Tests using API.
""" """
@ -5,9 +6,8 @@ Tests using API.
import base64 import base64
import json import json
import time import time
from .common import skip_python3 import sys
import six
skip_python3()
try: # nosec try: # nosec
from xmlrpclib import ServerProxy, ProtocolError from xmlrpclib import ServerProxy, ProtocolError
@ -16,6 +16,8 @@ except ImportError:
from .test_process import TestProcessProto, TestProcessShutdown from .test_process import TestProcessProto, TestProcessShutdown
PY3 = sys.version_info[0] >= 3
class TestAPIProto(TestProcessProto): class TestAPIProto(TestProcessProto):
"""Test case logic for testing API""" """Test case logic for testing API"""
@ -63,11 +65,17 @@ class TestAPIShutdown(TestAPIProto, TestProcessShutdown):
class TestAPI(TestAPIProto): class TestAPI(TestAPIProto):
"""Main API test case""" """Main API test case"""
if PY3:
_seed = base64.encodebytes(
b'TIGER, tiger, burning bright. In the forests of the night')
else:
_seed = base64.encodestring( _seed = base64.encodestring(
'TIGER, tiger, burning bright. In the forests of the night' 'TIGER, tiger, burning bright. In the forests of the night')
)
def _add_random_address(self, label): def _add_random_address(self, label):
if PY3:
return self.api.createRandomAddress(base64.encodebytes(bytes(label, 'UTF-8')).decode('utf-8'))
else:
return self.api.createRandomAddress(base64.encodestring(label)) return self.api.createRandomAddress(base64.encodestring(label))
def test_user_password(self): def test_user_password(self):
@ -120,38 +128,38 @@ class TestAPI(TestAPIProto):
def test_create_deterministic_addresses(self): def test_create_deterministic_addresses(self):
"""Test creation of deterministic addresses""" """Test creation of deterministic addresses"""
self.assertEqual( self.assertEqual(
self.api.getDeterministicAddress(self._seed, 4, 1), self.api.getDeterministicAddress(self._seed.decode('utf-8'), 4, 1),
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.assertEqual( self.assertEqual(
self.api.getDeterministicAddress(self._seed, 3, 1), self.api.getDeterministicAddress(self._seed.decode('utf-8'), 3, 1),
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN') 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN')
self.assertRegexpMatches( six.assertRegex(
self.api.getDeterministicAddress(self._seed, 2, 1), self, self.api.getDeterministicAddress(self._seed.decode('utf-8'), 2, 1),
r'^API Error 0002:') r'^API Error 0002:')
# This is here until the streams will be implemented # This is here until the streams will be implemented
self.assertRegexpMatches( six.assertRegex(
self.api.getDeterministicAddress(self._seed, 3, 2), self, self.api.getDeterministicAddress(self._seed.decode('utf-8'), 3, 2),
r'API Error 0003:') r'API Error 0003:')
self.assertRegexpMatches( six.assertRegex(
self.api.createDeterministicAddresses(self._seed, 1, 4, 2), self, self.api.createDeterministicAddresses(self._seed.decode('utf-8'), 1, 4, 2),
r'API Error 0003:') r'API Error 0003:')
self.assertRegexpMatches( six.assertRegex(
self.api.createDeterministicAddresses('', 1), self, self.api.createDeterministicAddresses('', 1),
r'API Error 0001:') r'API Error 0001:')
self.assertRegexpMatches( six.assertRegex(
self.api.createDeterministicAddresses(self._seed, 1, 2), self, self.api.createDeterministicAddresses(self._seed.decode('utf-8'), 1, 2),
r'API Error 0002:') r'API Error 0002:')
self.assertRegexpMatches( six.assertRegex(
self.api.createDeterministicAddresses(self._seed, 0), self, self.api.createDeterministicAddresses(self._seed.decode('utf-8'), 0),
r'API Error 0004:') r'API Error 0004:')
self.assertRegexpMatches( six.assertRegex(
self.api.createDeterministicAddresses(self._seed, 1000), self, self.api.createDeterministicAddresses(self._seed.decode('utf-8'), 1000),
r'API Error 0005:') r'API Error 0005:')
addresses = json.loads( addresses = json.loads(
self.api.createDeterministicAddresses(self._seed, 2, 4) self.api.createDeterministicAddresses(self._seed.decode('utf-8'), 2, 4)
)['addresses'] )['addresses']
self.assertEqual(len(addresses), 2) self.assertEqual(len(addresses), 2)
self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK') self.assertEqual(addresses[0], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
@ -161,14 +169,41 @@ class TestAPI(TestAPIProto):
def test_create_random_address(self): def test_create_random_address(self):
"""API command 'createRandomAddress': basic BM-address validation""" """API command 'createRandomAddress': basic BM-address validation"""
addr = self._add_random_address('random_1') addr = self._add_random_address('random_1')
self.assertRegexpMatches(addr, r'^BM-') six.assertRegex(self, addr, r'^BM-')
self.assertRegexpMatches(addr[3:], r'[a-zA-Z1-9]+$') six.assertRegex(self, addr[3:], r'[a-zA-Z1-9]+$')
# Whitepaper says "around 36 character" # Whitepaper says "around 36 character"
self.assertLessEqual(len(addr[3:]), 40) self.assertLessEqual(len(addr[3:]), 40)
self.assertEqual(self.api.deleteAddress(addr), 'success') self.assertEqual(self.api.deleteAddress(addr), 'success')
def test_addressbook(self): def test_addressbook(self):
"""Testing API commands for addressbook manipulations""" """Testing API commands for addressbook manipulations"""
if PY3:
# Initially it's empty
self.assertEqual(
json.loads(self.api.listAddressBookEntries()).get('addresses'),
[]
)
# Add known address
self.api.addAddressBookEntry(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
base64.encodebytes('tiger_4'.encode('UTF-8')).decode('utf-8')
)
# Check addressbook entry
entries = json.loads(
self.api.listAddressBookEntries()).get('addresses')[0]
self.assertEqual(
entries['address'], 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
self.assertEqual(
base64.decodebytes(bytes(entries['label'], 'utf-8')).decode('utf-8'), 'tiger_4')
# Remove known address
self.api.deleteAddressBookEntry(
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK')
# Addressbook should be empty again
self.assertEqual(
json.loads(self.api.listAddressBookEntries()).get('addresses'),
[]
)
else:
# Initially it's empty # Initially it's empty
self.assertEqual( self.assertEqual(
json.loads(self.api.listAddressBookEntries()).get('addresses'), json.loads(self.api.listAddressBookEntries()).get('addresses'),
@ -197,6 +232,25 @@ class TestAPI(TestAPIProto):
def test_subscriptions(self): def test_subscriptions(self):
"""Testing the API commands related to subscriptions""" """Testing the API commands related to subscriptions"""
if PY3:
for s in json.loads(self.api.listSubscriptions())['subscriptions']:
# special address, added when sqlThread starts
if s['address'] == 'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw':
self.assertEqual(
base64.decodebytes(bytes(s['label'], 'utf-8')).decode('utf-8'),
'Bitmessage new releases/announcements')
self.assertTrue(s['enabled'])
break
else:
self.fail(
'Could not find Bitmessage new releases/announcements'
' in subscriptions')
self.assertEqual(
self.api.deleteSubscription('BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw'),
'Deleted subscription if it existed.')
self.assertEqual(
json.loads(self.api.listSubscriptions())['subscriptions'], [])
else:
for s in json.loads(self.api.listSubscriptions())['subscriptions']: for s in json.loads(self.api.listSubscriptions())['subscriptions']:
# special address, added when sqlThread starts # special address, added when sqlThread starts
if s['address'] == 'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw': if s['address'] == 'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw':
@ -217,7 +271,11 @@ class TestAPI(TestAPIProto):
def test_send(self): def test_send(self):
"""Test message sending""" """Test message sending"""
# self.api.createDeterministicAddresses(self._seed, 1, 4) if PY3:
addr = str(self._add_random_address('random_2'))
msg = str(base64.encodebytes('test message'.encode('UTF-8')).decode('utf-8'))
msg_subject = str(base64.encodebytes('test_subject'.encode('UTF-8')).decode('utf-8'))
else:
addr = self._add_random_address('random_2') addr = self._add_random_address('random_2')
msg = base64.encodestring('test message') msg = base64.encodestring('test message')
msg_subject = base64.encodestring('test_subject') msg_subject = base64.encodestring('test_subject')
@ -277,6 +335,12 @@ class TestAPI(TestAPIProto):
def test_send_broadcast(self): def test_send_broadcast(self):
"""Test broadcast sending""" """Test broadcast sending"""
if PY3:
addr = self._add_random_address('random_2')
msg = base64.encodebytes('test broadcast'.encode('UTF-8')).decode('utf-8')
ackdata = self.api.sendBroadcast(
addr, base64.encodebytes('test_subject'.encode('UTF-8')).decode('utf-8'), msg)
else:
addr = self._add_random_address('random_2') addr = self._add_random_address('random_2')
msg = base64.encodestring('test broadcast') msg = base64.encodestring('test broadcast')
ackdata = self.api.sendBroadcast( ackdata = self.api.sendBroadcast(
@ -326,7 +390,7 @@ class TestAPI(TestAPIProto):
"""Testing chan creation/joining""" """Testing chan creation/joining"""
# Create chan with known address # Create chan with known address
self.assertEqual( self.assertEqual(
self.api.createChan(self._seed), self.api.createChan(self._seed.decode("utf-8")),
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK' 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK'
) )
# cleanup # cleanup
@ -339,10 +403,10 @@ class TestAPI(TestAPIProto):
'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK', 'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK',
'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN' 'BM-2DBPTgeSawWYZceFD69AbDT5q4iUWtj1ZN'
): ):
self.assertEqual(self.api.joinChan(self._seed, addr), 'success') self.assertEqual(self.api.joinChan(self._seed.decode("utf-8"), addr), 'success')
self.assertEqual(self.api.leaveChan(addr), 'success') self.assertEqual(self.api.leaveChan(addr), 'success')
# Joining with wrong address should fail # Joining with wrong address should fail
self.assertRegexpMatches( six.assertRegex(
self.api.joinChan(self._seed, 'BM-2cWzSnwjJ7yRP3nLEW'), self, self.api.joinChan(self._seed.decode("utf-8"), 'BM-2cWzSnwjJ7yRP3nLEW'),
r'^API Error 0008:' r'^API Error 0008:'
) )

View File

@ -1,11 +1,14 @@
# pylint: disable=E1101
""" """
Testing the logger configuration Testing the logger configuration
""" """
import os import os
import tempfile import tempfile
import sys
from .test_process import TestProcessProto from .test_process import TestProcessProto
PY3 = sys.version_info[0] >= 3
class TestLogger(TestProcessProto): class TestLogger(TestProcessProto):
@ -43,6 +46,9 @@ handlers=default
cls.log_file = os.path.join(cls.home, 'debug.log') cls.log_file = os.path.join(cls.home, 'debug.log')
with open(os.path.join(cls.home, 'logging.dat'), 'wb') as dst: with open(os.path.join(cls.home, 'logging.dat'), 'wb') as dst:
if PY3:
dst.write(bytes(cls.conf_template.format(cls.log_file, cls.pattern), 'utf-8'))
else:
dst.write(cls.conf_template.format(cls.log_file, cls.pattern)) dst.write(cls.conf_template.format(cls.log_file, cls.pattern))
super(TestLogger, cls).setUpClass() super(TestLogger, cls).setUpClass()
@ -52,5 +58,9 @@ handlers=default
self._stop_process() self._stop_process()
data = open(self.log_file).read() data = open(self.log_file).read()
if PY3:
self.assertRegex(data, self.pattern)
self.assertRegex(data, 'Loaded logger configuration')
else:
self.assertRegexpMatches(data, self.pattern) self.assertRegexpMatches(data, self.pattern)
self.assertRegexpMatches(data, 'Loaded logger configuration') self.assertRegexpMatches(data, 'Loaded logger configuration')

View File

@ -9,13 +9,9 @@ import sys
import tempfile import tempfile
import time import time
import unittest import unittest
import psutil import psutil
from .common import cleanup, put_signal_file, skip_python3 from .common import cleanup, put_signal_file
skip_python3()
class TestProcessProto(unittest.TestCase): class TestProcessProto(unittest.TestCase):
@ -87,7 +83,8 @@ class TestProcessProto(unittest.TestCase):
def _get_readline(cls, pfile): def _get_readline(cls, pfile):
pfile = os.path.join(cls.home, pfile) pfile = os.path.join(cls.home, pfile)
try: try:
return open(pfile, 'rb').readline().strip() with open(pfile, 'rb') as p:
return p.readline().strip()
except (OSError, IOError): except (OSError, IOError):
pass pass
@ -145,7 +142,7 @@ class TestProcessProto(unittest.TestCase):
thread_names = subprocess.check_output([ thread_names = subprocess.check_output([
"ps", "-L", "-o", "comm=", "--pid", "ps", "-L", "-o", "comm=", "--pid",
str(self.process.pid) str(self.process.pid)
]).split() ]).decode('utf-8').split()
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
thread_names = [] thread_names = []
@ -155,7 +152,7 @@ class TestProcessProto(unittest.TestCase):
missing_threads = [] missing_threads = []
for thread_name in thread_names: for thread_name in thread_names:
if thread_name not in self._threads_names: if thread_name not in self._threads_names:
extra_threads.append(thread_name) extra_threads.append(thread_name.decode('utf-8'))
for thread_name in self._threads_names: for thread_name in self._threads_names:
if thread_name not in thread_names: if thread_name not in thread_names:
missing_threads.append(thread_name) missing_threads.append(thread_name)