network.BMConnectionPool: added shortcuts connections()

and establishedConnections(), some formatting fixes
This commit is contained in:
Dmitri Bogomolov 2019-11-03 14:09:00 +02:00
parent 4d8d9b169f
commit 7a1f803c92
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
8 changed files with 77 additions and 104 deletions

View File

@ -1,5 +1,5 @@
""" """
The singleCleaner class is a timer-driven thread that cleans data structures The `singleCleaner` class is a timer-driven thread that cleans data structures
to free memory, resends messages when a remote node doesn't respond, and to free memory, resends messages when a remote node doesn't respond, and
sends pong messages to keep connections alive if the network isn't busy. sends pong messages to keep connections alive if the network isn't busy.
@ -45,12 +45,12 @@ class singleCleaner(StoppableThread):
try: try:
shared.maximumLengthOfTimeToBotherResendingMessages = ( shared.maximumLengthOfTimeToBotherResendingMessages = (
float(BMConfigParser().get( float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxdays')) * 'bitmessagesettings', 'stopresendingafterxdays'))
24 * 60 * 60 * 24 * 60 * 60
) + ( ) + (
float(BMConfigParser().get( float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxmonths')) * 'bitmessagesettings', 'stopresendingafterxmonths'))
(60 * 60 * 24 * 365) / 12) * (60 * 60 * 24 * 365) / 12)
except: except:
# Either the user hasn't set stopresendingafterxdays and # Either the user hasn't set stopresendingafterxdays and
# stopresendingafterxmonths yet or the options are missing # stopresendingafterxmonths yet or the options are missing
@ -92,8 +92,8 @@ class singleCleaner(StoppableThread):
"SELECT toaddress, ackdata, status FROM sent" "SELECT toaddress, ackdata, status FROM sent"
" WHERE ((status='awaitingpubkey' OR status='msgsent')" " WHERE ((status='awaitingpubkey' OR status='msgsent')"
" AND folder='sent' AND sleeptill<? AND senttime>?)", " AND folder='sent' AND sleeptill<? AND senttime>?)",
int(time.time()), int(time.time()) - int(time.time()), int(time.time())
shared.maximumLengthOfTimeToBotherResendingMessages - shared.maximumLengthOfTimeToBotherResendingMessages
) )
for row in queryreturn: for row in queryreturn:
if len(row) < 2: if len(row) < 2:
@ -139,9 +139,7 @@ class singleCleaner(StoppableThread):
# thread.downloadQueue.clear() # thread.downloadQueue.clear()
# inv/object tracking # inv/object tracking
for connection in \ for connection in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
connection.clean() connection.clean()
# discovery tracking # discovery tracking

View File

@ -645,10 +645,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False): def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object""" """Stop downloading an object"""
for connection in ( for connection in connectionpool.BMConnectionPool().connections():
connectionpool.BMConnectionPool().inboundConnections.values() +
connectionpool.BMConnectionPool().outboundConnections.values()
):
try: try:
del connection.objectsNewToMe[hashId] del connection.objectsNewToMe[hashId]
except KeyError: except KeyError:

View File

@ -1,6 +1,5 @@
""" """
src/network/connectionpool.py `BMConnectionPool` class definition
==================================
""" """
import errno import errno
import logging import logging
@ -26,9 +25,10 @@ logger = logging.getLogger('default')
@Singleton @Singleton
# pylint: disable=too-many-instance-attributes
class BMConnectionPool(object): class BMConnectionPool(object):
"""Pool of all existing connections""" """Pool of all existing connections"""
# pylint: disable=too-many-instance-attributes
def __init__(self): def __init__(self):
asyncore.set_rates( asyncore.set_rates(
BMConfigParser().safeGetInt( BMConfigParser().safeGetInt(
@ -41,9 +41,21 @@ class BMConnectionPool(object):
self.listeningSockets = {} self.listeningSockets = {}
self.udpSockets = {} self.udpSockets = {}
self.streams = [] self.streams = []
self.lastSpawned = 0 self._lastSpawned = 0
self.spawnWait = 2 self._spawnWait = 2
self.bootstrapped = False self._bootstrapped = False
def connections(self):
"""
Shortcut for combined list of connections from
`inboundConnections` and `outboundConnections` dicts
"""
return self.inboundConnections.values() + self.outboundConnections.values()
def establishedConnections(self):
"""Shortcut for list of connections having fullyEstablished == True"""
return [
x for x in self.connections() if x.fullyEstablished]
def connectToStream(self, streamNumber): def connectToStream(self, streamNumber):
"""Connect to a bitmessage stream""" """Connect to a bitmessage stream"""
@ -74,10 +86,7 @@ class BMConnectionPool(object):
def isAlreadyConnected(self, nodeid): def isAlreadyConnected(self, nodeid):
"""Check if we're already connected to this peer""" """Check if we're already connected to this peer"""
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
try: try:
if nodeid == i.nodeid: if nodeid == i.nodeid:
return True return True
@ -129,10 +138,11 @@ class BMConnectionPool(object):
"bitmessagesettings", "onionbindip") "bitmessagesettings", "onionbindip")
else: else:
host = '127.0.0.1' host = '127.0.0.1'
if (BMConfigParser().safeGetBoolean( if (
"bitmessagesettings", "sockslisten") or BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten")
BMConfigParser().safeGet( or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")
"bitmessagesettings", "socksproxytype") == "none"): == "none"
):
# python doesn't like bind + INADDR_ANY? # python doesn't like bind + INADDR_ANY?
# host = socket.INADDR_ANY # host = socket.INADDR_ANY
host = BMConfigParser().get("network", "bind") host = BMConfigParser().get("network", "bind")
@ -205,11 +215,13 @@ class BMConnectionPool(object):
'bitmessagesettings', 'socksproxytype', '') 'bitmessagesettings', 'socksproxytype', '')
onionsocksproxytype = BMConfigParser().safeGet( onionsocksproxytype = BMConfigParser().safeGet(
'bitmessagesettings', 'onionsocksproxytype', '') 'bitmessagesettings', 'onionsocksproxytype', '')
if (socksproxytype[:5] == 'SOCKS' and if (
not BMConfigParser().safeGetBoolean( socksproxytype[:5] == 'SOCKS'
'bitmessagesettings', 'sockslisten') and and not BMConfigParser().safeGetBoolean(
'.onion' not in BMConfigParser().safeGet( 'bitmessagesettings', 'sockslisten')
'bitmessagesettings', 'onionhostname', '')): and '.onion' not in BMConfigParser().safeGet(
'bitmessagesettings', 'onionhostname', '')
):
acceptConnections = False acceptConnections = False
# pylint: disable=too-many-nested-blocks # pylint: disable=too-many-nested-blocks
@ -217,8 +229,8 @@ class BMConnectionPool(object):
if not knownnodes.knownNodesActual: if not knownnodes.knownNodesActual:
self.startBootstrappers() self.startBootstrappers()
knownnodes.knownNodesActual = True knownnodes.knownNodesActual = True
if not self.bootstrapped: if not self._bootstrapped:
self.bootstrapped = True self._bootstrapped = True
Proxy.proxy = ( Proxy.proxy = (
BMConfigParser().safeGet( BMConfigParser().safeGet(
'bitmessagesettings', 'sockshostname'), 'bitmessagesettings', 'sockshostname'),
@ -260,8 +272,7 @@ class BMConnectionPool(object):
continue continue
try: try:
if (chosen.host.endswith(".onion") and if chosen.host.endswith(".onion") and Proxy.onion_proxy:
Proxy.onion_proxy is not None):
if onionsocksproxytype == "SOCKS5": if onionsocksproxytype == "SOCKS5":
self.addConnection(Socks5BMConnection(chosen)) self.addConnection(Socks5BMConnection(chosen))
elif onionsocksproxytype == "SOCKS4a": elif onionsocksproxytype == "SOCKS4a":
@ -276,12 +287,9 @@ class BMConnectionPool(object):
if e.errno == errno.ENETUNREACH: if e.errno == errno.ENETUNREACH:
continue continue
self.lastSpawned = time.time() self._lastSpawned = time.time()
else: else:
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
# FIXME: rating will be increased after next connection # FIXME: rating will be increased after next connection
i.handle_close() i.handle_close()
@ -291,8 +299,8 @@ class BMConnectionPool(object):
self.startListening() self.startListening()
else: else:
for bind in re.sub( for bind in re.sub(
'[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind') BMConfigParser().safeGet('network', 'bind')
).split(): ).split():
self.startListening(bind) self.startListening(bind)
logger.info('Listening for incoming connections.') logger.info('Listening for incoming connections.')
@ -301,8 +309,8 @@ class BMConnectionPool(object):
self.startUDPSocket() self.startUDPSocket()
else: else:
for bind in re.sub( for bind in re.sub(
'[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind') BMConfigParser().safeGet('network', 'bind')
).split(): ).split():
self.startUDPSocket(bind) self.startUDPSocket(bind)
self.startUDPSocket(False) self.startUDPSocket(False)
@ -319,16 +327,13 @@ class BMConnectionPool(object):
i.accepting = i.connecting = i.connected = False i.accepting = i.connecting = i.connected = False
logger.info('Stopped udp sockets.') logger.info('Stopped udp sockets.')
loopTime = float(self.spawnWait) loopTime = float(self._spawnWait)
if self.lastSpawned < time.time() - self.spawnWait: if self._lastSpawned < time.time() - self._spawnWait:
loopTime = 2.0 loopTime = 2.0
asyncore.loop(timeout=loopTime, count=1000) asyncore.loop(timeout=loopTime, count=1000)
reaper = [] reaper = []
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
minTx = time.time() - 20 minTx = time.time() - 20
if i.fullyEstablished: if i.fullyEstablished:
minTx -= 300 - 20 minTx -= 300 - 20
@ -340,10 +345,8 @@ class BMConnectionPool(object):
time.time() - i.lastTx) time.time() - i.lastTx)
i.set_state("close") i.set_state("close")
for i in ( for i in (
self.inboundConnections.values() + self.connections()
self.outboundConnections.values() + + self.listeningSockets.values() + self.udpSockets.values()
self.listeningSockets.values() +
self.udpSockets.values()
): ):
if not (i.accepting or i.connecting or i.connected): if not (i.accepting or i.connecting or i.connected):
reaper.append(i) reaper.append(i)

View File

@ -1,6 +1,5 @@
""" """
src/network/downloadthread.py `DownloadThread` class definition
=============================
""" """
import time import time
@ -29,7 +28,7 @@ class DownloadThread(StoppableThread):
def cleanPending(self): def cleanPending(self):
"""Expire pending downloads eventually""" """Expire pending downloads eventually"""
deadline = time.time() - DownloadThread.requestExpires deadline = time.time() - self.requestExpires
try: try:
toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] toDelete = [k for k, v in missingObjects.iteritems() if v < deadline]
except RuntimeError: except RuntimeError:
@ -43,15 +42,12 @@ class DownloadThread(StoppableThread):
while not self._stopped: while not self._stopped:
requested = 0 requested = 0
# Choose downloading peers randomly # Choose downloading peers randomly
connections = [ connections = BMConnectionPool().establishedConnections()
x for x in
BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values()
if x.fullyEstablished]
helper_random.randomshuffle(connections) helper_random.randomshuffle(connections)
try: requestChunk = max(int(
requestChunk = max(int(min(DownloadThread.maxRequestChunk, len(missingObjects)) / len(connections)), 1) min(self.maxRequestChunk, len(missingObjects))
except ZeroDivisionError: / len(connections)), 1) if connections else 1
requestChunk = 1
for i in connections: for i in connections:
now = time.time() now = time.time()
# avoid unnecessary delay # avoid unnecessary delay
@ -81,7 +77,7 @@ class DownloadThread(StoppableThread):
'%s:%i Requesting %i objects', '%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount) i.destination.host, i.destination.port, chunkCount)
requested += chunkCount requested += chunkCount
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: if time.time() >= self.lastCleaned + self.cleanInterval:
self.cleanPending() self.cleanPending()
if not requested: if not requested:
self.stop.wait(1) self.stop.wait(1)

View File

@ -20,9 +20,7 @@ def handleExpiredDandelion(expired):
the object""" the object"""
if not expired: if not expired:
return return
for i in \ for i in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished: if not i.fullyEstablished:
continue continue
for x in expired: for x in expired:
@ -44,9 +42,7 @@ class InvThread(StoppableThread):
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream) Dandelion().addHash(hashId, stream=stream)
for connection in \ for connection in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if state.dandelion and connection != Dandelion().objectChildStem(hashId): if state.dandelion and connection != Dandelion().objectChildStem(hashId):
continue continue
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
@ -67,8 +63,7 @@ class InvThread(StoppableThread):
break break
if chunk: if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \ for connection in BMConnectionPool().connections():
BMConnectionPool().outboundConnections.values():
fluffs = [] fluffs = []
stems = [] stems = []
for inv in chunk: for inv in chunk:
@ -96,13 +91,13 @@ class InvThread(StoppableThread):
if fluffs: if fluffs:
random.shuffle(fluffs) random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(protocol.CreatePacket(
'inv', addresses.encodeVarint(len(fluffs)) + 'inv',
"".join(fluffs))) addresses.encodeVarint(len(fluffs)) + ''.join(fluffs)))
if stems: if stems:
random.shuffle(stems) random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(protocol.CreatePacket(
'dinv', addresses.encodeVarint(len(stems)) + 'dinv',
"".join(stems))) addresses.encodeVarint(len(stems)) + ''.join(stems)))
invQueue.iterate() invQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -95,8 +95,7 @@ class ObjectTracker(object):
def handleReceivedObject(self, streamNumber, hashid): def handleReceivedObject(self, streamNumber, hashid):
"""Handling received object""" """Handling received object"""
for i in network.connectionpool.BMConnectionPool().inboundConnections.values( for i in network.connectionpool.BMConnectionPool().connections():
) + network.connectionpool.BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished: if not i.fullyEstablished:
continue continue
try: try:

View File

@ -19,16 +19,7 @@ currentSentSpeed = 0
def connectedHostsList(): def connectedHostsList():
"""List of all the connected hosts""" """List of all the connected hosts"""
retval = [] return BMConnectionPool().establishedConnections()
for i in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished:
continue
try:
retval.append(i)
except AttributeError:
pass
return retval
def sentBytes(): def sentBytes():
@ -71,12 +62,6 @@ def downloadSpeed():
def pendingDownload(): def pendingDownload():
"""Getting pending downloads""" """Getting pending downloads"""
return len(missingObjects) return len(missingObjects)
# tmp = {}
# for connection in BMConnectionPool().inboundConnections.values() + \
# BMConnectionPool().outboundConnections.values():
# for k in connection.objectsNewToMe.keys():
# tmp[k] = True
# return len(tmp)
def pendingUpload(): def pendingUpload():

View File

@ -1,5 +1,5 @@
""" """
src/network/uploadthread.py `UploadThread` class definition
""" """
import time import time
@ -22,19 +22,19 @@ class UploadThread(StoppableThread):
def run(self): def run(self):
while not self._stopped: while not self._stopped:
uploaded = 0 uploaded = 0
# Choose downloading peers randomly # Choose uploading peers randomly
connections = [x for x in BMConnectionPool().inboundConnections.values() + connections = BMConnectionPool().establishedConnections()
BMConnectionPool().outboundConnections.values() if x.fullyEstablished]
helper_random.randomshuffle(connections) helper_random.randomshuffle(connections)
for i in connections: for i in connections:
now = time.time() now = time.time()
# avoid unnecessary delay # avoid unnecessary delay
if i.skipUntil >= now: if i.skipUntil >= now:
continue continue
if len(i.write_buf) > UploadThread.maxBufSize: if len(i.write_buf) > self.maxBufSize:
continue continue
try: try:
request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) request = i.pendingUpload.randomKeys(
RandomTrackingDict.maxPending)
except KeyError: except KeyError:
continue continue
payload = bytearray() payload = bytearray()