Refactor parent package dependency in network module #2255

Open
anand-skss wants to merge 2 commits from test into v0.6
21 changed files with 115 additions and 95 deletions

View File

@ -414,7 +414,7 @@ class SettingsDialog(QtGui.QDialog):
'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled:
announceThread = AnnounceThread()
announceThread = AnnounceThread(self.config)
announceThread.daemon = True
announceThread.start()
else:

View File

@ -1,9 +1,18 @@
"""
Network subsystem package
"""
try:
import networkdepsinterface
except ImportError:
from pybitmessage import networkdepsinterface
from .dandelion import Dandelion
from .threads import StoppableThread
(
state, queues, config, protocol,
randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork()
dandelion_ins = Dandelion()
__all__ = ["StoppableThread"]
@ -11,7 +20,6 @@ __all__ = ["StoppableThread"]
def start(config, state):
"""Start network threads"""
import state
from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread
@ -30,18 +38,19 @@ def start(config, state):
readKnownNodes()
connectionpool.pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),
DownloadThread(), UploadThread()
BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
UploadThread(protocol, state)
):
thread.daemon = True
thread.start()
# Optional components
for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i)
thread = ReceiveQueueThread(queues, i)
thread.daemon = True
thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread()
state.announceThread = AnnounceThread(config)
state.announceThread.daemon = True
state.announceThread.start()

View File

@ -6,8 +6,6 @@ from six.moves import queue
# magic imports!
import connectionpool
from helper_random import randomshuffle
from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue
from threads import StoppableThread
@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"
def __init__(self, protocol, queues):
self.protocol = protocol
self.queues = queues
StoppableThread.__init__(self)
def run(self):
while not self._stopped:
chunk = []
while True:
try:
data = addrQueue.get(False)
data = self.queues.addrQueue.get(False)
chunk.append(data)
except queue.Empty:
break
@ -41,9 +44,9 @@ class AddrThread(StoppableThread):
continue
filtered.append((stream, peer, seen))
if filtered:
i.append_write_buf(assembleAddrMessage(filtered))
i.append_write_buf(self.protocol.assembleAddrMessage(filtered))
addrQueue.iterate()
self.queues.addrQueue.iterate()
for i in range(len(chunk)):
addrQueue.task_done()
self.queues.addrQueue.task_done()
self.stop.wait(1)

View File

@ -6,7 +6,7 @@ import threading
import time
import network.asyncore_pollchoose as asyncore
import state
from network import state
from threads import BusyError, nonBlocking

View File

@ -5,7 +5,6 @@ import time
# magic imports!
import connectionpool
from bmconfigparser import config
from protocol import assembleAddrMessage
from node import Peer
@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
name = "Announcer"
announceInterval = 60
def __init__(self, config):
self.config = config
StoppableThread.__init__(self)
def run(self):
lastSelfAnnounced = 0
while not self._stopped:
processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf()
self.announceSelf(self.config)
lastSelfAnnounced = time.time()
if processed == 0:
self.stop.wait(10)
@staticmethod
def announceSelf():
def announceSelf(config):
"""Announce our presence"""
for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing:

View File

@ -4,8 +4,7 @@ BMObject and it's exceptions.
import logging
import time
import protocol
import state
from network import state, protocol
import connectionpool
from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash

View File

@ -11,13 +11,9 @@ import struct
import time
# magic imports!
import addresses
import knownnodes
import protocol
import state
from network import protocol, state, config, queues, addresses, dandelion_ins
import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import (
@ -26,7 +22,6 @@ from network.bmobject import (
BMObjectUnwantedStreamError
)
from network.proxy import ProxyError
from network import dandelion_ins
from node import Node, Peer
from objectracker import ObjectTracker, missingObjects
@ -409,7 +404,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
try:
self.object.checkObjectByType()
objectProcessorQueue.put((
queues.objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data))) # noqa: F821
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
@ -431,7 +426,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
)
self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash)
invQueue.put((
queues.invQueue.put((
self.object.streamNumber, self.object.inventoryHash,
self.destination))
return True
@ -472,7 +467,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_portcheck(self):
"""Incoming port check request, queue it."""
portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True
def bm_command_ping(self):

View File

@ -6,10 +6,7 @@ import logging
import random
import knownnodes
import protocol
import state
from bmconfigparser import config
from queues import queue, portCheckerQueue
from network import protocol, state, config, queues
logger = logging.getLogger('default')
@ -34,10 +31,10 @@ def chooseConnection(stream):
onionOnly = config.safeGetBoolean(
"bitmessagesettings", "onionservicesonly")
try:
retval = portCheckerQueue.get(False)
portCheckerQueue.task_done()
retval = queues.portCheckerQueue.get(False)
queues.portCheckerQueue.task_done()
return retval
except queue.Empty:
except queues.queue.Empty:
pass
# with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion: # nosec B311

View File

@ -11,9 +11,7 @@ import time
import asyncore_pollchoose as asyncore
import helper_random
import knownnodes
import protocol
import state
from bmconfigparser import config
from network import protocol, state, config
from connectionchooser import chooseConnection
from node import Peer
from proxy import Proxy

View File

@ -2,12 +2,9 @@
`DownloadThread` class definition
"""
import time
import state
import addresses
import helper_random
import protocol
import connectionpool
from network import dandelion_ins
import helper_random
import connectionpool
from objectracker import missingObjects
from threads import StoppableThread
@ -20,8 +17,11 @@ class DownloadThread(StoppableThread):
cleanInterval = 60
requestExpires = 3600
def __init__(self):
def __init__(self, state, protocol, addresses):
super(DownloadThread, self).__init__(name="Downloader")
self.state = state
self.protocol = protocol
self.addresses = addresses
self.lastCleaned = time.time()
def cleanPending(self):
@ -60,7 +60,7 @@ class DownloadThread(StoppableThread):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:
@ -71,8 +71,8 @@ class DownloadThread(StoppableThread):
missingObjects[chunk] = now
if not chunkCount:
continue
payload[0:0] = addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload))
payload[0:0] = self.addresses.encodeVarint(chunkCount)
i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
self.logger.debug(
'%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount)

View File

@ -5,12 +5,8 @@ import Queue
import random
from time import time
import addresses
import protocol
import state
import connectionpool
from network import dandelion_ins
from queues import invQueue
from threads import StoppableThread
@ -37,6 +33,13 @@ class InvThread(StoppableThread):
name = "InvBroadcaster"
def __init__(self, protocol, state, queues, addresses):
self.protocol = protocol
self.state = state
self.queues = queues
self.addresses = addresses
StoppableThread.__init__(self)
@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
@ -48,13 +51,13 @@ class InvThread(StoppableThread):
connection.objectsNewToThem[hashId] = time()
def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks
while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(invQueue))
handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
try:
data = invQueue.get(False)
data = self.queues.invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2 or data[2] is None:
@ -81,7 +84,7 @@ class InvThread(StoppableThread):
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
elif connection.services & self.protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
@ -90,20 +93,20 @@ class InvThread(StoppableThread):
if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'inv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'dinv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(stems)) + ''.join(stems)))
invQueue.iterate()
self.queues.invQueue.iterate()
for _ in range(len(chunk)):
invQueue.task_done()
self.queues.invQueue.task_done()
dandelion_ins.reRandomiseStems()

View File

@ -15,8 +15,7 @@ try:
except ImportError:
from collections import Iterable
import state
from bmconfigparser import config
from network import state, config
from network.node import Peer
state.Peer = Peer

View File

@ -3,7 +3,6 @@ A thread to handle network concerns
"""
import network.asyncore_pollchoose as asyncore
import connectionpool
from queues import excQueue
from threads import StoppableThread
@ -11,12 +10,16 @@ class BMNetworkThread(StoppableThread):
"""Main network thread"""
name = "Asyncore"
def __init__(self, queues):
self.queues = queues
StoppableThread.__init__(self)
def run(self):
try:
while not self._stopped:
connectionpool.pool.loop()
except Exception as e:
excQueue.put((self.name, e))
self.queues.excQueue.put((self.name, e))
raise
def stopThread(self):

View File

@ -8,7 +8,7 @@ import time
import asyncore_pollchoose as asyncore
from advanceddispatcher import AdvancedDispatcher
from bmconfigparser import config
from network import config
from node import Peer
logger = logging.getLogger('default')

View File

@ -7,20 +7,20 @@ import socket
import connectionpool
from network.advanceddispatcher import UnknownStateError
from queues import receiveDataQueue
from threads import StoppableThread
class ReceiveQueueThread(StoppableThread):
"""This thread processes data received from the network
(which is done by the asyncore thread)"""
def __init__(self, num=0):
def __init__(self, queues, num=0):
self.queues = queues
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
def run(self):
while not self._stopped:
try:
dest = receiveDataQueue.get(block=True, timeout=1)
dest = self.queues.receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty:
continue
@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread):
connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found
except KeyError:
receiveDataQueue.task_done()
self.queues.receiveDataQueue.task_done()
continue
try:
connection.process()
@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread):
self.logger.error('Socket error: %s', err)
except: # noqa:E722
self.logger.error('Error processing', exc_info=True)
receiveDataQueue.task_done()
self.queues.receiveDataQueue.task_done()

View File

@ -10,16 +10,11 @@ import socket
import time
# magic imports!
import addresses
import helper_random
import l10n
import protocol
import state
from network import protocol, state, config, queues, addresses, dandelion_ins
import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes
from network import dandelion_ins
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
import asyncore_pollchoose as asyncore
@ -109,7 +104,7 @@ class TCPConnection(BMProto, TLSDispatcher):
max_known_nodes = max(
len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
0.2 + invQueue.queueCount / 2.0)
0.2 + queues.invQueue.queueCount / 2.0)
# take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
@ -135,7 +130,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if BMProto.timeOffsetWrongCount > \
maximumTimeOffsetWrongCount and \
not self.fullyEstablished:
UISignalQueue.put((
queues.UISignalQueue.put((
'updateStatusBar',
_translate(
"MainWindow",
@ -158,8 +153,8 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Initiate inventory synchronisation."""
if not self.isOutbound and not self.local:
state.clientHasReceivedIncomingConnections = True
UISignalQueue.put(('setStatusIcon', 'green'))
UISignalQueue.put((
queues.UISignalQueue.put(('setStatusIcon', 'green'))
queues.UISignalQueue.put((
'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
))
self.antiIntersectionDelay(True)
@ -169,7 +164,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
dandelion_ins.maybeAddStem(self, invQueue)
dandelion_ins.maybeAddStem(self, queues.invQueue)
self.sendAddr()
self.sendBigInv()
@ -271,12 +266,12 @@ class TCPConnection(BMProto, TLSDispatcher):
connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid))
self.connectedAt = time.time()
receiveDataQueue.put(self.destination)
queues.receiveDataQueue.put(self.destination)
def handle_read(self):
"""Callback for reading from a socket"""
TLSDispatcher.handle_read(self)
receiveDataQueue.put(self.destination)
queues.receiveDataQueue.put(self.destination)
def handle_write(self):
"""Callback for writing to a socket"""
@ -286,7 +281,7 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Callback for connection being closed."""
host_is_global = self.isOutbound or not self.local and not state.socksIP
if self.fullyEstablished:
UISignalQueue.put((
queues.UISignalQueue.put((
'updateNetworkStatusTab',
(self.isOutbound, False, self.destination)
))

View File

@ -10,7 +10,7 @@ import sys
import network.asyncore_pollchoose as asyncore
import paths
from network.advanceddispatcher import AdvancedDispatcher
from queues import receiveDataQueue
from network import queues
logger = logging.getLogger('default')
@ -216,5 +216,5 @@ class TLSDispatcher(AdvancedDispatcher):
self.bm_proto_reset()
self.set_state("connection_fully_established")
receiveDataQueue.put(self.destination)
queues.receiveDataQueue.put(self.destination)
return False

View File

@ -6,10 +6,8 @@ import socket
import time
# magic imports!
import protocol
import state
from network import protocol, state, queues
import connectionpool
from queues import receiveDataQueue
from bmproto import BMProto
from node import Peer
@ -138,7 +136,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
# self.local works correctly
self.read_buf[0:] = recdata
self.bm_proto_reset()
receiveDataQueue.put(self.listening)
queues.receiveDataQueue.put(self.listening)
def handle_write(self):
try:

View File

@ -4,8 +4,6 @@
import time
import helper_random
import protocol
import state
import connectionpool
from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins
@ -19,6 +17,11 @@ class UploadThread(StoppableThread):
maxBufSize = 2097152 # 2MB
name = "Uploader"
def __init__(self, protocol, state):
self.protocol = protocol
self.state = state
StoppableThread.__init__(self)
def run(self):
while not self._stopped:
uploaded = 0
@ -49,8 +52,8 @@ class UploadThread(StoppableThread):
i.destination)
break
try:
payload.extend(protocol.CreatePacket(
'object', state.Inventory[chunk].payload))
payload.extend(self.protocol.CreatePacket(
'object', self.state.Inventory[chunk].payload))
chunk_count += 1
except KeyError:
i.antiIntersectionDelay()

View File

@ -0,0 +1,15 @@
import state
import queues
import protocol
import paths
import randomtrackingdict
import addresses
from bmconfigparser import config
def importParentPackageDepsToNetwork():
"""
Exports parent package dependencies to the network.
"""
return (state, queues, config, protocol, randomtrackingdict, addresses, paths)

View File

@ -74,7 +74,7 @@ class TestNetwork(TestPartialRun):
for _ in range(10):
try:
self.state.announceThread.announceSelf()
self.state.announceThread.announceSelf(self.config)
except AttributeError:
self.fail('state.announceThread is not set properly')
time.sleep(1)