diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index e39679a3..a0615f28 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -21,6 +21,7 @@ app_dir = pathmagic.setup() import depends depends.check_dependencies() +import collections import getopt import multiprocessing # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. @@ -30,6 +31,7 @@ import time import traceback import defaults +import protocol import shared import shutdown import state @@ -80,6 +82,9 @@ def signal_handler(signum, frame): ' because the UI captures the signal.') +App = collections.namedtuple('App', ['state', 'config', 'protocol']) + + class Main(object): """Main PyBitmessage class""" def start(self): @@ -186,13 +191,13 @@ class Main(object): shared.reloadBroadcastSendersForWhichImWatching() # Start the address generation thread - addressGeneratorThread = addressGenerator(state) + addressGeneratorThread = addressGenerator() # close the main program even if there are threads left addressGeneratorThread.daemon = True addressGeneratorThread.start() # Start the thread that calculates POWs - singleWorkerThread = singleWorker(state) + singleWorkerThread = singleWorker() # close the main program even if there are threads left singleWorkerThread.daemon = True singleWorkerThread.start() @@ -209,26 +214,26 @@ class Main(object): if daemon and config.safeGet( 'bitmessagesettings', 'smtpdeliver', '') != '': from class_smtpDeliver import smtpDeliver - smtpDeliveryThread = smtpDeliver(state) + smtpDeliveryThread = smtpDeliver() smtpDeliveryThread.start() # SMTP daemon thread if daemon and config.safeGetBoolean( 'bitmessagesettings', 'smtpd'): from class_smtpServer import smtpServer - smtpServerThread = smtpServer(state) + smtpServerThread = smtpServer() smtpServerThread.start() # API is also objproc dependent if config.safeGetBoolean('bitmessagesettings', 'apienabled'): - import api # pylint: disable=relative-import - singleAPIThread = api.singleAPI(state) + from api import singleAPI + singleAPIThread = singleAPI() # close the main program even if there are threads left singleAPIThread.daemon = True singleAPIThread.start() # Start the cleanerThread - singleCleanerThread = singleCleaner(state) + singleCleanerThread = singleCleaner() # close the main program even if there are threads left singleCleanerThread.daemon = True singleCleanerThread.start() @@ -236,7 +241,7 @@ class Main(object): # start network components if networking is enabled if state.enableNetwork: start_proxyconfig() - network.start(config, state) + network.start(App(state, config, protocol)) if config.safeGetBoolean('bitmessagesettings', 'upnp'): import upnp diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index 2356a1b9..f8aae8f8 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -9,6 +9,7 @@ import defaults import highlevelcrypto import queues import shared +import state import tr from addresses import decodeAddress, encodeAddress, encodeVarint from bmconfigparser import BMConfigParser @@ -44,7 +45,7 @@ class addressGenerator(StoppableThread): """ # pylint: disable=too-many-locals, too-many-branches # pylint: disable=protected-access, too-many-statements - while self.state.shutdown == 0: + while state.shutdown == 0: queueValue = queues.addressGeneratorQueue.get() nonceTrialsPerByte = 0 payloadLengthExtraBytes = 0 diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 68659a4e..fea842ea 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -23,6 +23,7 @@ import proofofwork import protocol import queues import shared +import state import tr from addresses import ( calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint @@ -47,8 +48,8 @@ def sizeof_fmt(num, suffix='h/s'): class singleWorker(StoppableThread): """Thread for performing PoW""" - def __init__(self, state): - super(singleWorker, self).__init__(state, name="singleWorker") + def __init__(self): + super(singleWorker, self).__init__(name="singleWorker") proofofwork.init() def stopThread(self): @@ -63,9 +64,9 @@ class singleWorker(StoppableThread): def run(self): # pylint: disable=attribute-defined-outside-init - while not helper_sql.sql_ready.wait(1.0) and self.state.shutdown == 0: + while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0: self.stop.wait(1.0) - if self.state.shutdown > 0: + if state.shutdown > 0: return # Initialize the neededPubkeys dictionary. @@ -78,7 +79,7 @@ class singleWorker(StoppableThread): _, toAddressVersionNumber, toStreamNumber, toRipe = \ decodeAddress(toAddress) if toAddressVersionNumber <= 3: - self.state.neededPubkeys[toAddress] = 0 + state.neededPubkeys[toAddress] = 0 elif toAddressVersionNumber >= 4: doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( encodeVarint(toAddressVersionNumber) @@ -89,7 +90,7 @@ class singleWorker(StoppableThread): tag = doubleHashOfAddressData[32:] # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - self.state.neededPubkeys[tag] = ( + state.neededPubkeys[tag] = ( toAddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -101,19 +102,19 @@ class singleWorker(StoppableThread): for row in queryreturn: ackdata, = row self.logger.info('Watching for ackdata %s', hexlify(ackdata)) - self.state.ackdataForWhichImWatching[ackdata] = 0 + state.ackdataForWhichImWatching[ackdata] = 0 # Fix legacy (headerless) watched ackdata to include header - for oldack in self.state.ackdataForWhichImWatching: + for oldack in state.ackdataForWhichImWatching: if len(oldack) == 32: # attach legacy header, always constant (msg/1/1) newack = '\x00\x00\x00\x02\x01\x01' + oldack - self.state.ackdataForWhichImWatching[newack] = 0 + state.ackdataForWhichImWatching[newack] = 0 sqlExecute( '''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''', newack, oldack ) - del self.state.ackdataForWhichImWatching[oldack] + del state.ackdataForWhichImWatching[oldack] # For the case if user deleted knownnodes # but is still having onionpeer objects in inventory @@ -128,7 +129,7 @@ class singleWorker(StoppableThread): # before we start on existing POW tasks. self.stop.wait(10) - if self.state.shutdown: + if state.shutdown: return # just in case there are any pending tasks for msg @@ -141,7 +142,7 @@ class singleWorker(StoppableThread): # send onionpeer object queues.workerQueue.put(('sendOnionPeerObj', '')) - while self.state.shutdown == 0: + while state.shutdown == 0: self.busy = 0 command, data = queues.workerQueue.get() self.busy = 1 @@ -493,7 +494,7 @@ class singleWorker(StoppableThread): def sendOnionPeerObj(self, peer=None): """Send onionpeer object representing peer""" if not peer: # find own onionhostname - for peer in self.state.ownAddresses: + for peer in state.ownAddresses: if peer.host.endswith('.onion'): break else: @@ -798,8 +799,8 @@ class singleWorker(StoppableThread): encodeVarint(toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe ).digest()).digest()[32:] - if toaddress in self.state.neededPubkeys or \ - toTag in self.state.neededPubkeys: + if toaddress in state.neededPubkeys or \ + toTag in state.neededPubkeys: # We already sent a request for the pubkey sqlExecute( '''UPDATE sent SET status='awaitingpubkey', ''' @@ -840,7 +841,7 @@ class singleWorker(StoppableThread): privEncryptionKey = doubleHashOfToAddressData[:32] # The second half of the sha512 hash. tag = doubleHashOfToAddressData[32:] - self.state.neededPubkeys[tag] = ( + state.neededPubkeys[tag] = ( toaddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -863,7 +864,7 @@ class singleWorker(StoppableThread): ''' status='doingpubkeypow') AND ''' ''' folder='sent' ''', toaddress) - del self.state.neededPubkeys[tag] + del state.neededPubkeys[tag] break # else: # There was something wrong with this @@ -905,7 +906,7 @@ class singleWorker(StoppableThread): # if we aren't sending this to ourselves or a chan if not BMConfigParser().has_section(toaddress): - self.state.ackdataForWhichImWatching[ackdata] = 0 + state.ackdataForWhichImWatching[ackdata] = 0 queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( ackdata, @@ -1399,7 +1400,7 @@ class singleWorker(StoppableThread): retryNumber = queryReturn[0][0] if addressVersionNumber <= 3: - self.state.neededPubkeys[toAddress] = 0 + state.neededPubkeys[toAddress] = 0 elif addressVersionNumber >= 4: # If the user just clicked 'send' then the tag # (and other information) will already be in the @@ -1416,10 +1417,10 @@ class singleWorker(StoppableThread): encodeVarint(addressVersionNumber) + encodeVarint(streamNumber) + ripe ).digest()).digest()[32:] - if tag not in self.state.neededPubkeys: + if tag not in state.neededPubkeys: # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - self.state.neededPubkeys[tag] = ( + state.neededPubkeys[tag] = ( toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)) ) diff --git a/src/network/__init__.py b/src/network/__init__.py index cd517094..a34b4fc6 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -4,17 +4,18 @@ Network subsystem package from announcethread import AnnounceThread from connectionpool import BMConnectionPool +from node import Peer from threads import StoppableThread __all__ = [ - "AnnounceThread", "BMConnectionPool", "StoppableThread" + "AnnounceThread", "BMConnectionPool", "Peer", "StoppableThread" # "AddrThread", "AnnounceThread", "BMNetworkThread", "Dandelion", # "DownloadThread", "InvThread", "ReceiveQueueThread", "UploadThread", ] -def start(config, state): +def start(app): """Start network threads""" from addrthread import AddrThread from dandelion import Dandelion @@ -28,29 +29,31 @@ def start(config, state): readKnownNodes() # init, needs to be early because other thread may access it early Dandelion() - BMConnectionPool().connectToStream(1) - asyncoreThread = BMNetworkThread(state) + # init + pool = BMConnectionPool(app) + pool.connectToStream(1) + asyncoreThread = BMNetworkThread(pool) asyncoreThread.daemon = True asyncoreThread.start() - invThread = InvThread(state) + invThread = InvThread(app) invThread.daemon = True invThread.start() - addrThread = AddrThread(state) + addrThread = AddrThread(app) addrThread.daemon = True addrThread.start() - downloadThread = DownloadThread() + downloadThread = DownloadThread(app) downloadThread.daemon = True downloadThread.start() - uploadThread = UploadThread(state) # state is not used + uploadThread = UploadThread(app) uploadThread.daemon = True uploadThread.start() # Optional components - for i in range(config.getint('threads', 'receive')): - receiveQueueThread = ReceiveQueueThread(state, i) + for i in range(app.config.getint('threads', 'receive')): + receiveQueueThread = ReceiveQueueThread(app, i) receiveQueueThread.daemon = True receiveQueueThread.start() - if config.safeGetBoolean('bitmessagesettings', 'udp'): - state.announceThread = AnnounceThread(state) - state.announceThread.daemon = True - state.announceThread.start() + if app.config.safeGetBoolean('bitmessagesettings', 'udp'): + app.state.announceThread = AnnounceThread(app) + app.state.announceThread.daemon = True + app.state.announceThread.start() diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 53f665b5..aee4809a 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -5,13 +5,12 @@ from six.moves import queue from helper_random import randomshuffle -from network.assemble import assemble_addr from network.connectionpool import BMConnectionPool from queues import addrQueue -from threads import StoppableThread +from threads import StatefulThread -class AddrThread(StoppableThread): +class AddrThread(StatefulThread): """(Node) address broadcasting thread""" name = "AddrBroadcaster" @@ -40,7 +39,8 @@ class AddrThread(StoppableThread): continue filtered.append((stream, peer, seen)) if filtered: - i.append_write_buf(assemble_addr(filtered)) + i.append_write_buf( + self.protocol.assembleAddrMessage(filtered)) addrQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/announcethread.py b/src/network/announcethread.py index c64951a0..c4673046 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -3,14 +3,12 @@ Announce myself (node address) """ import time -from bmconfigparser import BMConfigParser -from network.assemble import assemble_addr -from network.connectionpool import BMConnectionPool +from connectionpool import BMConnectionPool from node import Peer -from threads import StoppableThread +from threads import StatefulThread -class AnnounceThread(StoppableThread): +class AnnounceThread(StatefulThread): """A thread to manage regular announcing of this node""" name = "Announcer" announceInterval = 60 @@ -31,11 +29,9 @@ class AnnounceThread(StoppableThread): if not connection.announcing: continue for stream in self.state.streamsInWhichIAmParticipating: - addr = ( - stream, - Peer( + addr = (stream, Peer( '127.0.0.1', - BMConfigParser().safeGetInt( - 'bitmessagesettings', 'port')), - time.time()) - connection.append_write_buf(assemble_addr([addr])) + self.config.safeGetInt('bitmessagesettings', 'port')), + time.time()) + connection.append_write_buf( + self.protocol.assembleAddrMessage([addr])) diff --git a/src/network/assemble.py b/src/network/assemble.py deleted file mode 100644 index 32fad3e4..00000000 --- a/src/network/assemble.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Create bitmessage protocol command packets -""" -import struct - -import addresses -from network.constants import MAX_ADDR_COUNT -from network.node import Peer -from protocol import CreatePacket, encodeHost - - -def assemble_addr(peerList): - """Create address command""" - if isinstance(peerList, Peer): - peerList = [peerList] - if not peerList: - return b'' - retval = b'' - for i in range(0, len(peerList), MAX_ADDR_COUNT): - payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) - for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: - # 64-bit time - payload += struct.pack('>Q', timestamp) - payload += struct.pack('>I', stream) - # service bit flags offered by this node - payload += struct.pack('>q', 1) - payload += encodeHost(peer.host) - # remote port - payload += struct.pack('>H', peer.port) - retval += CreatePacket('addr', payload) - return retval diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py deleted file mode 100644 index c31bbb6a..00000000 --- a/src/network/connectionchooser.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -Select which node to connect to -""" -# pylint: disable=too-many-branches -import logging -import random # nosec - -import knownnodes -import protocol -import state -from bmconfigparser import BMConfigParser -from queues import queue, portCheckerQueue - -logger = logging.getLogger('default') - - -def getDiscoveredPeer(): - """Get a peer from the local peer discovery list""" - try: - peer = random.choice(state.discoveredPeers.keys()) - except (IndexError, KeyError): - raise ValueError - try: - del state.discoveredPeers[peer] - except KeyError: - pass - return peer - - -def chooseConnection(stream): - """Returns an appropriate connection""" - haveOnion = BMConfigParser().safeGet( - "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' - onionOnly = BMConfigParser().safeGetBoolean( - "bitmessagesettings", "onionservicesonly") - try: - retval = portCheckerQueue.get(False) - portCheckerQueue.task_done() - return retval - except queue.Empty: - pass - # with a probability of 0.5, connect to a discovered peer - if random.choice((False, True)) and not haveOnion: - # discovered peers are already filtered by allowed streams - return getDiscoveredPeer() - for _ in range(50): - peer = random.choice(knownnodes.knownNodes[stream].keys()) - try: - peer_info = knownnodes.knownNodes[stream][peer] - if peer_info.get('self'): - continue - rating = peer_info["rating"] - except TypeError: - logger.warning('Error in %s', peer) - rating = 0 - if haveOnion: - # do not connect to raw IP addresses - # --keep all traffic within Tor overlay - if onionOnly and not peer.host.endswith('.onion'): - continue - # onion addresses have a higher priority when SOCKS - if peer.host.endswith('.onion') and rating > 0: - rating = 1 - # TODO: need better check - elif not peer.host.startswith('bootstrap'): - encodedAddr = protocol.encodeHost(peer.host) - # don't connect to local IPs when using SOCKS - if not protocol.checkIPAddress(encodedAddr, False): - continue - if rating > 1: - rating = 1 - try: - if 0.05 / (1.0 - rating) > random.random(): - return peer - except ZeroDivisionError: - return peer - raise ValueError diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index fffc0bc3..b78e8b49 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -3,29 +3,91 @@ """ import errno import logging +import random # noseq import re import socket import sys import time -import asyncore_pollchoose as asyncore +# magic imports import helper_random +from queues import queue, portCheckerQueue +from singleton import Singleton + +import asyncore_pollchoose as asyncore import knownnodes -import protocol -import state -from bmconfigparser import BMConfigParser -from connectionchooser import chooseConnection +from advanceddispatcher import AdvancedDispatcher from node import Peer from proxy import Proxy -from singleton import Singleton from tcp import ( - bootstrap, Socks4aBMConnection, Socks5BMConnection, - TCPConnection, TCPServer) + bootstrap, Socks4aBMConnection, Socks5BMConnection, TCPConnection) from udp import UDPSocket logger = logging.getLogger('default') +class TCPServer(AdvancedDispatcher): + """TCP connection server for Bitmessage protocol""" + + def __init__(self, pool, host='127.0.0.1', port=8444): + self.pool = pool + if not hasattr(self, '_map'): + AdvancedDispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + for attempt in range(50): + try: + if attempt > 0: + logger.warning('Failed to bind on port %s', port) + port = random.randint(32767, 65535) + self.bind((host, port)) + except socket.error as e: + if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE): + continue + else: + if attempt > 0: + logger.warning('Setting port to %s', port) + self.pool.config.set( + 'bitmessagesettings', 'port', str(port)) + self.pool.config.save() + break + self.destination = Peer(host, port) + self.bound = True + self.listen(5) + + def is_bound(self): + """Is the socket bound?""" + try: + return self.bound + except AttributeError: + return False + + def handle_accept(self): + """Incoming connection callback""" + try: + sock = self.accept()[0] + except (TypeError, IndexError): + return + + self.pool.state.ownAddresses[Peer(*sock.getsockname())] = True + if ( + len(self.pool) + > self.pool.config.safeGetInt( + 'bitmessagesettings', 'maxtotalconnections') + + self.pool.config.safeGetInt( + 'bitmessagesettings', 'maxbootstrapconnections') + 10 + ): + # 10 is a sort of buffer, in between it will go through + # the version handshake and return an error to the peer + logger.warning("Server full, dropping connection") + sock.close() + return + try: + self.pool.addConnection(TCPConnection(self.pool, sock=sock)) + except socket.error: + pass + + @Singleton class BMConnectionPool(object): """Pool of all existing connections""" @@ -44,11 +106,16 @@ class BMConnectionPool(object): without compromising security. """ - def __init__(self): + def __init__(self, app=None): + if not app: + return # for singleton convenience + self.config = app.config + self.protocol = app.protocol + self.state = app.state asyncore.set_rates( - BMConfigParser().safeGetInt( + app.config.safeGetInt( "bitmessagesettings", "maxdownloadrate"), - BMConfigParser().safeGetInt( + app.config.safeGetInt( "bitmessagesettings", "maxuploadrate") ) self.outboundConnections = {} @@ -60,7 +127,7 @@ class BMConnectionPool(object): self._spawnWait = 2 self._bootstrapped = False - trustedPeer = BMConfigParser().safeGet( + trustedPeer = app.config.safeGet( 'bitmessagesettings', 'trustedpeer') try: if trustedPeer: @@ -90,7 +157,7 @@ class BMConnectionPool(object): def connectToStream(self, streamNumber): """Connect to a bitmessage stream""" self.streams.append(streamNumber) - state.streamsInWhichIAmParticipating.append(streamNumber) + self.state.streamsInWhichIAmParticipating.append(streamNumber) def getConnectionByAddr(self, addr): """ @@ -160,32 +227,30 @@ class BMConnectionPool(object): pass connection.handle_close() - @staticmethod - def getListeningIP(): + def getListeningIP(self): """What IP are we supposed to be listening on?""" - if BMConfigParser().safeGet( + if self.config.safeGet( "bitmessagesettings", "onionhostname").endswith(".onion"): - host = BMConfigParser().safeGet( - "bitmessagesettings", "onionbindip") + host = self.config.safeGet("bitmessagesettings", "onionbindip") else: host = '127.0.0.1' if ( - BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") - or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") + self.config.safeGetBoolean("bitmessagesettings", "sockslisten") + or self.config.safeGet("bitmessagesettings", "socksproxytype") == "none" ): # python doesn't like bind + INADDR_ANY? # host = socket.INADDR_ANY - host = BMConfigParser().get("network", "bind") + host = self.config.get("network", "bind") return host def startListening(self, bind=None): """Open a listening socket and start accepting connections on it""" if bind is None: bind = self.getListeningIP() - port = BMConfigParser().safeGetInt("bitmessagesettings", "port") + port = self.config.safeGetInt("bitmessagesettings", "port") # correct port even if it changed - ls = TCPServer(host=bind, port=port) + ls = TCPServer(self, host=bind, port=port) self.listeningSockets[ls.destination] = ls def startUDPSocket(self, bind=None): @@ -195,17 +260,17 @@ class BMConnectionPool(object): """ if bind is None: host = self.getListeningIP() - udpSocket = UDPSocket(host=host, announcing=True) + udpSocket = UDPSocket(self, host=host, announcing=True) else: if bind is False: - udpSocket = UDPSocket(announcing=False) + udpSocket = UDPSocket(self, announcing=False) else: - udpSocket = UDPSocket(host=bind, announcing=True) + udpSocket = UDPSocket(self, host=bind, announcing=True) self.udpSockets[udpSocket.listening.host] = udpSocket def startBootstrappers(self): """Run the process of resolving bootstrap hostnames""" - proxy_type = BMConfigParser().safeGet( + proxy_type = self.config.safeGet( 'bitmessagesettings', 'socksproxytype') # A plugins may be added here hostname = None @@ -229,7 +294,68 @@ class BMConnectionPool(object): hostname = 'bootstrap%s.bitmessage.org' % port else: port = 8444 - self.addConnection(bootstrapper(hostname, port)) + self.addConnection(bootstrapper(self, hostname, port)) + + def chooseConnection(self, stream): + """Returns an appropriate connection""" + def getDiscoveredPeer(): + """Get a peer from the local peer discovery list""" + try: + peer = random.choice(self.state.discoveredPeers.keys()) + except (IndexError, KeyError): + raise ValueError + try: + del self.state.discoveredPeers[peer] + except KeyError: + pass + return peer + + haveOnion = self.config.safeGet( + "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' + onionOnly = self.config.safeGetBoolean( + "bitmessagesettings", "onionservicesonly") + try: + retval = portCheckerQueue.get(False) + portCheckerQueue.task_done() + return retval + except queue.Empty: + pass + # with a probability of 0.5, connect to a discovered peer + if random.choice((False, True)) and not haveOnion: + # discovered peers are already filtered by allowed streams + return getDiscoveredPeer() + for _ in range(50): + peer = random.choice(knownnodes.knownNodes[stream].keys()) + try: + peer_info = knownnodes.knownNodes[stream][peer] + if peer_info.get('self'): + continue + rating = peer_info["rating"] + except TypeError: + logger.warning('Error in %s', peer) + rating = 0 + if haveOnion: + # do not connect to raw IP addresses + # --keep all traffic within Tor overlay + if onionOnly and not peer.host.endswith('.onion'): + continue + # onion addresses have a higher priority when SOCKS + if peer.host.endswith('.onion') and rating > 0: + rating = 1 + # TODO: need better check + elif not peer.host.startswith('bootstrap'): + encodedAddr = self.protocol.encodeHost(peer.host) + # don't connect to local IPs when using SOCKS + if not self.protocol.checkIPAddress(encodedAddr, False): + continue + if rating > 1: + rating = 1 + try: + if 0.05 / (1.0 - rating) > random.random(): + return peer + except ZeroDivisionError: + return peer + raise ValueError def loop(self): # pylint: disable=too-many-branches,too-many-statements """Main Connectionpool's loop""" @@ -237,21 +363,20 @@ class BMConnectionPool(object): # defaults to empty loop if outbound connections are maxed spawnConnections = False acceptConnections = True - if BMConfigParser().safeGetBoolean( - 'bitmessagesettings', 'dontconnect'): + if self.config.safeGetBoolean('bitmessagesettings', 'dontconnect'): acceptConnections = False - elif BMConfigParser().safeGetBoolean( + elif self.config.safeGetBoolean( 'bitmessagesettings', 'sendoutgoingconnections'): spawnConnections = True - socksproxytype = BMConfigParser().safeGet( + socksproxytype = self.config.safeGet( 'bitmessagesettings', 'socksproxytype', '') - onionsocksproxytype = BMConfigParser().safeGet( + onionsocksproxytype = self.config.safeGet( 'bitmessagesettings', 'onionsocksproxytype', '') if ( socksproxytype[:5] == 'SOCKS' - and not BMConfigParser().safeGetBoolean( + and not self.config.safeGetBoolean( 'bitmessagesettings', 'sockslisten') - and '.onion' not in BMConfigParser().safeGet( + and '.onion' not in self.config.safeGet( 'bitmessagesettings', 'onionhostname', '') ): acceptConnections = False @@ -264,9 +389,9 @@ class BMConnectionPool(object): if not self._bootstrapped: self._bootstrapped = True Proxy.proxy = ( - BMConfigParser().safeGet( + self.config.safeGet( 'bitmessagesettings', 'sockshostname'), - BMConfigParser().safeGetInt( + self.config.safeGetInt( 'bitmessagesettings', 'socksport') ) # TODO AUTH @@ -275,9 +400,9 @@ class BMConnectionPool(object): if not onionsocksproxytype.startswith("SOCKS"): raise ValueError Proxy.onion_proxy = ( - BMConfigParser().safeGet( + self.config.safeGet( 'network', 'onionsockshostname', None), - BMConfigParser().safeGet( + self.config.safeGet( 'network', 'onionsocksport', None) ) except ValueError: @@ -286,12 +411,13 @@ class BMConnectionPool(object): 1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished)) pending = len(self.outboundConnections) - established - if established < BMConfigParser().safeGetInt( + if established < self.config.safeGetInt( 'bitmessagesettings', 'maxoutboundconnections'): for i in range( - state.maximumNumberOfHalfOpenConnections - pending): + self.state.maximumNumberOfHalfOpenConnections - pending + ): try: - chosen = self.trustedPeer or chooseConnection( + chosen = self.trustedPeer or self.chooseConnection( helper_random.randomchoice(self.streams)) except ValueError: continue @@ -300,11 +426,11 @@ class BMConnectionPool(object): if chosen.host in self.inboundConnections: continue # don't connect to self - if chosen in state.ownAddresses: + if chosen in self.state.ownAddresses: continue # don't connect to the hosts from the same # network group, defense against sibyl attacks - host_network_group = protocol.network_group( + host_network_group = self.protocol.network_group( chosen.host) same_group = False for j in self.outboundConnections.values(): @@ -319,15 +445,19 @@ class BMConnectionPool(object): try: if chosen.host.endswith(".onion") and Proxy.onion_proxy: if onionsocksproxytype == "SOCKS5": - self.addConnection(Socks5BMConnection(chosen)) + self.addConnection( + Socks5BMConnection(self, chosen)) elif onionsocksproxytype == "SOCKS4a": - self.addConnection(Socks4aBMConnection(chosen)) + self.addConnection( + Socks4aBMConnection(self, chosen)) elif socksproxytype == "SOCKS5": - self.addConnection(Socks5BMConnection(chosen)) + self.addConnection( + Socks5BMConnection(self, chosen)) elif socksproxytype == "SOCKS4a": - self.addConnection(Socks4aBMConnection(chosen)) + self.addConnection( + Socks4aBMConnection(self, chosen)) else: - self.addConnection(TCPConnection(chosen)) + self.addConnection(TCPConnection(self, chosen)) except socket.error as e: if e.errno == errno.ENETUNREACH: continue @@ -340,22 +470,21 @@ class BMConnectionPool(object): if acceptConnections: if not self.listeningSockets: - if BMConfigParser().safeGet('network', 'bind') == '': + if self.config.safeGet('network', 'bind') == '': self.startListening() else: for bind in re.sub( - r'[^\w.]+', ' ', - BMConfigParser().safeGet('network', 'bind') + r'[^\w.]+', ' ', self.config.safeGet('network', 'bind') ).split(): self.startListening(bind) logger.info('Listening for incoming connections.') if not self.udpSockets: - if BMConfigParser().safeGet('network', 'bind') == '': + if self.config.safeGet('network', 'bind') == '': self.startUDPSocket() else: for bind in re.sub( r'[^\w.]+', ' ', - BMConfigParser().safeGet('network', 'bind') + self.config.safeGet('network', 'bind') ).split(): self.startUDPSocket(bind) self.startUDPSocket(False) @@ -384,7 +513,7 @@ class BMConnectionPool(object): minTx -= 300 - 20 if i.lastTx < minTx: if i.fullyEstablished: - i.append_write_buf(protocol.CreatePacket('ping')) + i.append_write_buf(self.protocol.CreatePacket('ping')) else: i.close_reason = "Timeout (%is)" % ( time.time() - i.lastTx) diff --git a/src/network/constants.py b/src/network/constants.py index f8f4120f..a4a2fcc5 100644 --- a/src/network/constants.py +++ b/src/network/constants.py @@ -5,8 +5,6 @@ Network protocol constants #: address is online if online less than this many seconds ago ADDRESS_ALIVE = 10800 -#: protocol specification says max 1000 addresses in one addr command -MAX_ADDR_COUNT = 1000 #: ~1.6 MB which is the maximum possible size of an inv message. MAX_MESSAGE_SIZE = 1600100 #: 2**18 = 256kB is the maximum size of an object payload diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 22b589f6..1f7500a3 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -3,17 +3,17 @@ """ import time -import addresses import helper_random -import protocol +from inventory import Inventory # magic import + from dandelion import Dandelion -from inventory import Inventory +# TODO: from connectionpool import from network.connectionpool import BMConnectionPool from objectracker import missingObjects -from threads import StoppableThread +from threads import StatefulThread -class DownloadThread(StoppableThread): +class DownloadThread(StatefulThread): """Thread-based class for downloading from connections""" minPending = 200 maxRequestChunk = 1000 @@ -21,8 +21,8 @@ class DownloadThread(StoppableThread): cleanInterval = 60 requestExpires = 3600 - def __init__(self): - super(DownloadThread, self).__init__(None, name="Downloader") + def __init__(self, app): + super(DownloadThread, self).__init__(app, name="Downloader") self.lastCleaned = time.time() def cleanPending(self): @@ -72,8 +72,9 @@ class DownloadThread(StoppableThread): missingObjects[chunk] = now if not chunkCount: continue - payload[0:0] = addresses.encodeVarint(chunkCount) - i.append_write_buf(protocol.CreatePacket('getdata', payload)) + payload[0:0] = self.protocol.encodeVarint(chunkCount) + i.append_write_buf( + self.protocol.CreatePacket('getdata', payload)) self.logger.debug( '%s:%i Requesting %i objects', i.destination.host, i.destination.port, chunkCount) diff --git a/src/network/invthread.py b/src/network/invthread.py index 2e513d59..5243fe40 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -5,12 +5,10 @@ import Queue import random from time import time -import addresses -import protocol from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion -from queues import invQueue -from threads import StoppableThread +from queues import invQueue # FIXME: get from app? +from threads import StatefulThread def handleExpiredDandelion(expired): @@ -31,7 +29,7 @@ def handleExpiredDandelion(expired): i.objectsNewToThem[hashid] = time() -class InvThread(StoppableThread): +class InvThread(StatefulThread): """Main thread that sends inv annoucements""" name = "InvBroadcaster" @@ -79,7 +77,9 @@ class InvThread(StoppableThread): if random.randint(1, 100) >= self.state.dandelion: fluffs.append(inv[1]) # send a dinv only if the stem node supports dandelion - elif connection.services & protocol.NODE_DANDELION > 0: + elif ( + connection.services + & self.protocol.NODE_DANDELION > 0): stems.append(inv[1]) else: fluffs.append(inv[1]) @@ -88,15 +88,15 @@ class InvThread(StoppableThread): if fluffs: random.shuffle(fluffs) - connection.append_write_buf(protocol.CreatePacket( + connection.append_write_buf(self.protocol.CreatePacket( 'inv', - addresses.encodeVarint( + self.protocol.encodeVarint( len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) - connection.append_write_buf(protocol.CreatePacket( + connection.append_write_buf(self.protocol.CreatePacket( 'dinv', - addresses.encodeVarint( + self.protocol.encodeVarint( len(stems)) + ''.join(stems))) invQueue.iterate() diff --git a/src/network/networkthread.py b/src/network/networkthread.py index d88bc2e9..a34b58a0 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -1,9 +1,10 @@ """ A thread to handle network concerns """ + +from queues import excQueue # magic import + import network.asyncore_pollchoose as asyncore -from network.connectionpool import BMConnectionPool -from queues import excQueue from threads import StoppableThread @@ -11,27 +12,31 @@ class BMNetworkThread(StoppableThread): """Main network thread""" name = "Asyncore" + def __init__(self, pool): + self.pool = pool + super(BMNetworkThread, self).__init__(self.name) + def run(self): try: - while not self._stopped and self.state.shutdown == 0: - BMConnectionPool().loop() + while not self._stopped and self.pool.state.shutdown == 0: + self.pool.loop() except Exception as e: excQueue.put((self.name, e)) raise def stopThread(self): super(BMNetworkThread, self).stopThread() - for i in BMConnectionPool().listeningSockets.values(): + for i in self.pool.listeningSockets.values(): try: i.close() except: pass - for i in BMConnectionPool().outboundConnections.values(): + for i in self.pool.outboundConnections.values(): try: i.close() except: pass - for i in BMConnectionPool().inboundConnections.values(): + for i in self.pool.inboundConnections.values(): try: i.close() except: diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index fa8bf99c..d1ee252a 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -8,15 +8,15 @@ import socket from network.advanceddispatcher import UnknownStateError from network.connectionpool import BMConnectionPool from queues import receiveDataQueue -from threads import StoppableThread +from threads import StatefulThread -class ReceiveQueueThread(StoppableThread): +class ReceiveQueueThread(StatefulThread): """This thread processes data received from the network (which is done by the asyncore thread)""" - def __init__(self, state, num=0): + def __init__(self, app, num=0): super(ReceiveQueueThread, self).__init__( - state, name="ReceiveQueue_%i" % num) + app, name="ReceiveQueue_%i" % num) def run(self): while not self._stopped and self.state.shutdown == 0: diff --git a/src/network/tcp.py b/src/network/tcp.py index ff778378..07e6d030 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -5,22 +5,20 @@ TCP protocol handler import l10n import logging import math -import random import socket import time -import addresses -import asyncore_pollchoose as asyncore -import connectionpool +# magic imports import helper_random -import knownnodes -import protocol -import state -from bmconfigparser import BMConfigParser -from helper_random import randomBytes from inventory import Inventory +from queues import invQueue, receiveDataQueue, UISignalQueue +from tr import _translate + +import asyncore_pollchoose as asyncore + +import knownnodes +# TODO: not use network. here because this module is inside the network package from network.advanceddispatcher import AdvancedDispatcher -from network.assemble import assemble_addr from network.bmproto import BMProto from network.constants import MAX_OBJECT_COUNT from network.dandelion import Dandelion @@ -29,8 +27,7 @@ from network.socks4a import Socks4aConnection from network.socks5 import Socks5Connection from network.tls import TLSDispatcher from node import Peer -from queues import invQueue, receiveDataQueue, UISignalQueue -from tr import _translate + logger = logging.getLogger('default') @@ -45,8 +42,10 @@ class TCPConnection(BMProto, TLSDispatcher): .. todo:: Look to understand and/or fix the non-parent-init-called """ - def __init__(self, address=None, sock=None): + def __init__(self, pool, address=None, sock=None): BMProto.__init__(self, address=address, sock=sock) + self.pool = pool + self.protocol = pool.protocol self.verackReceived = False self.verackSent = False self.streams = [0] @@ -60,7 +59,7 @@ class TCPConnection(BMProto, TLSDispatcher): logger.debug( 'Received connection from %s:%i', self.destination.host, self.destination.port) - self.nodeid = randomBytes(8) + self.nodeid = helper_random.randomBytes(8) elif address is not None and sock is not None: TLSDispatcher.__init__(self, sock, server_side=False) self.isOutbound = True @@ -81,17 +80,17 @@ class TCPConnection(BMProto, TLSDispatcher): self.destination.host, self.destination.port) try: self.local = ( - protocol.checkIPAddress( - protocol.encodeHost(self.destination.host), True) - and not protocol.checkSocksIP(self.destination.host) + self.protocol.checkIPAddress( + self.protocol.encodeHost(self.destination.host), True) + and not self.protocol.checkSocksIP(self.destination.host) ) except socket.error: # it's probably a hostname pass - self.network_group = protocol.network_group(self.destination.host) + self.network_group = self.protocol.network_group(self.destination.host) ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called self.bm_proto_reset() - self.set_state("bm_header", expectBytes=protocol.Header.size) + self.set_state("bm_header", expectBytes=self.protocol.Header.size) def antiIntersectionDelay(self, initial=False): """ @@ -156,7 +155,7 @@ class TCPConnection(BMProto, TLSDispatcher): def set_connection_fully_established(self): """Initiate inventory synchronisation.""" if not self.isOutbound and not self.local: - state.clientHasReceivedIncomingConnections = True + self.pool.state.clientHasReceivedIncomingConnections = True UISignalQueue.put(('setStatusIcon', 'green')) UISignalQueue.put(( 'updateNetworkStatusTab', (self.isOutbound, True, self.destination) @@ -164,7 +163,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.antiIntersectionDelay(True) self.fullyEstablished = True # The connection having host suitable for knownnodes - if self.isOutbound or not self.local and not state.socksIP: + if self.isOutbound or not self.local and not self.pool.state.socksIP: knownnodes.increaseRating(self.destination) knownnodes.addKnownNode( self.streams, self.destination, time.time()) @@ -177,7 +176,7 @@ class TCPConnection(BMProto, TLSDispatcher): # We are going to share a maximum number of 1000 addrs (per overlapping # stream) with our peer. 500 from overlapping streams, 250 from the # left child stream, and 250 from the right child stream. - maxAddrCount = BMConfigParser().safeGetInt( + maxAddrCount = self.pool.config.safeGetInt( "bitmessagesettings", "maxaddrperstreamsend", 500) templist = [] @@ -205,7 +204,7 @@ class TCPConnection(BMProto, TLSDispatcher): for peer, params in addrs[substream]: templist.append((substream, peer, params["lastseen"])) if templist: - self.append_write_buf(assemble_addr(templist)) + self.append_write_buf(self.protocol.assembleAddrMessage(templist)) def sendBigInv(self): """ @@ -219,8 +218,8 @@ class TCPConnection(BMProto, TLSDispatcher): logger.debug( 'Sending huge inv message with %i objects to just this' ' one peer', objectCount) - self.append_write_buf(protocol.CreatePacket( - 'inv', addresses.encodeVarint(objectCount) + payload)) + self.append_write_buf(self.protocol.CreatePacket( + 'inv', self.protocol.encodeVarint(objectCount) + payload)) # Select all hashes for objects in this stream. bigInvList = {} @@ -263,12 +262,11 @@ class TCPConnection(BMProto, TLSDispatcher): '%s:%i: Connection failed: %s', self.destination.host, self.destination.port, e) return - self.nodeid = randomBytes(8) + self.nodeid = helper_random.randomBytes(8) self.append_write_buf( - protocol.assembleVersionMessage( + self.protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, - False, nodeid=self.nodeid)) + self.pool.streams, False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -283,7 +281,8 @@ class TCPConnection(BMProto, TLSDispatcher): def handle_close(self): """Callback for connection being closed.""" - host_is_global = self.isOutbound or not self.local and not state.socksIP + host_is_global = ( + self.isOutbound or not self.local and not self.pool.state.socksIP) if self.fullyEstablished: UISignalQueue.put(( 'updateNetworkStatusTab', @@ -303,9 +302,9 @@ class TCPConnection(BMProto, TLSDispatcher): class Socks5BMConnection(Socks5Connection, TCPConnection): """SOCKS5 wrapper for TCP connections""" - def __init__(self, address): + def __init__(self, pool, address): Socks5Connection.__init__(self, address=address) - TCPConnection.__init__(self, address=address, sock=self.socket) + TCPConnection.__init__(self, pool, address=address, sock=self.socket) self.set_state("init") def state_proxy_handshake_done(self): @@ -314,38 +313,23 @@ class Socks5BMConnection(Socks5Connection, TCPConnection): Bitmessage handshake to peer. """ Socks5Connection.state_proxy_handshake_done(self) - self.nodeid = randomBytes(8) + self.nodeid = helper_random.randomBytes(8) self.append_write_buf( - protocol.assembleVersionMessage( + self.protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + self.pool.streams, False, nodeid=self.nodeid)) - self.set_state("bm_header", expectBytes=protocol.Header.size) + self.set_state("bm_header", expectBytes=self.protocol.Header.size) return True -class Socks4aBMConnection(Socks4aConnection, TCPConnection): +class Socks4aBMConnection(Socks4aConnection, Socks5BMConnection, TCPConnection): """SOCKS4a wrapper for TCP connections""" - def __init__(self, address): + def __init__(self, pool, address): Socks4aConnection.__init__(self, address=address) - TCPConnection.__init__(self, address=address, sock=self.socket) - self.set_state("init") - - def state_proxy_handshake_done(self): - """ - State when SOCKS4a connection succeeds, we need to send a - Bitmessage handshake to peer. - """ - Socks4aConnection.state_proxy_handshake_done(self) - self.nodeid = randomBytes(8) - self.append_write_buf( - protocol.assembleVersionMessage( - self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, - False, nodeid=self.nodeid)) - self.set_state("bm_header", expectBytes=protocol.Header.size) - return True + Socks5BMConnection.__init__( + self, pool, address=address, sock=self.socket) def bootstrap(connection_class): @@ -354,8 +338,8 @@ def bootstrap(connection_class): """Base class for bootstrappers""" _connection_base = connection_class - def __init__(self, host, port): - self._connection_base.__init__(self, Peer(host, port)) + def __init__(self, pool, host, port): + self._connection_base.__init__(self, pool, Peer(host, port)) self.close_reason = self._succeed = False def bm_command_addr(self): @@ -384,65 +368,3 @@ def bootstrap(connection_class): knownnodes.knownNodesActual = False return Bootstrapper - - -class TCPServer(AdvancedDispatcher): - """TCP connection server for Bitmessage protocol""" - - def __init__(self, host='127.0.0.1', port=8444): - if not hasattr(self, '_map'): - AdvancedDispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - for attempt in range(50): - try: - if attempt > 0: - logger.warning('Failed to bind on port %s', port) - port = random.randint(32767, 65535) - self.bind((host, port)) - except socket.error as e: - if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE): - continue - else: - if attempt > 0: - logger.warning('Setting port to %s', port) - BMConfigParser().set( - 'bitmessagesettings', 'port', str(port)) - BMConfigParser().save() - break - self.destination = Peer(host, port) - self.bound = True - self.listen(5) - - def is_bound(self): - """Is the socket bound?""" - try: - return self.bound - except AttributeError: - return False - - def handle_accept(self): - """Incoming connection callback""" - try: - sock = self.accept()[0] - except (TypeError, IndexError): - return - - state.ownAddresses[Peer(*sock.getsockname())] = True - if ( - len(connectionpool.BMConnectionPool()) - > BMConfigParser().safeGetInt( - 'bitmessagesettings', 'maxtotalconnections') - + BMConfigParser().safeGetInt( - 'bitmessagesettings', 'maxbootstrapconnections') + 10 - ): - # 10 is a sort of buffer, in between it will go through - # the version handshake and return an error to the peer - logger.warning("Server full, dropping connection") - sock.close() - return - try: - connectionpool.BMConnectionPool().addConnection( - TCPConnection(sock=sock)) - except socket.error: - pass diff --git a/src/network/threads.py b/src/network/threads.py index 19c58e2f..59119e51 100644 --- a/src/network/threads.py +++ b/src/network/threads.py @@ -11,8 +11,7 @@ class StoppableThread(threading.Thread): name = None logger = logging.getLogger('default') - def __init__(self, state, name=None): - self.state = state + def __init__(self, name=None): if name: self.name = name super(StoppableThread, self).__init__(name=self.name) @@ -27,6 +26,18 @@ class StoppableThread(threading.Thread): self.stop.set() +class StatefulThread(StoppableThread): + """ + Base class for the network thread which uses protocol, + app config or state and initialized with an app object. + """ + def __init__(self, app, name=None): + self.state = app.state + self.config = app.config + self.protocol = app.protocol + super(StatefulThread, self).__init__(name=name or self.name) + + class BusyError(threading.ThreadError): """ Thread error raised when another connection holds the lock diff --git a/src/network/tls.py b/src/network/tls.py index a3774b44..d9a88bda 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -7,8 +7,9 @@ import socket import ssl import sys +import paths # magic import + import network.asyncore_pollchoose as asyncore -import paths from network.advanceddispatcher import AdvancedDispatcher from queues import receiveDataQueue diff --git a/src/network/udp.py b/src/network/udp.py index 3f999332..4d195267 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -5,13 +5,13 @@ import logging import socket import time -import protocol -import state +from queues import receiveDataQueue # magic import + from bmproto import BMProto from constants import MAX_TIME_OFFSET from node import Peer from objectracker import ObjectTracker -from queues import receiveDataQueue + logger = logging.getLogger('default') @@ -20,9 +20,10 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes """Bitmessage protocol over UDP (class)""" port = 8444 - def __init__(self, host=None, sock=None, announcing=False): - # pylint: disable=bad-super-call - super(BMProto, self).__init__(sock=sock) + def __init__(self, pool, host=None, sock=None, announcing=False): + self.pool = pool + self.protocol = pool.protocol + super(UDPSocket, self).__init__(sock=sock) self.verackReceived = True self.verackSent = True # .. todo:: sort out streams @@ -48,7 +49,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes self.connecting = False self.connected = True self.announcing = announcing - self.set_state("bm_header", expectBytes=protocol.Header.size) + self.set_state("bm_header", expectBytes=self.protocol.Header.size) def set_socket_reuse(self): """Set socket reuse option""" @@ -78,8 +79,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes return True remoteport = False for seenTime, stream, _, ip, port in addresses: - decodedIP = protocol.checkIPAddress(str(ip)) - if stream not in state.streamsInWhichIAmParticipating: + decodedIP = self.protocol.checkIPAddress(str(ip)) + if stream not in self.pool.state.streamsInWhichIAmParticipating: continue if (seenTime < time.time() - MAX_TIME_OFFSET or seenTime > time.time() + MAX_TIME_OFFSET): @@ -93,8 +94,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes logger.debug( "received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport) - state.discoveredPeers[Peer(self.destination.host, remoteport)] = \ - time.time() + self.pool.state.discoveredPeers[ + Peer(self.destination.host, remoteport)] = time.time() return True def bm_command_portcheck(self): @@ -129,8 +130,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes return self.destination = Peer(*addr) - encodedAddr = protocol.encodeHost(addr[0]) - self.local = bool(protocol.checkIPAddress(encodedAddr, True)) + encodedAddr = self.protocol.encodeHost(addr[0]) + self.local = bool(self.protocol.checkIPAddress(encodedAddr, True)) # overwrite the old buffer to avoid mixing data and so that # self.local works correctly self.read_buf[0:] = recdata diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 7d80d789..f8269072 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -4,15 +4,14 @@ import time import helper_random -import protocol from inventory import Inventory +# TODo: above two should also be supplied with app or app.protocol from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion -from randomtrackingdict import RandomTrackingDict -from threads import StoppableThread +from threads import StatefulThread -class UploadThread(StoppableThread): +class UploadThread(StatefulThread): """ This is a thread that uploads the objects that the peers requested from me """ @@ -34,7 +33,7 @@ class UploadThread(StoppableThread): continue try: request = i.pendingUpload.randomKeys( - RandomTrackingDict.maxPending) + self.protocol.RandomTrackingDict.maxPending) except KeyError: continue payload = bytearray() @@ -49,7 +48,7 @@ class UploadThread(StoppableThread): i.destination) break try: - payload.extend(protocol.CreatePacket( + payload.extend(self.protocol.CreatePacket( 'object', Inventory()[chunk].payload)) chunk_count += 1 except KeyError: diff --git a/src/protocol.py b/src/protocol.py index 1934d9cc..e1a65363 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -22,8 +22,14 @@ from bmconfigparser import BMConfigParser from debug import logger from fallback import RIPEMD160Hash from helper_sql import sqlExecute +from network.node import Peer +from randomtrackingdict import RandomTrackingDict from version import softwareVersion + +#: protocol specification says max 1000 addresses in one addr command +MAX_ADDR_COUNT = 1000 + # Service flags #: This is a normal network node NODE_NETWORK = 1 @@ -295,6 +301,28 @@ def CreatePacket(command, payload=b''): return bytes(b) +def assembleAddrMessage(peerList): + """Create address command""" + if isinstance(peerList, Peer): + peerList = [peerList] + if not peerList: + return b'' + retval = b'' + for i in range(0, len(peerList), MAX_ADDR_COUNT): + payload = encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) + for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: + # 64-bit time + payload += pack('>Q', timestamp) + payload += pack('>I', stream) + # service bit flags offered by this node + payload += pack('>q', 1) + payload += encodeHost(peer.host) + # remote port + payload += pack('>H', peer.port) + retval += CreatePacket('addr', payload) + return retval + + def assembleVersionMessage( remoteHost, remotePort, participatingStreams, server=False, nodeid=None ): diff --git a/src/singleton.py b/src/singleton.py index 5c6c43be..fd6b7c39 100644 --- a/src/singleton.py +++ b/src/singleton.py @@ -5,7 +5,7 @@ Singleton decorator definition from functools import wraps -def Singleton(cls): +def Singleton(cls, *args): """ Decorator implementing the singleton pattern: it restricts the instantiation of a class to one "single" instance. @@ -14,9 +14,9 @@ def Singleton(cls): # https://github.com/sphinx-doc/sphinx/issues/3783 @wraps(cls) - def getinstance(): + def getinstance(*args): """Find an instance or save newly created one""" if cls not in instances: - instances[cls] = cls() + instances[cls] = cls(*args) return instances[cls] return getinstance