Start network subsystem with app object with config, state and protocol slots

This commit is contained in:
Dmitri Bogomolov 2021-08-18 19:32:47 +03:00
parent ee97277d5c
commit 963ffd11f3
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
21 changed files with 392 additions and 399 deletions

View File

@ -21,6 +21,7 @@ app_dir = pathmagic.setup()
import depends
depends.check_dependencies()
import collections
import getopt
import multiprocessing
# Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
@ -30,6 +31,7 @@ import time
import traceback
import defaults
import protocol
import shared
import shutdown
import state
@ -80,6 +82,9 @@ def signal_handler(signum, frame):
' because the UI captures the signal.')
App = collections.namedtuple('App', ['state', 'config', 'protocol'])
class Main(object):
"""Main PyBitmessage class"""
def start(self):
@ -186,13 +191,13 @@ class Main(object):
shared.reloadBroadcastSendersForWhichImWatching()
# Start the address generation thread
addressGeneratorThread = addressGenerator(state)
addressGeneratorThread = addressGenerator()
# close the main program even if there are threads left
addressGeneratorThread.daemon = True
addressGeneratorThread.start()
# Start the thread that calculates POWs
singleWorkerThread = singleWorker(state)
singleWorkerThread = singleWorker()
# close the main program even if there are threads left
singleWorkerThread.daemon = True
singleWorkerThread.start()
@ -209,26 +214,26 @@ class Main(object):
if daemon and config.safeGet(
'bitmessagesettings', 'smtpdeliver', '') != '':
from class_smtpDeliver import smtpDeliver
smtpDeliveryThread = smtpDeliver(state)
smtpDeliveryThread = smtpDeliver()
smtpDeliveryThread.start()
# SMTP daemon thread
if daemon and config.safeGetBoolean(
'bitmessagesettings', 'smtpd'):
from class_smtpServer import smtpServer
smtpServerThread = smtpServer(state)
smtpServerThread = smtpServer()
smtpServerThread.start()
# API is also objproc dependent
if config.safeGetBoolean('bitmessagesettings', 'apienabled'):
import api # pylint: disable=relative-import
singleAPIThread = api.singleAPI(state)
from api import singleAPI
singleAPIThread = singleAPI()
# close the main program even if there are threads left
singleAPIThread.daemon = True
singleAPIThread.start()
# Start the cleanerThread
singleCleanerThread = singleCleaner(state)
singleCleanerThread = singleCleaner()
# close the main program even if there are threads left
singleCleanerThread.daemon = True
singleCleanerThread.start()
@ -236,7 +241,7 @@ class Main(object):
# start network components if networking is enabled
if state.enableNetwork:
start_proxyconfig()
network.start(config, state)
network.start(App(state, config, protocol))
if config.safeGetBoolean('bitmessagesettings', 'upnp'):
import upnp

View File

@ -9,6 +9,7 @@ import defaults
import highlevelcrypto
import queues
import shared
import state
import tr
from addresses import decodeAddress, encodeAddress, encodeVarint
from bmconfigparser import BMConfigParser
@ -44,7 +45,7 @@ class addressGenerator(StoppableThread):
"""
# pylint: disable=too-many-locals, too-many-branches
# pylint: disable=protected-access, too-many-statements
while self.state.shutdown == 0:
while state.shutdown == 0:
queueValue = queues.addressGeneratorQueue.get()
nonceTrialsPerByte = 0
payloadLengthExtraBytes = 0

View File

@ -23,6 +23,7 @@ import proofofwork
import protocol
import queues
import shared
import state
import tr
from addresses import (
calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint
@ -47,8 +48,8 @@ def sizeof_fmt(num, suffix='h/s'):
class singleWorker(StoppableThread):
"""Thread for performing PoW"""
def __init__(self, state):
super(singleWorker, self).__init__(state, name="singleWorker")
def __init__(self):
super(singleWorker, self).__init__(name="singleWorker")
proofofwork.init()
def stopThread(self):
@ -63,9 +64,9 @@ class singleWorker(StoppableThread):
def run(self):
# pylint: disable=attribute-defined-outside-init
while not helper_sql.sql_ready.wait(1.0) and self.state.shutdown == 0:
while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
self.stop.wait(1.0)
if self.state.shutdown > 0:
if state.shutdown > 0:
return
# Initialize the neededPubkeys dictionary.
@ -78,7 +79,7 @@ class singleWorker(StoppableThread):
_, toAddressVersionNumber, toStreamNumber, toRipe = \
decodeAddress(toAddress)
if toAddressVersionNumber <= 3:
self.state.neededPubkeys[toAddress] = 0
state.neededPubkeys[toAddress] = 0
elif toAddressVersionNumber >= 4:
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(
encodeVarint(toAddressVersionNumber)
@ -89,7 +90,7 @@ class singleWorker(StoppableThread):
tag = doubleHashOfAddressData[32:]
# We'll need this for when we receive a pubkey reply:
# it will be encrypted and we'll need to decrypt it.
self.state.neededPubkeys[tag] = (
state.neededPubkeys[tag] = (
toAddress,
highlevelcrypto.makeCryptor(
hexlify(privEncryptionKey))
@ -101,19 +102,19 @@ class singleWorker(StoppableThread):
for row in queryreturn:
ackdata, = row
self.logger.info('Watching for ackdata %s', hexlify(ackdata))
self.state.ackdataForWhichImWatching[ackdata] = 0
state.ackdataForWhichImWatching[ackdata] = 0
# Fix legacy (headerless) watched ackdata to include header
for oldack in self.state.ackdataForWhichImWatching:
for oldack in state.ackdataForWhichImWatching:
if len(oldack) == 32:
# attach legacy header, always constant (msg/1/1)
newack = '\x00\x00\x00\x02\x01\x01' + oldack
self.state.ackdataForWhichImWatching[newack] = 0
state.ackdataForWhichImWatching[newack] = 0
sqlExecute(
'''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''',
newack, oldack
)
del self.state.ackdataForWhichImWatching[oldack]
del state.ackdataForWhichImWatching[oldack]
# For the case if user deleted knownnodes
# but is still having onionpeer objects in inventory
@ -128,7 +129,7 @@ class singleWorker(StoppableThread):
# before we start on existing POW tasks.
self.stop.wait(10)
if self.state.shutdown:
if state.shutdown:
return
# just in case there are any pending tasks for msg
@ -141,7 +142,7 @@ class singleWorker(StoppableThread):
# send onionpeer object
queues.workerQueue.put(('sendOnionPeerObj', ''))
while self.state.shutdown == 0:
while state.shutdown == 0:
self.busy = 0
command, data = queues.workerQueue.get()
self.busy = 1
@ -493,7 +494,7 @@ class singleWorker(StoppableThread):
def sendOnionPeerObj(self, peer=None):
"""Send onionpeer object representing peer"""
if not peer: # find own onionhostname
for peer in self.state.ownAddresses:
for peer in state.ownAddresses:
if peer.host.endswith('.onion'):
break
else:
@ -798,8 +799,8 @@ class singleWorker(StoppableThread):
encodeVarint(toAddressVersionNumber)
+ encodeVarint(toStreamNumber) + toRipe
).digest()).digest()[32:]
if toaddress in self.state.neededPubkeys or \
toTag in self.state.neededPubkeys:
if toaddress in state.neededPubkeys or \
toTag in state.neededPubkeys:
# We already sent a request for the pubkey
sqlExecute(
'''UPDATE sent SET status='awaitingpubkey', '''
@ -840,7 +841,7 @@ class singleWorker(StoppableThread):
privEncryptionKey = doubleHashOfToAddressData[:32]
# The second half of the sha512 hash.
tag = doubleHashOfToAddressData[32:]
self.state.neededPubkeys[tag] = (
state.neededPubkeys[tag] = (
toaddress,
highlevelcrypto.makeCryptor(
hexlify(privEncryptionKey))
@ -863,7 +864,7 @@ class singleWorker(StoppableThread):
''' status='doingpubkeypow') AND '''
''' folder='sent' ''',
toaddress)
del self.state.neededPubkeys[tag]
del state.neededPubkeys[tag]
break
# else:
# There was something wrong with this
@ -905,7 +906,7 @@ class singleWorker(StoppableThread):
# if we aren't sending this to ourselves or a chan
if not BMConfigParser().has_section(toaddress):
self.state.ackdataForWhichImWatching[ackdata] = 0
state.ackdataForWhichImWatching[ackdata] = 0
queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', (
ackdata,
@ -1399,7 +1400,7 @@ class singleWorker(StoppableThread):
retryNumber = queryReturn[0][0]
if addressVersionNumber <= 3:
self.state.neededPubkeys[toAddress] = 0
state.neededPubkeys[toAddress] = 0
elif addressVersionNumber >= 4:
# If the user just clicked 'send' then the tag
# (and other information) will already be in the
@ -1416,10 +1417,10 @@ class singleWorker(StoppableThread):
encodeVarint(addressVersionNumber)
+ encodeVarint(streamNumber) + ripe
).digest()).digest()[32:]
if tag not in self.state.neededPubkeys:
if tag not in state.neededPubkeys:
# We'll need this for when we receive a pubkey reply:
# it will be encrypted and we'll need to decrypt it.
self.state.neededPubkeys[tag] = (
state.neededPubkeys[tag] = (
toAddress,
highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
)

View File

@ -4,17 +4,18 @@ Network subsystem package
from announcethread import AnnounceThread
from connectionpool import BMConnectionPool
from node import Peer
from threads import StoppableThread
__all__ = [
"AnnounceThread", "BMConnectionPool", "StoppableThread"
"AnnounceThread", "BMConnectionPool", "Peer", "StoppableThread"
# "AddrThread", "AnnounceThread", "BMNetworkThread", "Dandelion",
# "DownloadThread", "InvThread", "ReceiveQueueThread", "UploadThread",
]
def start(config, state):
def start(app):
"""Start network threads"""
from addrthread import AddrThread
from dandelion import Dandelion
@ -28,29 +29,31 @@ def start(config, state):
readKnownNodes()
# init, needs to be early because other thread may access it early
Dandelion()
BMConnectionPool().connectToStream(1)
asyncoreThread = BMNetworkThread(state)
# init
pool = BMConnectionPool(app)
pool.connectToStream(1)
asyncoreThread = BMNetworkThread(pool)
asyncoreThread.daemon = True
asyncoreThread.start()
invThread = InvThread(state)
invThread = InvThread(app)
invThread.daemon = True
invThread.start()
addrThread = AddrThread(state)
addrThread = AddrThread(app)
addrThread.daemon = True
addrThread.start()
downloadThread = DownloadThread()
downloadThread = DownloadThread(app)
downloadThread.daemon = True
downloadThread.start()
uploadThread = UploadThread(state) # state is not used
uploadThread = UploadThread(app)
uploadThread.daemon = True
uploadThread.start()
# Optional components
for i in range(config.getint('threads', 'receive')):
receiveQueueThread = ReceiveQueueThread(state, i)
for i in range(app.config.getint('threads', 'receive')):
receiveQueueThread = ReceiveQueueThread(app, i)
receiveQueueThread.daemon = True
receiveQueueThread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread(state)
state.announceThread.daemon = True
state.announceThread.start()
if app.config.safeGetBoolean('bitmessagesettings', 'udp'):
app.state.announceThread = AnnounceThread(app)
app.state.announceThread.daemon = True
app.state.announceThread.start()

View File

@ -5,13 +5,12 @@ from six.moves import queue
from helper_random import randomshuffle
from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool
from queues import addrQueue
from threads import StoppableThread
from threads import StatefulThread
class AddrThread(StoppableThread):
class AddrThread(StatefulThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"
@ -40,7 +39,8 @@ class AddrThread(StoppableThread):
continue
filtered.append((stream, peer, seen))
if filtered:
i.append_write_buf(assemble_addr(filtered))
i.append_write_buf(
self.protocol.assembleAddrMessage(filtered))
addrQueue.iterate()
for i in range(len(chunk)):

View File

@ -3,14 +3,12 @@ Announce myself (node address)
"""
import time
from bmconfigparser import BMConfigParser
from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool
from connectionpool import BMConnectionPool
from node import Peer
from threads import StoppableThread
from threads import StatefulThread
class AnnounceThread(StoppableThread):
class AnnounceThread(StatefulThread):
"""A thread to manage regular announcing of this node"""
name = "Announcer"
announceInterval = 60
@ -31,11 +29,9 @@ class AnnounceThread(StoppableThread):
if not connection.announcing:
continue
for stream in self.state.streamsInWhichIAmParticipating:
addr = (
stream,
Peer(
addr = (stream, Peer(
'127.0.0.1',
BMConfigParser().safeGetInt(
'bitmessagesettings', 'port')),
self.config.safeGetInt('bitmessagesettings', 'port')),
time.time())
connection.append_write_buf(assemble_addr([addr]))
connection.append_write_buf(
self.protocol.assembleAddrMessage([addr]))

View File

@ -1,31 +0,0 @@
"""
Create bitmessage protocol command packets
"""
import struct
import addresses
from network.constants import MAX_ADDR_COUNT
from network.node import Peer
from protocol import CreatePacket, encodeHost
def assemble_addr(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = [peerList]
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
# 64-bit time
payload += struct.pack('>Q', timestamp)
payload += struct.pack('>I', stream)
# service bit flags offered by this node
payload += struct.pack('>q', 1)
payload += encodeHost(peer.host)
# remote port
payload += struct.pack('>H', peer.port)
retval += CreatePacket('addr', payload)
return retval

View File

@ -1,77 +0,0 @@
"""
Select which node to connect to
"""
# pylint: disable=too-many-branches
import logging
import random # nosec
import knownnodes
import protocol
import state
from bmconfigparser import BMConfigParser
from queues import queue, portCheckerQueue
logger = logging.getLogger('default')
def getDiscoveredPeer():
"""Get a peer from the local peer discovery list"""
try:
peer = random.choice(state.discoveredPeers.keys())
except (IndexError, KeyError):
raise ValueError
try:
del state.discoveredPeers[peer]
except KeyError:
pass
return peer
def chooseConnection(stream):
"""Returns an appropriate connection"""
haveOnion = BMConfigParser().safeGet(
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
onionOnly = BMConfigParser().safeGetBoolean(
"bitmessagesettings", "onionservicesonly")
try:
retval = portCheckerQueue.get(False)
portCheckerQueue.task_done()
return retval
except queue.Empty:
pass
# with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion:
# discovered peers are already filtered by allowed streams
return getDiscoveredPeer()
for _ in range(50):
peer = random.choice(knownnodes.knownNodes[stream].keys())
try:
peer_info = knownnodes.knownNodes[stream][peer]
if peer_info.get('self'):
continue
rating = peer_info["rating"]
except TypeError:
logger.warning('Error in %s', peer)
rating = 0
if haveOnion:
# do not connect to raw IP addresses
# --keep all traffic within Tor overlay
if onionOnly and not peer.host.endswith('.onion'):
continue
# onion addresses have a higher priority when SOCKS
if peer.host.endswith('.onion') and rating > 0:
rating = 1
# TODO: need better check
elif not peer.host.startswith('bootstrap'):
encodedAddr = protocol.encodeHost(peer.host)
# don't connect to local IPs when using SOCKS
if not protocol.checkIPAddress(encodedAddr, False):
continue
if rating > 1:
rating = 1
try:
if 0.05 / (1.0 - rating) > random.random():
return peer
except ZeroDivisionError:
return peer
raise ValueError

View File

@ -3,29 +3,91 @@
"""
import errno
import logging
import random # noseq
import re
import socket
import sys
import time
import asyncore_pollchoose as asyncore
# magic imports
import helper_random
from queues import queue, portCheckerQueue
from singleton import Singleton
import asyncore_pollchoose as asyncore
import knownnodes
import protocol
import state
from bmconfigparser import BMConfigParser
from connectionchooser import chooseConnection
from advanceddispatcher import AdvancedDispatcher
from node import Peer
from proxy import Proxy
from singleton import Singleton
from tcp import (
bootstrap, Socks4aBMConnection, Socks5BMConnection,
TCPConnection, TCPServer)
bootstrap, Socks4aBMConnection, Socks5BMConnection, TCPConnection)
from udp import UDPSocket
logger = logging.getLogger('default')
class TCPServer(AdvancedDispatcher):
"""TCP connection server for Bitmessage protocol"""
def __init__(self, pool, host='127.0.0.1', port=8444):
self.pool = pool
if not hasattr(self, '_map'):
AdvancedDispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
for attempt in range(50):
try:
if attempt > 0:
logger.warning('Failed to bind on port %s', port)
port = random.randint(32767, 65535)
self.bind((host, port))
except socket.error as e:
if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):
continue
else:
if attempt > 0:
logger.warning('Setting port to %s', port)
self.pool.config.set(
'bitmessagesettings', 'port', str(port))
self.pool.config.save()
break
self.destination = Peer(host, port)
self.bound = True
self.listen(5)
def is_bound(self):
"""Is the socket bound?"""
try:
return self.bound
except AttributeError:
return False
def handle_accept(self):
"""Incoming connection callback"""
try:
sock = self.accept()[0]
except (TypeError, IndexError):
return
self.pool.state.ownAddresses[Peer(*sock.getsockname())] = True
if (
len(self.pool)
> self.pool.config.safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ self.pool.config.safeGetInt(
'bitmessagesettings', 'maxbootstrapconnections') + 10
):
# 10 is a sort of buffer, in between it will go through
# the version handshake and return an error to the peer
logger.warning("Server full, dropping connection")
sock.close()
return
try:
self.pool.addConnection(TCPConnection(self.pool, sock=sock))
except socket.error:
pass
@Singleton
class BMConnectionPool(object):
"""Pool of all existing connections"""
@ -44,11 +106,16 @@ class BMConnectionPool(object):
without compromising security.
"""
def __init__(self):
def __init__(self, app=None):
if not app:
return # for singleton convenience
self.config = app.config
self.protocol = app.protocol
self.state = app.state
asyncore.set_rates(
BMConfigParser().safeGetInt(
app.config.safeGetInt(
"bitmessagesettings", "maxdownloadrate"),
BMConfigParser().safeGetInt(
app.config.safeGetInt(
"bitmessagesettings", "maxuploadrate")
)
self.outboundConnections = {}
@ -60,7 +127,7 @@ class BMConnectionPool(object):
self._spawnWait = 2
self._bootstrapped = False
trustedPeer = BMConfigParser().safeGet(
trustedPeer = app.config.safeGet(
'bitmessagesettings', 'trustedpeer')
try:
if trustedPeer:
@ -90,7 +157,7 @@ class BMConnectionPool(object):
def connectToStream(self, streamNumber):
"""Connect to a bitmessage stream"""
self.streams.append(streamNumber)
state.streamsInWhichIAmParticipating.append(streamNumber)
self.state.streamsInWhichIAmParticipating.append(streamNumber)
def getConnectionByAddr(self, addr):
"""
@ -160,32 +227,30 @@ class BMConnectionPool(object):
pass
connection.handle_close()
@staticmethod
def getListeningIP():
def getListeningIP(self):
"""What IP are we supposed to be listening on?"""
if BMConfigParser().safeGet(
if self.config.safeGet(
"bitmessagesettings", "onionhostname").endswith(".onion"):
host = BMConfigParser().safeGet(
"bitmessagesettings", "onionbindip")
host = self.config.safeGet("bitmessagesettings", "onionbindip")
else:
host = '127.0.0.1'
if (
BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten")
or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")
self.config.safeGetBoolean("bitmessagesettings", "sockslisten")
or self.config.safeGet("bitmessagesettings", "socksproxytype")
== "none"
):
# python doesn't like bind + INADDR_ANY?
# host = socket.INADDR_ANY
host = BMConfigParser().get("network", "bind")
host = self.config.get("network", "bind")
return host
def startListening(self, bind=None):
"""Open a listening socket and start accepting connections on it"""
if bind is None:
bind = self.getListeningIP()
port = BMConfigParser().safeGetInt("bitmessagesettings", "port")
port = self.config.safeGetInt("bitmessagesettings", "port")
# correct port even if it changed
ls = TCPServer(host=bind, port=port)
ls = TCPServer(self, host=bind, port=port)
self.listeningSockets[ls.destination] = ls
def startUDPSocket(self, bind=None):
@ -195,17 +260,17 @@ class BMConnectionPool(object):
"""
if bind is None:
host = self.getListeningIP()
udpSocket = UDPSocket(host=host, announcing=True)
udpSocket = UDPSocket(self, host=host, announcing=True)
else:
if bind is False:
udpSocket = UDPSocket(announcing=False)
udpSocket = UDPSocket(self, announcing=False)
else:
udpSocket = UDPSocket(host=bind, announcing=True)
udpSocket = UDPSocket(self, host=bind, announcing=True)
self.udpSockets[udpSocket.listening.host] = udpSocket
def startBootstrappers(self):
"""Run the process of resolving bootstrap hostnames"""
proxy_type = BMConfigParser().safeGet(
proxy_type = self.config.safeGet(
'bitmessagesettings', 'socksproxytype')
# A plugins may be added here
hostname = None
@ -229,7 +294,68 @@ class BMConnectionPool(object):
hostname = 'bootstrap%s.bitmessage.org' % port
else:
port = 8444
self.addConnection(bootstrapper(hostname, port))
self.addConnection(bootstrapper(self, hostname, port))
def chooseConnection(self, stream):
"""Returns an appropriate connection"""
def getDiscoveredPeer():
"""Get a peer from the local peer discovery list"""
try:
peer = random.choice(self.state.discoveredPeers.keys())
except (IndexError, KeyError):
raise ValueError
try:
del self.state.discoveredPeers[peer]
except KeyError:
pass
return peer
haveOnion = self.config.safeGet(
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
onionOnly = self.config.safeGetBoolean(
"bitmessagesettings", "onionservicesonly")
try:
retval = portCheckerQueue.get(False)
portCheckerQueue.task_done()
return retval
except queue.Empty:
pass
# with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion:
# discovered peers are already filtered by allowed streams
return getDiscoveredPeer()
for _ in range(50):
peer = random.choice(knownnodes.knownNodes[stream].keys())
try:
peer_info = knownnodes.knownNodes[stream][peer]
if peer_info.get('self'):
continue
rating = peer_info["rating"]
except TypeError:
logger.warning('Error in %s', peer)
rating = 0
if haveOnion:
# do not connect to raw IP addresses
# --keep all traffic within Tor overlay
if onionOnly and not peer.host.endswith('.onion'):
continue
# onion addresses have a higher priority when SOCKS
if peer.host.endswith('.onion') and rating > 0:
rating = 1
# TODO: need better check
elif not peer.host.startswith('bootstrap'):
encodedAddr = self.protocol.encodeHost(peer.host)
# don't connect to local IPs when using SOCKS
if not self.protocol.checkIPAddress(encodedAddr, False):
continue
if rating > 1:
rating = 1
try:
if 0.05 / (1.0 - rating) > random.random():
return peer
except ZeroDivisionError:
return peer
raise ValueError
def loop(self): # pylint: disable=too-many-branches,too-many-statements
"""Main Connectionpool's loop"""
@ -237,21 +363,20 @@ class BMConnectionPool(object):
# defaults to empty loop if outbound connections are maxed
spawnConnections = False
acceptConnections = True
if BMConfigParser().safeGetBoolean(
'bitmessagesettings', 'dontconnect'):
if self.config.safeGetBoolean('bitmessagesettings', 'dontconnect'):
acceptConnections = False
elif BMConfigParser().safeGetBoolean(
elif self.config.safeGetBoolean(
'bitmessagesettings', 'sendoutgoingconnections'):
spawnConnections = True
socksproxytype = BMConfigParser().safeGet(
socksproxytype = self.config.safeGet(
'bitmessagesettings', 'socksproxytype', '')
onionsocksproxytype = BMConfigParser().safeGet(
onionsocksproxytype = self.config.safeGet(
'bitmessagesettings', 'onionsocksproxytype', '')
if (
socksproxytype[:5] == 'SOCKS'
and not BMConfigParser().safeGetBoolean(
and not self.config.safeGetBoolean(
'bitmessagesettings', 'sockslisten')
and '.onion' not in BMConfigParser().safeGet(
and '.onion' not in self.config.safeGet(
'bitmessagesettings', 'onionhostname', '')
):
acceptConnections = False
@ -264,9 +389,9 @@ class BMConnectionPool(object):
if not self._bootstrapped:
self._bootstrapped = True
Proxy.proxy = (
BMConfigParser().safeGet(
self.config.safeGet(
'bitmessagesettings', 'sockshostname'),
BMConfigParser().safeGetInt(
self.config.safeGetInt(
'bitmessagesettings', 'socksport')
)
# TODO AUTH
@ -275,9 +400,9 @@ class BMConnectionPool(object):
if not onionsocksproxytype.startswith("SOCKS"):
raise ValueError
Proxy.onion_proxy = (
BMConfigParser().safeGet(
self.config.safeGet(
'network', 'onionsockshostname', None),
BMConfigParser().safeGet(
self.config.safeGet(
'network', 'onionsocksport', None)
)
except ValueError:
@ -286,12 +411,13 @@ class BMConnectionPool(object):
1 for c in self.outboundConnections.values()
if (c.connected and c.fullyEstablished))
pending = len(self.outboundConnections) - established
if established < BMConfigParser().safeGetInt(
if established < self.config.safeGetInt(
'bitmessagesettings', 'maxoutboundconnections'):
for i in range(
state.maximumNumberOfHalfOpenConnections - pending):
self.state.maximumNumberOfHalfOpenConnections - pending
):
try:
chosen = self.trustedPeer or chooseConnection(
chosen = self.trustedPeer or self.chooseConnection(
helper_random.randomchoice(self.streams))
except ValueError:
continue
@ -300,11 +426,11 @@ class BMConnectionPool(object):
if chosen.host in self.inboundConnections:
continue
# don't connect to self
if chosen in state.ownAddresses:
if chosen in self.state.ownAddresses:
continue
# don't connect to the hosts from the same
# network group, defense against sibyl attacks
host_network_group = protocol.network_group(
host_network_group = self.protocol.network_group(
chosen.host)
same_group = False
for j in self.outboundConnections.values():
@ -319,15 +445,19 @@ class BMConnectionPool(object):
try:
if chosen.host.endswith(".onion") and Proxy.onion_proxy:
if onionsocksproxytype == "SOCKS5":
self.addConnection(Socks5BMConnection(chosen))
self.addConnection(
Socks5BMConnection(self, chosen))
elif onionsocksproxytype == "SOCKS4a":
self.addConnection(Socks4aBMConnection(chosen))
self.addConnection(
Socks4aBMConnection(self, chosen))
elif socksproxytype == "SOCKS5":
self.addConnection(Socks5BMConnection(chosen))
self.addConnection(
Socks5BMConnection(self, chosen))
elif socksproxytype == "SOCKS4a":
self.addConnection(Socks4aBMConnection(chosen))
self.addConnection(
Socks4aBMConnection(self, chosen))
else:
self.addConnection(TCPConnection(chosen))
self.addConnection(TCPConnection(self, chosen))
except socket.error as e:
if e.errno == errno.ENETUNREACH:
continue
@ -340,22 +470,21 @@ class BMConnectionPool(object):
if acceptConnections:
if not self.listeningSockets:
if BMConfigParser().safeGet('network', 'bind') == '':
if self.config.safeGet('network', 'bind') == '':
self.startListening()
else:
for bind in re.sub(
r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind')
r'[^\w.]+', ' ', self.config.safeGet('network', 'bind')
).split():
self.startListening(bind)
logger.info('Listening for incoming connections.')
if not self.udpSockets:
if BMConfigParser().safeGet('network', 'bind') == '':
if self.config.safeGet('network', 'bind') == '':
self.startUDPSocket()
else:
for bind in re.sub(
r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind')
self.config.safeGet('network', 'bind')
).split():
self.startUDPSocket(bind)
self.startUDPSocket(False)
@ -384,7 +513,7 @@ class BMConnectionPool(object):
minTx -= 300 - 20
if i.lastTx < minTx:
if i.fullyEstablished:
i.append_write_buf(protocol.CreatePacket('ping'))
i.append_write_buf(self.protocol.CreatePacket('ping'))
else:
i.close_reason = "Timeout (%is)" % (
time.time() - i.lastTx)

View File

@ -5,8 +5,6 @@ Network protocol constants
#: address is online if online less than this many seconds ago
ADDRESS_ALIVE = 10800
#: protocol specification says max 1000 addresses in one addr command
MAX_ADDR_COUNT = 1000
#: ~1.6 MB which is the maximum possible size of an inv message.
MAX_MESSAGE_SIZE = 1600100
#: 2**18 = 256kB is the maximum size of an object payload

View File

@ -3,17 +3,17 @@
"""
import time
import addresses
import helper_random
import protocol
from inventory import Inventory # magic import
from dandelion import Dandelion
from inventory import Inventory
# TODO: from connectionpool import
from network.connectionpool import BMConnectionPool
from objectracker import missingObjects
from threads import StoppableThread
from threads import StatefulThread
class DownloadThread(StoppableThread):
class DownloadThread(StatefulThread):
"""Thread-based class for downloading from connections"""
minPending = 200
maxRequestChunk = 1000
@ -21,8 +21,8 @@ class DownloadThread(StoppableThread):
cleanInterval = 60
requestExpires = 3600
def __init__(self):
super(DownloadThread, self).__init__(None, name="Downloader")
def __init__(self, app):
super(DownloadThread, self).__init__(app, name="Downloader")
self.lastCleaned = time.time()
def cleanPending(self):
@ -72,8 +72,9 @@ class DownloadThread(StoppableThread):
missingObjects[chunk] = now
if not chunkCount:
continue
payload[0:0] = addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload))
payload[0:0] = self.protocol.encodeVarint(chunkCount)
i.append_write_buf(
self.protocol.CreatePacket('getdata', payload))
self.logger.debug(
'%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount)

View File

@ -5,12 +5,10 @@ import Queue
import random
from time import time
import addresses
import protocol
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from queues import invQueue
from threads import StoppableThread
from queues import invQueue # FIXME: get from app?
from threads import StatefulThread
def handleExpiredDandelion(expired):
@ -31,7 +29,7 @@ def handleExpiredDandelion(expired):
i.objectsNewToThem[hashid] = time()
class InvThread(StoppableThread):
class InvThread(StatefulThread):
"""Main thread that sends inv annoucements"""
name = "InvBroadcaster"
@ -79,7 +77,9 @@ class InvThread(StoppableThread):
if random.randint(1, 100) >= self.state.dandelion:
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
elif (
connection.services
& self.protocol.NODE_DANDELION > 0):
stems.append(inv[1])
else:
fluffs.append(inv[1])
@ -88,15 +88,15 @@ class InvThread(StoppableThread):
if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'inv',
addresses.encodeVarint(
self.protocol.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'dinv',
addresses.encodeVarint(
self.protocol.encodeVarint(
len(stems)) + ''.join(stems)))
invQueue.iterate()

View File

@ -1,9 +1,10 @@
"""
A thread to handle network concerns
"""
from queues import excQueue # magic import
import network.asyncore_pollchoose as asyncore
from network.connectionpool import BMConnectionPool
from queues import excQueue
from threads import StoppableThread
@ -11,27 +12,31 @@ class BMNetworkThread(StoppableThread):
"""Main network thread"""
name = "Asyncore"
def __init__(self, pool):
self.pool = pool
super(BMNetworkThread, self).__init__(self.name)
def run(self):
try:
while not self._stopped and self.state.shutdown == 0:
BMConnectionPool().loop()
while not self._stopped and self.pool.state.shutdown == 0:
self.pool.loop()
except Exception as e:
excQueue.put((self.name, e))
raise
def stopThread(self):
super(BMNetworkThread, self).stopThread()
for i in BMConnectionPool().listeningSockets.values():
for i in self.pool.listeningSockets.values():
try:
i.close()
except:
pass
for i in BMConnectionPool().outboundConnections.values():
for i in self.pool.outboundConnections.values():
try:
i.close()
except:
pass
for i in BMConnectionPool().inboundConnections.values():
for i in self.pool.inboundConnections.values():
try:
i.close()
except:

View File

@ -8,15 +8,15 @@ import socket
from network.advanceddispatcher import UnknownStateError
from network.connectionpool import BMConnectionPool
from queues import receiveDataQueue
from threads import StoppableThread
from threads import StatefulThread
class ReceiveQueueThread(StoppableThread):
class ReceiveQueueThread(StatefulThread):
"""This thread processes data received from the network
(which is done by the asyncore thread)"""
def __init__(self, state, num=0):
def __init__(self, app, num=0):
super(ReceiveQueueThread, self).__init__(
state, name="ReceiveQueue_%i" % num)
app, name="ReceiveQueue_%i" % num)
def run(self):
while not self._stopped and self.state.shutdown == 0:

View File

@ -5,22 +5,20 @@ TCP protocol handler
import l10n
import logging
import math
import random
import socket
import time
import addresses
import asyncore_pollchoose as asyncore
import connectionpool
# magic imports
import helper_random
import knownnodes
import protocol
import state
from bmconfigparser import BMConfigParser
from helper_random import randomBytes
from inventory import Inventory
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
import asyncore_pollchoose as asyncore
import knownnodes
# TODO: not use network. here because this module is inside the network package
from network.advanceddispatcher import AdvancedDispatcher
from network.assemble import assemble_addr
from network.bmproto import BMProto
from network.constants import MAX_OBJECT_COUNT
from network.dandelion import Dandelion
@ -29,8 +27,7 @@ from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection
from network.tls import TLSDispatcher
from node import Peer
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
logger = logging.getLogger('default')
@ -45,8 +42,10 @@ class TCPConnection(BMProto, TLSDispatcher):
.. todo:: Look to understand and/or fix the non-parent-init-called
"""
def __init__(self, address=None, sock=None):
def __init__(self, pool, address=None, sock=None):
BMProto.__init__(self, address=address, sock=sock)
self.pool = pool
self.protocol = pool.protocol
self.verackReceived = False
self.verackSent = False
self.streams = [0]
@ -60,7 +59,7 @@ class TCPConnection(BMProto, TLSDispatcher):
logger.debug(
'Received connection from %s:%i',
self.destination.host, self.destination.port)
self.nodeid = randomBytes(8)
self.nodeid = helper_random.randomBytes(8)
elif address is not None and sock is not None:
TLSDispatcher.__init__(self, sock, server_side=False)
self.isOutbound = True
@ -81,17 +80,17 @@ class TCPConnection(BMProto, TLSDispatcher):
self.destination.host, self.destination.port)
try:
self.local = (
protocol.checkIPAddress(
protocol.encodeHost(self.destination.host), True)
and not protocol.checkSocksIP(self.destination.host)
self.protocol.checkIPAddress(
self.protocol.encodeHost(self.destination.host), True)
and not self.protocol.checkSocksIP(self.destination.host)
)
except socket.error:
# it's probably a hostname
pass
self.network_group = protocol.network_group(self.destination.host)
self.network_group = self.protocol.network_group(self.destination.host)
ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called
self.bm_proto_reset()
self.set_state("bm_header", expectBytes=protocol.Header.size)
self.set_state("bm_header", expectBytes=self.protocol.Header.size)
def antiIntersectionDelay(self, initial=False):
"""
@ -156,7 +155,7 @@ class TCPConnection(BMProto, TLSDispatcher):
def set_connection_fully_established(self):
"""Initiate inventory synchronisation."""
if not self.isOutbound and not self.local:
state.clientHasReceivedIncomingConnections = True
self.pool.state.clientHasReceivedIncomingConnections = True
UISignalQueue.put(('setStatusIcon', 'green'))
UISignalQueue.put((
'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
@ -164,7 +163,7 @@ class TCPConnection(BMProto, TLSDispatcher):
self.antiIntersectionDelay(True)
self.fullyEstablished = True
# The connection having host suitable for knownnodes
if self.isOutbound or not self.local and not state.socksIP:
if self.isOutbound or not self.local and not self.pool.state.socksIP:
knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
@ -177,7 +176,7 @@ class TCPConnection(BMProto, TLSDispatcher):
# We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the
# left child stream, and 250 from the right child stream.
maxAddrCount = BMConfigParser().safeGetInt(
maxAddrCount = self.pool.config.safeGetInt(
"bitmessagesettings", "maxaddrperstreamsend", 500)
templist = []
@ -205,7 +204,7 @@ class TCPConnection(BMProto, TLSDispatcher):
for peer, params in addrs[substream]:
templist.append((substream, peer, params["lastseen"]))
if templist:
self.append_write_buf(assemble_addr(templist))
self.append_write_buf(self.protocol.assembleAddrMessage(templist))
def sendBigInv(self):
"""
@ -219,8 +218,8 @@ class TCPConnection(BMProto, TLSDispatcher):
logger.debug(
'Sending huge inv message with %i objects to just this'
' one peer', objectCount)
self.append_write_buf(protocol.CreatePacket(
'inv', addresses.encodeVarint(objectCount) + payload))
self.append_write_buf(self.protocol.CreatePacket(
'inv', self.protocol.encodeVarint(objectCount) + payload))
# Select all hashes for objects in this stream.
bigInvList = {}
@ -263,12 +262,11 @@ class TCPConnection(BMProto, TLSDispatcher):
'%s:%i: Connection failed: %s',
self.destination.host, self.destination.port, e)
return
self.nodeid = randomBytes(8)
self.nodeid = helper_random.randomBytes(8)
self.append_write_buf(
protocol.assembleVersionMessage(
self.protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
False, nodeid=self.nodeid))
self.pool.streams, False, nodeid=self.nodeid))
self.connectedAt = time.time()
receiveDataQueue.put(self.destination)
@ -283,7 +281,8 @@ class TCPConnection(BMProto, TLSDispatcher):
def handle_close(self):
"""Callback for connection being closed."""
host_is_global = self.isOutbound or not self.local and not state.socksIP
host_is_global = (
self.isOutbound or not self.local and not self.pool.state.socksIP)
if self.fullyEstablished:
UISignalQueue.put((
'updateNetworkStatusTab',
@ -303,9 +302,9 @@ class TCPConnection(BMProto, TLSDispatcher):
class Socks5BMConnection(Socks5Connection, TCPConnection):
"""SOCKS5 wrapper for TCP connections"""
def __init__(self, address):
def __init__(self, pool, address):
Socks5Connection.__init__(self, address=address)
TCPConnection.__init__(self, address=address, sock=self.socket)
TCPConnection.__init__(self, pool, address=address, sock=self.socket)
self.set_state("init")
def state_proxy_handshake_done(self):
@ -314,38 +313,23 @@ class Socks5BMConnection(Socks5Connection, TCPConnection):
Bitmessage handshake to peer.
"""
Socks5Connection.state_proxy_handshake_done(self)
self.nodeid = randomBytes(8)
self.nodeid = helper_random.randomBytes(8)
self.append_write_buf(
protocol.assembleVersionMessage(
self.protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
self.pool.streams,
False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
self.set_state("bm_header", expectBytes=self.protocol.Header.size)
return True
class Socks4aBMConnection(Socks4aConnection, TCPConnection):
class Socks4aBMConnection(Socks4aConnection, Socks5BMConnection, TCPConnection):
"""SOCKS4a wrapper for TCP connections"""
def __init__(self, address):
def __init__(self, pool, address):
Socks4aConnection.__init__(self, address=address)
TCPConnection.__init__(self, address=address, sock=self.socket)
self.set_state("init")
def state_proxy_handshake_done(self):
"""
State when SOCKS4a connection succeeds, we need to send a
Bitmessage handshake to peer.
"""
Socks4aConnection.state_proxy_handshake_done(self)
self.nodeid = randomBytes(8)
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
return True
Socks5BMConnection.__init__(
self, pool, address=address, sock=self.socket)
def bootstrap(connection_class):
@ -354,8 +338,8 @@ def bootstrap(connection_class):
"""Base class for bootstrappers"""
_connection_base = connection_class
def __init__(self, host, port):
self._connection_base.__init__(self, Peer(host, port))
def __init__(self, pool, host, port):
self._connection_base.__init__(self, pool, Peer(host, port))
self.close_reason = self._succeed = False
def bm_command_addr(self):
@ -384,65 +368,3 @@ def bootstrap(connection_class):
knownnodes.knownNodesActual = False
return Bootstrapper
class TCPServer(AdvancedDispatcher):
"""TCP connection server for Bitmessage protocol"""
def __init__(self, host='127.0.0.1', port=8444):
if not hasattr(self, '_map'):
AdvancedDispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
for attempt in range(50):
try:
if attempt > 0:
logger.warning('Failed to bind on port %s', port)
port = random.randint(32767, 65535)
self.bind((host, port))
except socket.error as e:
if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):
continue
else:
if attempt > 0:
logger.warning('Setting port to %s', port)
BMConfigParser().set(
'bitmessagesettings', 'port', str(port))
BMConfigParser().save()
break
self.destination = Peer(host, port)
self.bound = True
self.listen(5)
def is_bound(self):
"""Is the socket bound?"""
try:
return self.bound
except AttributeError:
return False
def handle_accept(self):
"""Incoming connection callback"""
try:
sock = self.accept()[0]
except (TypeError, IndexError):
return
state.ownAddresses[Peer(*sock.getsockname())] = True
if (
len(connectionpool.BMConnectionPool())
> BMConfigParser().safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ BMConfigParser().safeGetInt(
'bitmessagesettings', 'maxbootstrapconnections') + 10
):
# 10 is a sort of buffer, in between it will go through
# the version handshake and return an error to the peer
logger.warning("Server full, dropping connection")
sock.close()
return
try:
connectionpool.BMConnectionPool().addConnection(
TCPConnection(sock=sock))
except socket.error:
pass

View File

@ -11,8 +11,7 @@ class StoppableThread(threading.Thread):
name = None
logger = logging.getLogger('default')
def __init__(self, state, name=None):
self.state = state
def __init__(self, name=None):
if name:
self.name = name
super(StoppableThread, self).__init__(name=self.name)
@ -27,6 +26,18 @@ class StoppableThread(threading.Thread):
self.stop.set()
class StatefulThread(StoppableThread):
"""
Base class for the network thread which uses protocol,
app config or state and initialized with an app object.
"""
def __init__(self, app, name=None):
self.state = app.state
self.config = app.config
self.protocol = app.protocol
super(StatefulThread, self).__init__(name=name or self.name)
class BusyError(threading.ThreadError):
"""
Thread error raised when another connection holds the lock

View File

@ -7,8 +7,9 @@ import socket
import ssl
import sys
import paths # magic import
import network.asyncore_pollchoose as asyncore
import paths
from network.advanceddispatcher import AdvancedDispatcher
from queues import receiveDataQueue

View File

@ -5,13 +5,13 @@ import logging
import socket
import time
import protocol
import state
from queues import receiveDataQueue # magic import
from bmproto import BMProto
from constants import MAX_TIME_OFFSET
from node import Peer
from objectracker import ObjectTracker
from queues import receiveDataQueue
logger = logging.getLogger('default')
@ -20,9 +20,10 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
"""Bitmessage protocol over UDP (class)"""
port = 8444
def __init__(self, host=None, sock=None, announcing=False):
# pylint: disable=bad-super-call
super(BMProto, self).__init__(sock=sock)
def __init__(self, pool, host=None, sock=None, announcing=False):
self.pool = pool
self.protocol = pool.protocol
super(UDPSocket, self).__init__(sock=sock)
self.verackReceived = True
self.verackSent = True
# .. todo:: sort out streams
@ -48,7 +49,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
self.connecting = False
self.connected = True
self.announcing = announcing
self.set_state("bm_header", expectBytes=protocol.Header.size)
self.set_state("bm_header", expectBytes=self.protocol.Header.size)
def set_socket_reuse(self):
"""Set socket reuse option"""
@ -78,8 +79,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
return True
remoteport = False
for seenTime, stream, _, ip, port in addresses:
decodedIP = protocol.checkIPAddress(str(ip))
if stream not in state.streamsInWhichIAmParticipating:
decodedIP = self.protocol.checkIPAddress(str(ip))
if stream not in self.pool.state.streamsInWhichIAmParticipating:
continue
if (seenTime < time.time() - MAX_TIME_OFFSET
or seenTime > time.time() + MAX_TIME_OFFSET):
@ -93,8 +94,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
logger.debug(
"received peer discovery from %s:%i (port %i):",
self.destination.host, self.destination.port, remoteport)
state.discoveredPeers[Peer(self.destination.host, remoteport)] = \
time.time()
self.pool.state.discoveredPeers[
Peer(self.destination.host, remoteport)] = time.time()
return True
def bm_command_portcheck(self):
@ -129,8 +130,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
return
self.destination = Peer(*addr)
encodedAddr = protocol.encodeHost(addr[0])
self.local = bool(protocol.checkIPAddress(encodedAddr, True))
encodedAddr = self.protocol.encodeHost(addr[0])
self.local = bool(self.protocol.checkIPAddress(encodedAddr, True))
# overwrite the old buffer to avoid mixing data and so that
# self.local works correctly
self.read_buf[0:] = recdata

View File

@ -4,15 +4,14 @@
import time
import helper_random
import protocol
from inventory import Inventory
# TODo: above two should also be supplied with app or app.protocol
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread
from threads import StatefulThread
class UploadThread(StoppableThread):
class UploadThread(StatefulThread):
"""
This is a thread that uploads the objects that the peers requested from me
"""
@ -34,7 +33,7 @@ class UploadThread(StoppableThread):
continue
try:
request = i.pendingUpload.randomKeys(
RandomTrackingDict.maxPending)
self.protocol.RandomTrackingDict.maxPending)
except KeyError:
continue
payload = bytearray()
@ -49,7 +48,7 @@ class UploadThread(StoppableThread):
i.destination)
break
try:
payload.extend(protocol.CreatePacket(
payload.extend(self.protocol.CreatePacket(
'object', Inventory()[chunk].payload))
chunk_count += 1
except KeyError:

View File

@ -22,8 +22,14 @@ from bmconfigparser import BMConfigParser
from debug import logger
from fallback import RIPEMD160Hash
from helper_sql import sqlExecute
from network.node import Peer
from randomtrackingdict import RandomTrackingDict
from version import softwareVersion
#: protocol specification says max 1000 addresses in one addr command
MAX_ADDR_COUNT = 1000
# Service flags
#: This is a normal network node
NODE_NETWORK = 1
@ -295,6 +301,28 @@ def CreatePacket(command, payload=b''):
return bytes(b)
def assembleAddrMessage(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = [peerList]
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
# 64-bit time
payload += pack('>Q', timestamp)
payload += pack('>I', stream)
# service bit flags offered by this node
payload += pack('>q', 1)
payload += encodeHost(peer.host)
# remote port
payload += pack('>H', peer.port)
retval += CreatePacket('addr', payload)
return retval
def assembleVersionMessage(
remoteHost, remotePort, participatingStreams, server=False, nodeid=None
):

View File

@ -5,7 +5,7 @@ Singleton decorator definition
from functools import wraps
def Singleton(cls):
def Singleton(cls, *args):
"""
Decorator implementing the singleton pattern:
it restricts the instantiation of a class to one "single" instance.
@ -14,9 +14,9 @@ def Singleton(cls):
# https://github.com/sphinx-doc/sphinx/issues/3783
@wraps(cls)
def getinstance():
def getinstance(*args):
"""Find an instance or save newly created one"""
if cls not in instances:
instances[cls] = cls()
instances[cls] = cls(*args)
return instances[cls]
return getinstance