diff --git a/docs/conf.py b/docs/conf.py index b6e75cc1..3464e056 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -49,6 +49,8 @@ extensions = [ 'm2r', ] +default_role = 'obj' + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -199,7 +201,6 @@ epub_exclude_files = ['search.html'] autodoc_mock_imports = [ 'debug', 'pybitmessage.bitmessagekivy', - 'pybitmessage.bitmessagemain', 'pybitmessage.bitmessageqt.addressvalidator', 'pybitmessage.helper_startup', 'pybitmessage.network.httpd', @@ -219,17 +220,17 @@ autodoc_mock_imports = [ 'qrcode', 'stem', ] +autodoc_member_order = 'bysource' # Apidoc settings apidoc_module_dir = '../pybitmessage' apidoc_output_dir = 'autodoc' apidoc_excluded_paths = [ - 'bitmessagekivy', 'bitmessagemain.py', 'build_osx.py', + 'bitmessagekivy', 'build_osx.py', 'bitmessageqt/addressvalidator.py', 'bitmessageqt/migrationwizard.py', - 'bitmessageqt/newaddresswizard.py', - 'class_objectProcessor.py', 'defaults.py', 'helper_startup.py', + 'bitmessageqt/newaddresswizard.py', 'helper_startup.py', 'kivymd', 'main.py', 'navigationdrawer', 'network/http*', - 'pybitmessage', 'queues.py', 'tests', 'version.py' + 'pybitmessage', 'tests', 'version.py' ] apidoc_module_first = True apidoc_separate_modules = True diff --git a/src/addresses.py b/src/addresses.py index 533ec169..b83f3f6e 100644 --- a/src/addresses.py +++ b/src/addresses.py @@ -54,11 +54,20 @@ def decodeBase58(string, alphabet=ALPHABET): return num +class varintEncodeError(Exception): + """Exception class for encoding varint""" + pass + + +class varintDecodeError(Exception): + """Exception class for decoding varint data""" + pass + + def encodeVarint(integer): """Convert integer into varint bytes""" if integer < 0: - logger.error('varint cannot be < 0') - raise SystemExit + raise varintEncodeError('varint cannot be < 0') if integer < 253: return pack('>B', integer) if integer >= 253 and integer < 65536: @@ -68,13 +77,7 @@ def encodeVarint(integer): if integer >= 4294967296 and integer < 18446744073709551616: return pack('>B', 255) + pack('>Q', integer) if integer >= 18446744073709551616: - logger.error('varint cannot be >= 18446744073709551616') - raise SystemExit - - -class varintDecodeError(Exception): - """Exception class for decoding varint data""" - pass + raise varintEncodeError('varint cannot be >= 18446744073709551616') def decodeVarint(data): diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 4ad9311f..c70eb0bf 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -7,8 +7,6 @@ # Right now, PyBitmessage only support connecting to stream 1. It doesn't # yet contain logic to expand into further streams. -# The software version variable is now held in shared.py - import os import sys @@ -31,40 +29,27 @@ import time import traceback from struct import pack +import defaults +import shared +import state +import shutdown +from bmconfigparser import BMConfigParser +from debug import logger # this should go before any threads from helper_startup import ( isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections ) -from singleinstance import singleinstance - -import defaults -import shared -import knownnodes -import state -import shutdown -from debug import logger # this should go before any threads - -# Classes -from class_sqlThread import sqlThread -from class_singleCleaner import singleCleaner -from class_objectProcessor import objectProcessor -from class_singleWorker import singleWorker -from class_addressGenerator import addressGenerator -from bmconfigparser import BMConfigParser - from inventory import Inventory - -from network.connectionpool import BMConnectionPool -from network.dandelion import Dandelion -from network.networkthread import BMNetworkThread -from network.receivequeuethread import ReceiveQueueThread -from network.announcethread import AnnounceThread -from network.invthread import InvThread -from network.addrthread import AddrThread -from network.downloadthread import DownloadThread -from network.uploadthread import UploadThread - -# Helper Functions -import helper_threading +from knownnodes import readKnownNodes +# Network objects and threads +from network import ( + BMConnectionPool, Dandelion, + AddrThread, AnnounceThread, BMNetworkThread, InvThread, ReceiveQueueThread, + DownloadThread, UploadThread) +from singleinstance import singleinstance +# Synchronous threads +from threads import ( + set_thread_name, + addressGenerator, objectProcessor, singleCleaner, singleWorker, sqlThread) def connectToStream(streamNumber): @@ -84,14 +69,6 @@ def connectToStream(streamNumber): except: pass - with knownnodes.knownNodesLock: - if streamNumber not in knownnodes.knownNodes: - knownnodes.knownNodes[streamNumber] = {} - if streamNumber * 2 not in knownnodes.knownNodes: - knownnodes.knownNodes[streamNumber * 2] = {} - if streamNumber * 2 + 1 not in knownnodes.knownNodes: - knownnodes.knownNodes[streamNumber * 2 + 1] = {} - BMConnectionPool().connectToStream(streamNumber) @@ -275,7 +252,7 @@ class Main: self.setSignalHandler() - helper_threading.set_thread_name("PyBitmessage") + set_thread_name("PyBitmessage") state.dandelion = config.safeGetInt('network', 'dandelion') # dandelion requires outbound connections, without them, @@ -291,7 +268,7 @@ class Main: defaults.networkDefaultPayloadLengthExtraBytes = int( defaults.networkDefaultPayloadLengthExtraBytes / 100) - knownnodes.readKnownNodes() + readKnownNodes() # Not needed if objproc is disabled if state.enableObjProc: diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 5f014563..6fbf5df6 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -14,7 +14,7 @@ import network.stats import shared import widgets from inventory import Inventory -from network.connectionpool import BMConnectionPool +from network import BMConnectionPool from retranslateui import RetranslateMixin from tr import _translate from uisignaler import UISignaler diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index fa268377..c7c7e261 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -14,7 +14,7 @@ import highlevelcrypto from bmconfigparser import BMConfigParser from addresses import decodeAddress, encodeAddress, encodeVarint from fallback import RIPEMD160Hash -from network.threads import StoppableThread +from network import StoppableThread class addressGenerator(StoppableThread): @@ -29,6 +29,10 @@ class addressGenerator(StoppableThread): super(addressGenerator, self).stopThread() def run(self): + """ + Process the requests for addresses generation + from `.queues.addressGeneratorQueue` + """ while state.shutdown == 0: queueValue = queues.addressGeneratorQueue.get() nonceTrialsPerByte = 0 diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 6ae46658..b22876e8 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -21,6 +21,7 @@ import helper_sent from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery from helper_ackPayload import genAckPayload from network import bmproto +from network.node import Peer import protocol import queues import state @@ -57,6 +58,7 @@ class objectProcessor(threading.Thread): self.successfullyDecryptMessageTimings = [] def run(self): + """Process the objects from `.queues.objectProcessorQueue`""" while True: objectType, data = queues.objectProcessorQueue.get() @@ -160,7 +162,7 @@ class objectProcessor(threading.Thread): if not host: return - peer = state.Peer(host, port) + peer = Peer(host, port) with knownnodes.knownNodesLock: knownnodes.addKnownNode( stream, peer, is_self=state.ownAddresses.get(peer)) @@ -1051,7 +1053,8 @@ class objectProcessor(threading.Thread): # for it. elif addressVersion >= 4: tag = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe + encodeVarint(addressVersion) + encodeVarint(streamNumber) + + ripe ).digest()).digest()[32:] if tag in state.neededPubkeys: del state.neededPubkeys[tag] @@ -1059,9 +1062,8 @@ class objectProcessor(threading.Thread): def sendMessages(self, address): """ - This function is called by the possibleNewPubkey function when - that function sees that we now have the necessary pubkey - to send one or more messages. + This method is called by the `possibleNewPubkey` when it sees + that we now have the necessary pubkey to send one or more messages. """ logger.info('We have been awaiting the arrival of this pubkey.') sqlExecute( diff --git a/src/class_objectProcessorQueue.py b/src/class_objectProcessorQueue.py deleted file mode 100644 index b6628816..00000000 --- a/src/class_objectProcessorQueue.py +++ /dev/null @@ -1,24 +0,0 @@ -import Queue -import threading -import time - -class ObjectProcessorQueue(Queue.Queue): - maxSize = 32000000 - - def __init__(self): - Queue.Queue.__init__(self) - self.sizeLock = threading.Lock() - self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. - - def put(self, item, block = True, timeout = None): - while self.curSize >= self.maxSize: - time.sleep(1) - with self.sizeLock: - self.curSize += len(item[1]) - Queue.Queue.put(self, item, block, timeout) - - def get(self, block = True, timeout = None): - item = Queue.Queue.get(self, block, timeout) - with self.sizeLock: - self.curSize -= len(item[1]) - return item diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index fc53a5b0..9ffc1607 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -1,5 +1,5 @@ """ -The singleCleaner class is a timer-driven thread that cleans data structures +The `singleCleaner` class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. @@ -31,8 +31,7 @@ import tr from bmconfigparser import BMConfigParser from helper_sql import sqlQuery, sqlExecute from inventory import Inventory -from network.connectionpool import BMConnectionPool -from network.threads import StoppableThread +from network import BMConnectionPool, StoppableThread class singleCleaner(StoppableThread): @@ -46,12 +45,12 @@ class singleCleaner(StoppableThread): try: shared.maximumLengthOfTimeToBotherResendingMessages = ( float(BMConfigParser().get( - 'bitmessagesettings', 'stopresendingafterxdays')) * - 24 * 60 * 60 + 'bitmessagesettings', 'stopresendingafterxdays')) + * 24 * 60 * 60 ) + ( float(BMConfigParser().get( - 'bitmessagesettings', 'stopresendingafterxmonths')) * - (60 * 60 * 24 * 365) / 12) + 'bitmessagesettings', 'stopresendingafterxmonths')) + * (60 * 60 * 24 * 365) / 12) except: # Either the user hasn't set stopresendingafterxdays and # stopresendingafterxmonths yet or the options are missing @@ -93,8 +92,8 @@ class singleCleaner(StoppableThread): "SELECT toaddress, ackdata, status FROM sent" " WHERE ((status='awaitingpubkey' OR status='msgsent')" " AND folder='sent' AND sleeptill?)", - int(time.time()), int(time.time()) - - shared.maximumLengthOfTimeToBotherResendingMessages + int(time.time()), int(time.time()) + - shared.maximumLengthOfTimeToBotherResendingMessages ) for row in queryreturn: if len(row) < 2: @@ -140,9 +139,7 @@ class singleCleaner(StoppableThread): # thread.downloadQueue.clear() # inv/object tracking - for connection in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): connection.clean() # discovery tracking diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 77fa18c0..60eabe2e 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -28,7 +28,7 @@ from addresses import calculateInventoryHash, decodeAddress, decodeVarint, encod from bmconfigparser import BMConfigParser from helper_sql import sqlExecute, sqlQuery from inventory import Inventory -from network.threads import StoppableThread +from network import StoppableThread def sizeof_fmt(num, suffix='h/s'): diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index a45571e0..bcb56303 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -1,3 +1,7 @@ +""" +sqlThread is defined here +""" + import threading from bmconfigparser import BMConfigParser import sqlite3 @@ -19,11 +23,13 @@ import tr class sqlThread(threading.Thread): + """A thread for all SQL operations""" def __init__(self): threading.Thread.__init__(self, name="SQL") def run(self): + """Process SQL queries from `.helper_sql.sqlSubmitQueue`""" self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn.text_factory = str self.cur = self.conn.cursor() diff --git a/src/defaults.py b/src/defaults.py index d10f9000..32162b56 100644 --- a/src/defaults.py +++ b/src/defaults.py @@ -1,24 +1,24 @@ """ -src/defaults.py -=============== +Common default values """ -# sanity check, prevent doing ridiculous PoW -# 20 million PoWs equals approximately 2 days on dev's dual R9 290 +#: sanity check, prevent doing ridiculous PoW +#: 20 million PoWs equals approximately 2 days on dev's dual R9 290 ridiculousDifficulty = 20000000 -# Remember here the RPC port read from namecoin.conf so we can restore to -# it as default whenever the user changes the "method" selection for -# namecoin integration to "namecoind". +#: Remember here the RPC port read from namecoin.conf so we can restore to +#: it as default whenever the user changes the "method" selection for +#: namecoin integration to "namecoind". namecoinDefaultRpcPort = "8336" # If changed, these values will cause particularly unexpected behavior: # You won't be able to either send or receive messages because the proof # of work you do (or demand) won't match that done or demanded by others. # Don't change them! -# The amount of work that should be performed (and demanded) per byte of the payload. +#: The amount of work that should be performed (and demanded) per byte +#: of the payload. networkDefaultProofOfWorkNonceTrialsPerByte = 1000 -# To make sending short messages a little more difficult, this value is -# added to the payload length for use in calculating the proof of work -# target. +#: To make sending short messages a little more difficult, this value is +#: added to the payload length for use in calculating the proof of work +#: target. networkDefaultPayloadLengthExtraBytes = 1000 diff --git a/src/helper_sql.py b/src/helper_sql.py index 2b558f62..138a9f50 100644 --- a/src/helper_sql.py +++ b/src/helper_sql.py @@ -1,17 +1,39 @@ -"""Helper Sql performs sql operations.""" +""" +SQL-related functions defined here are really pass the queries (or other SQL +commands) to :class:`.threads.sqlThread` through `sqlSubmitQueue` queue and check +or return the result got from `sqlReturnQueue`. + +This is done that way because :mod:`sqlite3` is so thread-unsafe that they +won't even let you call it from different threads using your own locks. +SQLite objects can only be used from one thread. + +.. note:: This actually only applies for certain deployments, and/or + really old version of sqlite. I haven't actually seen it anywhere. + Current versions do have support for threading and multiprocessing. + I don't see an urgent reason to refactor this, but it should be noted + in the comment that the problem is mostly not valid. Sadly, last time + I checked, there is no reliable way to check whether the library is + or isn't thread-safe. +""" import threading import Queue sqlSubmitQueue = Queue.Queue() -# SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. -# SQL objects #can only be called from one thread. +"""the queue for SQL""" sqlReturnQueue = Queue.Queue() +"""the queue for results""" sqlLock = threading.Lock() def sqlQuery(sqlStatement, *args): - """SQLLITE execute statement and return query.""" + """ + Query sqlite and return results + + :param str sqlStatement: SQL statement string + :param list args: SQL query parameters + :rtype: list + """ sqlLock.acquire() sqlSubmitQueue.put(sqlStatement) diff --git a/src/helper_startup.py b/src/helper_startup.py index 1a1119f5..9aaad5ef 100644 --- a/src/helper_startup.py +++ b/src/helper_startup.py @@ -1,13 +1,9 @@ """ -src/helper_startup.py -===================== - -Helper Start performs all the startup operations. +Startup operations. """ # pylint: disable=too-many-branches,too-many-statements from __future__ import print_function -import ConfigParser import os import platform import sys @@ -19,28 +15,12 @@ import paths import state from bmconfigparser import BMConfigParser + # The user may de-select Portable Mode in the settings if they want # the config files to stay in the application data folder. StoreConfigFilesInSameDirectoryAsProgramByDefault = False -def _loadTrustedPeer(): - try: - trustedPeer = BMConfigParser().get('bitmessagesettings', 'trustedpeer') - except ConfigParser.Error: - # This probably means the trusted peer wasn't specified so we - # can just leave it as None - return - try: - host, port = trustedPeer.split(':') - except ValueError: - sys.exit( - 'Bad trustedpeer config setting! It should be set as' - ' trustedpeer=:' - ) - state.trustedPeer = state.Peer(host, int(port)) - - def loadConfig(): """Load the config""" config = BMConfigParser() @@ -134,8 +114,6 @@ def loadConfig(): else: updateConfig() - _loadTrustedPeer() - def updateConfig(): """Save the config""" diff --git a/src/helper_threading.py b/src/helper_threading.py deleted file mode 100644 index 56dd7063..00000000 --- a/src/helper_threading.py +++ /dev/null @@ -1,21 +0,0 @@ -"""set_thread_name for threads that don't use StoppableThread""" - -import threading - -try: - import prctl -except ImportError: - def set_thread_name(name): - """Set the thread name for external use (visible from the OS).""" - threading.current_thread().name = name -else: - def set_thread_name(name): - """Set a name for the thread for python internal use.""" - prctl.set_name(name) - - def _thread_name_hack(self): - set_thread_name(self.name) - threading.Thread.__bootstrap_original__(self) - # pylint: disable=protected-access - threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap - threading.Thread._Thread__bootstrap = _thread_name_hack diff --git a/src/knownnodes.py b/src/knownnodes.py index 1d9e6897..bb588fcb 100644 --- a/src/knownnodes.py +++ b/src/knownnodes.py @@ -3,6 +3,7 @@ Manipulations with knownNodes dictionary. """ import json +import logging import os import pickle import threading @@ -10,28 +11,33 @@ import time import state from bmconfigparser import BMConfigParser -from debug import logger +from network.node import Peer knownNodesLock = threading.Lock() +"""Thread lock for knownnodes modification""" knownNodes = {stream: {} for stream in range(1, 4)} +"""The dict of known nodes for each stream""" knownNodesTrimAmount = 2000 +"""trim stream knownnodes dict to this length""" -# forget a node after rating is this low knownNodesForgetRating = -0.5 +"""forget a node after rating is this low""" knownNodesActual = False +logger = logging.getLogger('default') + DEFAULT_NODES = ( - state.Peer('5.45.99.75', 8444), - state.Peer('75.167.159.54', 8444), - state.Peer('95.165.168.168', 8444), - state.Peer('85.180.139.241', 8444), - state.Peer('158.222.217.190', 8080), - state.Peer('178.62.12.187', 8448), - state.Peer('24.188.198.204', 8111), - state.Peer('109.147.204.113', 1195), - state.Peer('178.11.46.221', 8444) + Peer('5.45.99.75', 8444), + Peer('75.167.159.54', 8444), + Peer('95.165.168.168', 8444), + Peer('85.180.139.241', 8444), + Peer('158.222.217.190', 8080), + Peer('178.62.12.187', 8448), + Peer('24.188.198.204', 8111), + Peer('109.147.204.113', 1195), + Peer('178.11.46.221', 8444) ) @@ -57,19 +63,17 @@ def json_deserialize_knownnodes(source): for node in json.load(source): peer = node['peer'] info = node['info'] - peer = state.Peer(str(peer['host']), peer.get('port', 8444)) + peer = Peer(str(peer['host']), peer.get('port', 8444)) knownNodes[node['stream']][peer] = info - if ( - not (knownNodesActual or info.get('self')) and - peer not in DEFAULT_NODES - ): + if not (knownNodesActual + or info.get('self')) and peer not in DEFAULT_NODES: knownNodesActual = True def pickle_deserialize_old_knownnodes(source): """ - Unpickle source and reorganize knownnodes dict if it's in old format + Unpickle source and reorganize knownnodes dict if it has old format the old format was {Peer:lastseen, ...} the new format is {Peer:{"lastseen":i, "rating":f}} """ @@ -129,7 +133,7 @@ def readKnownNodes(): if onionhostname and ".onion" in onionhostname: onionport = config.safeGetInt('bitmessagesettings', 'onionport') if onionport: - self_peer = state.Peer(onionhostname, onionport) + self_peer = Peer(onionhostname, onionport) addKnownNode(1, self_peer, is_self=True) state.ownAddresses[self_peer] = True @@ -182,7 +186,7 @@ def dns(): """Add DNS names to knownnodes""" for port in [8080, 8444]: addKnownNode( - 1, state.Peer('bootstrap%s.bitmessage.org' % port, port)) + 1, Peer('bootstrap%s.bitmessage.org' % port, port)) def cleanupKnownNodes(): @@ -208,8 +212,8 @@ def cleanupKnownNodes(): del knownNodes[stream][node] continue # scrap old nodes (age > 3 hours) with low rating - if (age > 10800 and knownNodes[stream][node]["rating"] <= - knownNodesForgetRating): + if (age > 10800 and knownNodes[stream][node]["rating"] + <= knownNodesForgetRating): needToWriteKnownNodesToDisk = True del knownNodes[stream][node] continue diff --git a/src/network/__init__.py b/src/network/__init__.py index e69de29b..51c4c4da 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -0,0 +1,17 @@ +from addrthread import AddrThread +from announcethread import AnnounceThread +from connectionpool import BMConnectionPool +from dandelion import Dandelion +from downloadthread import DownloadThread +from invthread import InvThread +from networkthread import BMNetworkThread +from receivequeuethread import ReceiveQueueThread +from threads import StoppableThread +from uploadthread import UploadThread + + +__all__ = [ + "BMConnectionPool", "Dandelion", + "AddrThread", "AnnounceThread", "BMNetworkThread", "DownloadThread", + "InvThread", "ReceiveQueueThread", "UploadThread", "StoppableThread" +] diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 5cd27ede..f635fc90 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -10,6 +10,7 @@ from bmconfigparser import BMConfigParser from network.bmproto import BMProto from network.connectionpool import BMConnectionPool from network.udp import UDPSocket +from node import Peer from threads import StoppableThread @@ -36,6 +37,8 @@ class AnnounceThread(StoppableThread): for stream in state.streamsInWhichIAmParticipating: addr = ( stream, - state.Peer('127.0.0.1', BMConfigParser().safeGetInt("bitmessagesettings", "port")), + Peer( + '127.0.0.1', + BMConfigParser().safeGetInt('bitmessagesettings', 'port')), time.time()) connection.append_write_buf(BMProto.assembleAddr([addr])) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 6375f393..bf0b5742 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -24,8 +24,8 @@ from network.bmobject import ( BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError) -from network.node import Node from network.proxy import ProxyError +from node import Node, Peer from objectracker import missingObjects, ObjectTracker from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue from randomtrackingdict import RandomTrackingDict @@ -443,7 +443,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): seenTime > time.time() - BMProto.addressAlive and port > 0 ): - peer = state.Peer(decodedIP, port) + peer = Peer(decodedIP, port) try: if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime: continue @@ -464,7 +464,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_portcheck(self): """Incoming port check request, queue it.""" - portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port)) + portCheckerQueue.put(Peer(self.destination, self.peerNode.port)) return True def bm_command_ping(self): @@ -594,12 +594,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # incoming from a peer we're connected to as outbound, # or server full report the same error to counter deanonymisation if ( - state.Peer(self.destination.host, self.peerNode.port) in - connectionpool.BMConnectionPool().inboundConnections or - len(connectionpool.BMConnectionPool().inboundConnections) + - len(connectionpool.BMConnectionPool().outboundConnections) > - BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + - BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + Peer(self.destination.host, self.peerNode.port) + in connectionpool.BMConnectionPool().inboundConnections + or len(connectionpool.BMConnectionPool().inboundConnections) + + len(connectionpool.BMConnectionPool().outboundConnections) + > BMConfigParser().safeGetInt( + 'bitmessagesettings', 'maxtotalconnections') + + BMConfigParser().safeGetInt( + 'bitmessagesettings', 'maxbootstrapconnections') ): self.append_write_buf(protocol.assembleErrorMessage( errorText="Server full, please try again later.", fatal=2)) @@ -622,7 +624,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def assembleAddr(peerList): """Build up a packed address""" - if isinstance(peerList, state.Peer): + if isinstance(peerList, Peer): peerList = (peerList) if not peerList: return b'' @@ -645,10 +647,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): """Stop downloading an object""" - for connection in ( - connectionpool.BMConnectionPool().inboundConnections.values() + - connectionpool.BMConnectionPool().outboundConnections.values() - ): + for connection in connectionpool.BMConnectionPool().connections(): try: del connection.objectsNewToMe[hashId] except KeyError: @@ -689,7 +688,7 @@ class BMStringParser(BMProto): """ def __init__(self): super(BMStringParser, self).__init__() - self.destination = state.Peer('127.0.0.1', 8444) + self.destination = Peer('127.0.0.1', 8444) self.payload = None ObjectTracker.__init__(self) diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 838ca45d..9d2f85d6 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -28,8 +28,6 @@ def chooseConnection(stream): "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' onionOnly = BMConfigParser().safeGetBoolean( "bitmessagesettings", "onionservicesonly") - if state.trustedPeer: - return state.trustedPeer try: retval = portCheckerQueue.get(False) portCheckerQueue.task_done() diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 1267522a..654b74a1 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -1,11 +1,11 @@ """ -src/network/connectionpool.py -================================== +`BMConnectionPool` class definition """ import errno import logging import re import socket +import sys import time import asyncore_pollchoose as asyncore @@ -15,6 +15,7 @@ import protocol import state from bmconfigparser import BMConfigParser from connectionchooser import chooseConnection +from node import Peer from proxy import Proxy from singleton import Singleton from tcp import ( @@ -26,9 +27,23 @@ logger = logging.getLogger('default') @Singleton -# pylint: disable=too-many-instance-attributes class BMConnectionPool(object): """Pool of all existing connections""" + # pylint: disable=too-many-instance-attributes + + trustedPeer = None + """ + If the trustedpeer option is specified in keys.dat then this will + contain a Peer which will be connected to instead of using the + addresses advertised by other peers. + + The expected use case is where the user has a trusted server where + they run a Bitmessage daemon permanently. If they then run a second + instance of the client on a local machine periodically when they want + to check for messages it will sync with the network a lot faster + without compromising security. + """ + def __init__(self): asyncore.set_rates( BMConfigParser().safeGetInt( @@ -41,9 +56,33 @@ class BMConnectionPool(object): self.listeningSockets = {} self.udpSockets = {} self.streams = [] - self.lastSpawned = 0 - self.spawnWait = 2 - self.bootstrapped = False + self._lastSpawned = 0 + self._spawnWait = 2 + self._bootstrapped = False + + trustedPeer = BMConfigParser().safeGet( + 'bitmessagesettings', 'trustedpeer') + try: + if trustedPeer: + host, port = trustedPeer.split(':') + self.trustedPeer = Peer(host, int(port)) + except ValueError: + sys.exit( + 'Bad trustedpeer config setting! It should be set as' + ' trustedpeer=:' + ) + + def connections(self): + """ + Shortcut for combined list of connections from + `inboundConnections` and `outboundConnections` dicts + """ + return self.inboundConnections.values() + self.outboundConnections.values() + + def establishedConnections(self): + """Shortcut for list of connections having fullyEstablished == True""" + return [ + x for x in self.connections() if x.fullyEstablished] def connectToStream(self, streamNumber): """Connect to a bitmessage stream""" @@ -74,10 +113,7 @@ class BMConnectionPool(object): def isAlreadyConnected(self, nodeid): """Check if we're already connected to this peer""" - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): try: if nodeid == i.nodeid: return True @@ -103,7 +139,7 @@ class BMConnectionPool(object): if isinstance(connection, UDPSocket): del self.udpSockets[connection.listening.host] elif isinstance(connection, TCPServer): - del self.listeningSockets[state.Peer( + del self.listeningSockets[Peer( connection.destination.host, connection.destination.port)] elif connection.isOutbound: try: @@ -129,10 +165,11 @@ class BMConnectionPool(object): "bitmessagesettings", "onionbindip") else: host = '127.0.0.1' - if (BMConfigParser().safeGetBoolean( - "bitmessagesettings", "sockslisten") or - BMConfigParser().safeGet( - "bitmessagesettings", "socksproxytype") == "none"): + if ( + BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") + or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") + == "none" + ): # python doesn't like bind + INADDR_ANY? # host = socket.INADDR_ANY host = BMConfigParser().get("network", "bind") @@ -205,11 +242,13 @@ class BMConnectionPool(object): 'bitmessagesettings', 'socksproxytype', '') onionsocksproxytype = BMConfigParser().safeGet( 'bitmessagesettings', 'onionsocksproxytype', '') - if (socksproxytype[:5] == 'SOCKS' and - not BMConfigParser().safeGetBoolean( - 'bitmessagesettings', 'sockslisten') and - '.onion' not in BMConfigParser().safeGet( - 'bitmessagesettings', 'onionhostname', '')): + if ( + socksproxytype[:5] == 'SOCKS' + and not BMConfigParser().safeGetBoolean( + 'bitmessagesettings', 'sockslisten') + and '.onion' not in BMConfigParser().safeGet( + 'bitmessagesettings', 'onionhostname', '') + ): acceptConnections = False # pylint: disable=too-many-nested-blocks @@ -217,8 +256,8 @@ class BMConnectionPool(object): if not knownnodes.knownNodesActual: self.startBootstrappers() knownnodes.knownNodesActual = True - if not self.bootstrapped: - self.bootstrapped = True + if not self._bootstrapped: + self._bootstrapped = True Proxy.proxy = ( BMConfigParser().safeGet( 'bitmessagesettings', 'sockshostname'), @@ -247,7 +286,7 @@ class BMConnectionPool(object): for i in range( state.maximumNumberOfHalfOpenConnections - pending): try: - chosen = chooseConnection( + chosen = self.trustedPeer or chooseConnection( helper_random.randomchoice(self.streams)) except ValueError: continue @@ -260,8 +299,7 @@ class BMConnectionPool(object): continue try: - if (chosen.host.endswith(".onion") and - Proxy.onion_proxy is not None): + if chosen.host.endswith(".onion") and Proxy.onion_proxy: if onionsocksproxytype == "SOCKS5": self.addConnection(Socks5BMConnection(chosen)) elif onionsocksproxytype == "SOCKS4a": @@ -276,12 +314,9 @@ class BMConnectionPool(object): if e.errno == errno.ENETUNREACH: continue - self.lastSpawned = time.time() + self._lastSpawned = time.time() else: - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): # FIXME: rating will be increased after next connection i.handle_close() @@ -291,8 +326,8 @@ class BMConnectionPool(object): self.startListening() else: for bind in re.sub( - '[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string - BMConfigParser().safeGet('network', 'bind') + r'[^\w.]+', ' ', + BMConfigParser().safeGet('network', 'bind') ).split(): self.startListening(bind) logger.info('Listening for incoming connections.') @@ -301,8 +336,8 @@ class BMConnectionPool(object): self.startUDPSocket() else: for bind in re.sub( - '[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string - BMConfigParser().safeGet('network', 'bind') + r'[^\w.]+', ' ', + BMConfigParser().safeGet('network', 'bind') ).split(): self.startUDPSocket(bind) self.startUDPSocket(False) @@ -319,16 +354,13 @@ class BMConnectionPool(object): i.accepting = i.connecting = i.connected = False logger.info('Stopped udp sockets.') - loopTime = float(self.spawnWait) - if self.lastSpawned < time.time() - self.spawnWait: + loopTime = float(self._spawnWait) + if self._lastSpawned < time.time() - self._spawnWait: loopTime = 2.0 asyncore.loop(timeout=loopTime, count=1000) reaper = [] - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): minTx = time.time() - 20 if i.fullyEstablished: minTx -= 300 - 20 @@ -340,10 +372,8 @@ class BMConnectionPool(object): time.time() - i.lastTx) i.set_state("close") for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() + - self.listeningSockets.values() + - self.udpSockets.values() + self.connections() + + self.listeningSockets.values() + self.udpSockets.values() ): if not (i.accepting or i.connecting or i.connected): reaper.append(i) diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 472b32c0..e882f6de 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -1,6 +1,5 @@ """ -src/network/downloadthread.py -============================= +`DownloadThread` class definition """ import time @@ -29,7 +28,7 @@ class DownloadThread(StoppableThread): def cleanPending(self): """Expire pending downloads eventually""" - deadline = time.time() - DownloadThread.requestExpires + deadline = time.time() - self.requestExpires try: toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] except RuntimeError: @@ -43,15 +42,12 @@ class DownloadThread(StoppableThread): while not self._stopped: requested = 0 # Choose downloading peers randomly - connections = [ - x for x in - BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values() - if x.fullyEstablished] + connections = BMConnectionPool().establishedConnections() helper_random.randomshuffle(connections) - try: - requestChunk = max(int(min(DownloadThread.maxRequestChunk, len(missingObjects)) / len(connections)), 1) - except ZeroDivisionError: - requestChunk = 1 + requestChunk = max(int( + min(self.maxRequestChunk, len(missingObjects)) + / len(connections)), 1) if connections else 1 + for i in connections: now = time.time() # avoid unnecessary delay @@ -81,7 +77,7 @@ class DownloadThread(StoppableThread): '%s:%i Requesting %i objects', i.destination.host, i.destination.port, chunkCount) requested += chunkCount - if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: + if time.time() >= self.lastCleaned + self.cleanInterval: self.cleanPending() if not requested: self.stop.wait(1) diff --git a/src/network/invthread.py b/src/network/invthread.py index bffa6ecb..d5690486 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -20,9 +20,7 @@ def handleExpiredDandelion(expired): the object""" if not expired: return - for i in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for i in BMConnectionPool().connections(): if not i.fullyEstablished: continue for x in expired: @@ -44,9 +42,7 @@ class InvThread(StoppableThread): def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" Dandelion().addHash(hashId, stream=stream) - for connection in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): if state.dandelion and connection != Dandelion().objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() @@ -67,8 +63,7 @@ class InvThread(StoppableThread): break if chunk: - for connection in BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): fluffs = [] stems = [] for inv in chunk: @@ -96,13 +91,13 @@ class InvThread(StoppableThread): if fluffs: random.shuffle(fluffs) connection.append_write_buf(protocol.CreatePacket( - 'inv', addresses.encodeVarint(len(fluffs)) + - "".join(fluffs))) + 'inv', + addresses.encodeVarint(len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) connection.append_write_buf(protocol.CreatePacket( - 'dinv', addresses.encodeVarint(len(stems)) + - "".join(stems))) + 'dinv', + addresses.encodeVarint(len(stems)) + ''.join(stems))) invQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/node.py b/src/network/node.py index 0bfda653..4c532b81 100644 --- a/src/network/node.py +++ b/src/network/node.py @@ -1,7 +1,7 @@ """ -src/network/node.py -=================== +Named tuples representing the network peers """ import collections +Peer = collections.namedtuple('Peer', ['host', 'port']) Node = collections.namedtuple('Node', ['services', 'host', 'port']) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index a8e3292a..b97aee46 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -95,8 +95,7 @@ class ObjectTracker(object): def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" - for i in network.connectionpool.BMConnectionPool().inboundConnections.values( - ) + network.connectionpool.BMConnectionPool().outboundConnections.values(): + for i in network.connectionpool.BMConnectionPool().connections(): if not i.fullyEstablished: continue try: diff --git a/src/network/proxy.py b/src/network/proxy.py index e65ac6a7..e0bb5e78 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -8,9 +8,9 @@ import socket import time import asyncore_pollchoose as asyncore -import state from advanceddispatcher import AdvancedDispatcher from bmconfigparser import BMConfigParser +from node import Peer logger = logging.getLogger('default') @@ -90,9 +90,10 @@ class Proxy(AdvancedDispatcher): def onion_proxy(self, address): """Set onion proxy address""" if address is not None and ( - not isinstance(address, tuple) or len(address) < 2 or - not isinstance(address[0], str) or - not isinstance(address[1], int)): + not isinstance(address, tuple) or len(address) < 2 + or not isinstance(address[0], str) + or not isinstance(address[1], int) + ): raise ValueError self.__class__._onion_proxy = address @@ -107,7 +108,7 @@ class Proxy(AdvancedDispatcher): self.__class__._onion_auth = authTuple def __init__(self, address): - if not isinstance(address, state.Peer): + if not isinstance(address, Peer): raise ValueError AdvancedDispatcher.__init__(self) self.destination = address diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 13c12ce2..cd904065 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -32,14 +32,12 @@ class ReceiveQueueThread(StoppableThread): try: connection = BMConnectionPool().getConnectionByAddr(dest) - # KeyError = connection object not found - except KeyError: + except KeyError: # connection object not found receiveDataQueue.task_done() continue try: connection.process() - # UnknownStateError = state isn't implemented - except UnknownStateError: + except UnknownStateError: # state isn't implemented pass except socket.error as err: if err.errno == errno.EBADF: diff --git a/src/network/socks5.py b/src/network/socks5.py index e0cb7202..f0241744 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -8,7 +8,7 @@ src/network/socks5.py import socket import struct -import state +from node import Peer from proxy import GeneralProxyError, Proxy, ProxyError @@ -200,7 +200,7 @@ class Socks5Resolver(Socks5): def __init__(self, host): self.host = host self.port = 8444 - Socks5.__init__(self, address=state.Peer(self.host, self.port)) + Socks5.__init__(self, address=Peer(self.host, self.port)) def state_auth_done(self): """Perform resolving""" diff --git a/src/network/stats.py b/src/network/stats.py index fedfbbc1..d760ace2 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -19,16 +19,7 @@ currentSentSpeed = 0 def connectedHostsList(): """List of all the connected hosts""" - retval = [] - for i in BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): - if not i.fullyEstablished: - continue - try: - retval.append(i) - except AttributeError: - pass - return retval + return BMConnectionPool().establishedConnections() def sentBytes(): @@ -71,12 +62,6 @@ def downloadSpeed(): def pendingDownload(): """Getting pending downloads""" return len(missingObjects) - # tmp = {} - # for connection in BMConnectionPool().inboundConnections.values() + \ - # BMConnectionPool().outboundConnections.values(): - # for k in connection.objectsNewToMe.keys(): - # tmp[k] = True - # return len(tmp) def pendingUpload(): diff --git a/src/network/tcp.py b/src/network/tcp.py index a1691ceb..97b00784 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -28,6 +28,7 @@ from network.objectracker import ObjectTracker from network.socks4a import Socks4aConnection from network.socks5 import Socks5Connection from network.tls import TLSDispatcher +from node import Peer from queues import UISignalQueue, invQueue, receiveDataQueue logger = logging.getLogger('default') @@ -49,7 +50,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.connectedAt = 0 self.skipUntil = 0 if address is None and sock is not None: - self.destination = state.Peer(*sock.getpeername()) + self.destination = Peer(*sock.getpeername()) self.isOutbound = False TLSDispatcher.__init__(self, sock, server_side=True) self.connectedAt = time.time() @@ -334,7 +335,7 @@ def bootstrap(connection_class): _connection_base = connection_class def __init__(self, host, port): - self._connection_base.__init__(self, state.Peer(host, port)) + self._connection_base.__init__(self, Peer(host, port)) self.close_reason = self._succeed = False def bm_command_addr(self): @@ -384,7 +385,7 @@ class TCPServer(AdvancedDispatcher): 'bitmessagesettings', 'port', str(port)) BMConfigParser().save() break - self.destination = state.Peer(host, port) + self.destination = Peer(host, port) self.bound = True self.listen(5) @@ -402,7 +403,7 @@ class TCPServer(AdvancedDispatcher): except (TypeError, IndexError): return - state.ownAddresses[state.Peer(*sock.getsockname())] = True + state.ownAddresses[Peer(*sock.getsockname())] = True if ( len(connectionpool.BMConnectionPool().inboundConnections) + len(connectionpool.BMConnectionPool().outboundConnections) > diff --git a/src/network/udp.py b/src/network/udp.py index 97c6aee5..cf694567 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -9,6 +9,7 @@ import socket import state import protocol from bmproto import BMProto +from node import Peer from objectracker import ObjectTracker from queues import receiveDataQueue @@ -43,8 +44,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes else: self.socket = sock self.set_socket_reuse() - self.listening = state.Peer(*self.socket.getsockname()) - self.destination = state.Peer(*self.socket.getsockname()) + self.listening = Peer(*self.socket.getsockname()) + self.destination = Peer(*self.socket.getsockname()) ObjectTracker.__init__(self) self.connecting = False self.connected = True @@ -96,7 +97,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes self.destination.host, self.destination.port, remoteport) if self.local: state.discoveredPeers[ - state.Peer(self.destination.host, remoteport) + Peer(self.destination.host, remoteport) ] = time.time() return True @@ -131,7 +132,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes logger.error("socket error: %s", e) return - self.destination = state.Peer(*addr) + self.destination = Peer(*addr) encodedAddr = protocol.encodeHost(addr[0]) self.local = bool(protocol.checkIPAddress(encodedAddr, True)) # overwrite the old buffer to avoid mixing data and so that diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 1b57bd9a..7d80d789 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -1,5 +1,5 @@ """ -src/network/uploadthread.py +`UploadThread` class definition """ import time @@ -22,19 +22,19 @@ class UploadThread(StoppableThread): def run(self): while not self._stopped: uploaded = 0 - # Choose downloading peers randomly - connections = [x for x in BMConnectionPool().inboundConnections.values() + - BMConnectionPool().outboundConnections.values() if x.fullyEstablished] + # Choose uploading peers randomly + connections = BMConnectionPool().establishedConnections() helper_random.randomshuffle(connections) for i in connections: now = time.time() # avoid unnecessary delay if i.skipUntil >= now: continue - if len(i.write_buf) > UploadThread.maxBufSize: + if len(i.write_buf) > self.maxBufSize: continue try: - request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) + request = i.pendingUpload.randomKeys( + RandomTrackingDict.maxPending) except KeyError: continue payload = bytearray() diff --git a/src/queues.py b/src/queues.py index 7b6bbade..7d9e284a 100644 --- a/src/queues.py +++ b/src/queues.py @@ -1,20 +1,51 @@ -import Queue +"""Most of the queues used by bitmessage threads are defined here.""" + +import Queue +import threading +import time -from class_objectProcessorQueue import ObjectProcessorQueue from multiqueue import MultiQueue + +class ObjectProcessorQueue(Queue.Queue): + """Special queue class using lock for `.threads.objectProcessor`""" + + maxSize = 32000000 + + def __init__(self): + Queue.Queue.__init__(self) + self.sizeLock = threading.Lock() + #: in Bytes. We maintain this to prevent nodes from flooding us + #: with objects which take up too much memory. If this gets + #: too big we'll sleep before asking for further objects. + self.curSize = 0 + + def put(self, item, block=True, timeout=None): + while self.curSize >= self.maxSize: + time.sleep(1) + with self.sizeLock: + self.curSize += len(item[1]) + Queue.Queue.put(self, item, block, timeout) + + def get(self, block=True, timeout=None): + item = Queue.Queue.get(self, block, timeout) + with self.sizeLock: + self.curSize -= len(item[1]) + return item + + workerQueue = Queue.Queue() UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() -# receiveDataThreads dump objects they hear on the network into this -# queue to be processed. +#: `.network.ReceiveQueueThread` instances dump objects they hear +#: on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() invQueue = MultiQueue() addrQueue = MultiQueue() portCheckerQueue = Queue.Queue() receiveDataQueue = Queue.Queue() -# The address generator thread uses this queue to get information back -# to the API thread. +#: The address generator thread uses this queue to get information back +#: to the API thread. apiAddressGeneratorReturnQueue = Queue.Queue() -# Exceptions +#: for exceptions excQueue = Queue.Queue() diff --git a/src/shutdown.py b/src/shutdown.py index 1d40a90f..c81a519a 100644 --- a/src/shutdown.py +++ b/src/shutdown.py @@ -10,7 +10,7 @@ from debug import logger from helper_sql import sqlQuery, sqlStoredProcedure from inventory import Inventory from knownnodes import saveKnownNodes -from network.threads import StoppableThread +from network import StoppableThread from queues import ( addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue) diff --git a/src/state.py b/src/state.py index a3b930ab..f5526029 100644 --- a/src/state.py +++ b/src/state.py @@ -1,7 +1,6 @@ """ Global runtime variables. """ -import collections neededPubkeys = {} streamsInWhichIAmParticipating = [] @@ -47,24 +46,8 @@ uploadThread = None ownAddresses = {} -trustedPeer = None -""" - If the trustedpeer option is specified in keys.dat then this will - contain a Peer which will be connected to instead of using the - addresses advertised by other peers. The client will only connect to - this peer and the timing attack mitigation will be disabled in order - to download data faster. The expected use case is where the user has - a fast connection to a trusted server where they run a BitMessage - daemon permanently. If they then run a second instance of the client - on a local machine periodically when they want to check for messages - it will sync with the network a lot faster without compromising - security. -""" - discoveredPeers = {} -Peer = collections.namedtuple('Peer', ['host', 'port']) - dandelion = 0 testmode = False diff --git a/src/tests/core.py b/src/tests/core.py index 8d24a768..3871946d 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -17,6 +17,7 @@ from bmconfigparser import BMConfigParser from helper_msgcoding import MsgEncode, MsgDecode from network import asyncore_pollchoose as asyncore from network.connectionpool import BMConnectionPool +from network.node import Peer from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection from queues import excQueue @@ -30,7 +31,7 @@ def pickle_knownnodes(): with open(knownnodes_file, 'wb') as dst: pickle.dump({ stream: { - state.Peer( + Peer( '%i.%i.%i.%i' % tuple([ random.randint(1, 255) for i in range(4)]), 8444): {'lastseen': now, 'rating': 0.1} @@ -90,7 +91,7 @@ class TestCore(unittest.TestCase): """initial fill script from network.tcp""" BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true') try: - for peer in (state.Peer("127.0.0.1", 8448),): + for peer in (Peer("127.0.0.1", 8448),): direct = TCPConnection(peer) while asyncore.socket_map: print("loop, state = %s" % direct.state) @@ -147,7 +148,7 @@ class TestCore(unittest.TestCase): def _initiate_bootstrap(self): BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true') self._outdate_knownnodes() - knownnodes.addKnownNode(1, state.Peer('127.0.0.1', 8444), is_self=True) + knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True) knownnodes.cleanupKnownNodes() time.sleep(2) diff --git a/src/threads.py b/src/threads.py new file mode 100644 index 00000000..08d61196 --- /dev/null +++ b/src/threads.py @@ -0,0 +1,46 @@ +""" +PyBitmessage does various tasks in separate threads. Most of them inherit +from `.network.StoppableThread`. There are `addressGenerator` for +addresses generation, `objectProcessor` for processing the network objects +passed minimal validation, `singleCleaner` to periodically clean various +internal storages (like inventory and knownnodes) and do forced garbage +collection, `singleWorker` for doing PoW, `sqlThread` for querying sqlite +database. + +There are also other threads in the `.network` package. + +:func:`set_thread_name` is defined here for the threads that don't inherit from +:class:`.network.StoppableThread` +""" + +import threading + +try: + import prctl +except ImportError: + def set_thread_name(name): + """Set a name for the thread for python internal use.""" + threading.current_thread().name = name +else: + def set_thread_name(name): + """Set the thread name for external use (visible from the OS).""" + prctl.set_name(name) + + def _thread_name_hack(self): + set_thread_name(self.name) + threading.Thread.__bootstrap_original__(self) + # pylint: disable=protected-access + threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap + threading.Thread._Thread__bootstrap = _thread_name_hack + +from class_addressGenerator import addressGenerator +from class_objectProcessor import objectProcessor +from class_singleCleaner import singleCleaner +from class_singleWorker import singleWorker +from class_sqlThread import sqlThread + + +__all__ = [ + "addressGenerator", "objectProcessor", "singleCleaner", "singleWorker", + "sqlThread" +] diff --git a/src/upnp.py b/src/upnp.py index b1ee2e7b..99000413 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -1,9 +1,6 @@ # pylint: disable=too-many-statements,too-many-branches,protected-access,no-self-use """ -src/upnp.py -=========== - -A simple upnp module to forward port for BitMessage +Complete UPnP port forwarding implementation in separate thread. Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port """ @@ -21,8 +18,8 @@ import state import tr from bmconfigparser import BMConfigParser from debug import logger -from network.connectionpool import BMConnectionPool -from network.threads import StoppableThread +from network import BMConnectionPool, StoppableThread +from network.node import Peer def createRequestXML(service, action, arguments=None): @@ -263,7 +260,7 @@ class uPnPThread(StoppableThread): self.routers.append(newRouter) self.createPortMapping(newRouter) try: - self_peer = state.Peer( + self_peer = Peer( newRouter.GetExternalIPAddress(), self.extPort )