Refactor parent package dependency in network module #2255

Open
anand-skss wants to merge 2 commits from test into v0.6
21 changed files with 115 additions and 95 deletions

View File

@ -414,7 +414,7 @@ class SettingsDialog(QtGui.QDialog):
'bitmessagesettings', 'udp'): 'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled)) self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled: if udp_enabled:
announceThread = AnnounceThread() announceThread = AnnounceThread(self.config)
announceThread.daemon = True announceThread.daemon = True
announceThread.start() announceThread.start()
else: else:

View File

@ -1,9 +1,18 @@
""" """
Network subsystem package Network subsystem package
""" """
try:
import networkdepsinterface
except ImportError:
from pybitmessage import networkdepsinterface
from .dandelion import Dandelion from .dandelion import Dandelion
from .threads import StoppableThread from .threads import StoppableThread
(
state, queues, config, protocol,
randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork()
dandelion_ins = Dandelion() dandelion_ins = Dandelion()
__all__ = ["StoppableThread"] __all__ = ["StoppableThread"]
@ -11,7 +20,6 @@ __all__ = ["StoppableThread"]
def start(config, state): def start(config, state):
"""Start network threads""" """Start network threads"""
import state
from .announcethread import AnnounceThread from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread from .addrthread import AddrThread
@ -30,18 +38,19 @@ def start(config, state):
readKnownNodes() readKnownNodes()
connectionpool.pool.connectToStream(1) connectionpool.pool.connectToStream(1)
for thread in ( for thread in (
BMNetworkThread(), InvThread(), AddrThread(), BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
DownloadThread(), UploadThread() AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
UploadThread(protocol, state)
): ):
thread.daemon = True thread.daemon = True
thread.start() thread.start()
# Optional components # Optional components
for i in range(config.getint('threads', 'receive')): for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i) thread = ReceiveQueueThread(queues, i)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'): if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread() state.announceThread = AnnounceThread(config)
state.announceThread.daemon = True state.announceThread.daemon = True
state.announceThread.start() state.announceThread.start()

View File

@ -6,8 +6,6 @@ from six.moves import queue
# magic imports! # magic imports!
import connectionpool import connectionpool
from helper_random import randomshuffle from helper_random import randomshuffle
from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue
from threads import StoppableThread from threads import StoppableThread
@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread""" """(Node) address broadcasting thread"""
name = "AddrBroadcaster" name = "AddrBroadcaster"
def __init__(self, protocol, queues):
self.protocol = protocol
self.queues = queues
StoppableThread.__init__(self)
def run(self): def run(self):
while not self._stopped: while not self._stopped:
chunk = [] chunk = []
while True: while True:
try: try:
data = addrQueue.get(False) data = self.queues.addrQueue.get(False)
chunk.append(data) chunk.append(data)
except queue.Empty: except queue.Empty:
break break
@ -41,9 +44,9 @@ class AddrThread(StoppableThread):
continue continue
filtered.append((stream, peer, seen)) filtered.append((stream, peer, seen))
if filtered: if filtered:
i.append_write_buf(assembleAddrMessage(filtered)) i.append_write_buf(self.protocol.assembleAddrMessage(filtered))
addrQueue.iterate() self.queues.addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):
addrQueue.task_done() self.queues.addrQueue.task_done()
self.stop.wait(1) self.stop.wait(1)

View File

@ -6,7 +6,7 @@ import threading
import time import time
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import state from network import state
from threads import BusyError, nonBlocking from threads import BusyError, nonBlocking

View File

@ -5,7 +5,6 @@ import time
# magic imports! # magic imports!
import connectionpool import connectionpool
from bmconfigparser import config
from protocol import assembleAddrMessage from protocol import assembleAddrMessage
from node import Peer from node import Peer
@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
name = "Announcer" name = "Announcer"
announceInterval = 60 announceInterval = 60
def __init__(self, config):
self.config = config
StoppableThread.__init__(self)
def run(self): def run(self):
lastSelfAnnounced = 0 lastSelfAnnounced = 0
while not self._stopped: while not self._stopped:
processed = 0 processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval: if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf() self.announceSelf(self.config)
lastSelfAnnounced = time.time() lastSelfAnnounced = time.time()
if processed == 0: if processed == 0:
self.stop.wait(10) self.stop.wait(10)
@staticmethod @staticmethod
def announceSelf(): def announceSelf(config):
"""Announce our presence""" """Announce our presence"""
for connection in connectionpool.pool.udpSockets.values(): for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing: if not connection.announcing:

View File

@ -4,8 +4,7 @@ BMObject and it's exceptions.
import logging import logging
import time import time
import protocol from network import state, protocol
import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash from highlevelcrypto import calculateInventoryHash

View File

@ -11,13 +11,9 @@ import struct
import time import time
# magic imports! # magic imports!
import addresses
import knownnodes import knownnodes
import protocol from network import protocol, state, config, queues, addresses, dandelion_ins
import state
import connectionpool import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import ( from network.bmobject import (
@ -26,7 +22,6 @@ from network.bmobject import (
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.proxy import ProxyError from network.proxy import ProxyError
from network import dandelion_ins
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects
@ -409,7 +404,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
try: try:
self.object.checkObjectByType() self.object.checkObjectByType()
objectProcessorQueue.put(( queues.objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data))) # noqa: F821 self.object.objectType, buffer(self.object.data))) # noqa: F821
except BMObjectInvalidError: except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True) BMProto.stopDownloadingObject(self.object.inventoryHash, True)
@ -431,7 +426,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
) )
self.handleReceivedObject( self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash) self.object.streamNumber, self.object.inventoryHash)
invQueue.put(( queues.invQueue.put((
self.object.streamNumber, self.object.inventoryHash, self.object.streamNumber, self.object.inventoryHash,
self.destination)) self.destination))
return True return True
@ -472,7 +467,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_portcheck(self): def bm_command_portcheck(self):
"""Incoming port check request, queue it.""" """Incoming port check request, queue it."""
portCheckerQueue.put(Peer(self.destination, self.peerNode.port)) queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True return True
def bm_command_ping(self): def bm_command_ping(self):

View File

@ -6,10 +6,7 @@ import logging
import random import random
import knownnodes import knownnodes
import protocol from network import protocol, state, config, queues
import state
from bmconfigparser import config
from queues import queue, portCheckerQueue
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -34,10 +31,10 @@ def chooseConnection(stream):
onionOnly = config.safeGetBoolean( onionOnly = config.safeGetBoolean(
"bitmessagesettings", "onionservicesonly") "bitmessagesettings", "onionservicesonly")
try: try:
retval = portCheckerQueue.get(False) retval = queues.portCheckerQueue.get(False)
portCheckerQueue.task_done() queues.portCheckerQueue.task_done()
return retval return retval
except queue.Empty: except queues.queue.Empty:
pass pass
# with a probability of 0.5, connect to a discovered peer # with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion: # nosec B311 if random.choice((False, True)) and not haveOnion: # nosec B311

View File

@ -11,9 +11,7 @@ import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
import helper_random import helper_random
import knownnodes import knownnodes
import protocol from network import protocol, state, config
import state
from bmconfigparser import config
from connectionchooser import chooseConnection from connectionchooser import chooseConnection
from node import Peer from node import Peer
from proxy import Proxy from proxy import Proxy

View File

@ -2,12 +2,9 @@
`DownloadThread` class definition `DownloadThread` class definition
""" """
import time import time
import state
import addresses
import helper_random
import protocol
import connectionpool
from network import dandelion_ins from network import dandelion_ins
import helper_random
import connectionpool
from objectracker import missingObjects from objectracker import missingObjects
from threads import StoppableThread from threads import StoppableThread
@ -20,8 +17,11 @@ class DownloadThread(StoppableThread):
cleanInterval = 60 cleanInterval = 60
requestExpires = 3600 requestExpires = 3600
def __init__(self): def __init__(self, state, protocol, addresses):
super(DownloadThread, self).__init__(name="Downloader") super(DownloadThread, self).__init__(name="Downloader")
self.state = state
self.protocol = protocol
self.addresses = addresses
self.lastCleaned = time.time() self.lastCleaned = time.time()
def cleanPending(self): def cleanPending(self):
@ -60,7 +60,7 @@ class DownloadThread(StoppableThread):
payload = bytearray() payload = bytearray()
chunkCount = 0 chunkCount = 0
for chunk in request: for chunk in request:
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk): if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
try: try:
del i.objectsNewToMe[chunk] del i.objectsNewToMe[chunk]
except KeyError: except KeyError:
@ -71,8 +71,8 @@ class DownloadThread(StoppableThread):
missingObjects[chunk] = now missingObjects[chunk] = now
if not chunkCount: if not chunkCount:
continue continue
payload[0:0] = addresses.encodeVarint(chunkCount) payload[0:0] = self.addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload)) i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
self.logger.debug( self.logger.debug(
'%s:%i Requesting %i objects', '%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount) i.destination.host, i.destination.port, chunkCount)

View File

@ -5,12 +5,8 @@ import Queue
import random import random
from time import time from time import time
import addresses
import protocol
import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins
from queues import invQueue
from threads import StoppableThread from threads import StoppableThread
@ -37,6 +33,13 @@ class InvThread(StoppableThread):
name = "InvBroadcaster" name = "InvBroadcaster"
def __init__(self, protocol, state, queues, addresses):
self.protocol = protocol
self.state = state
self.queues = queues
self.addresses = addresses
StoppableThread.__init__(self)
@staticmethod @staticmethod
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
@ -48,13 +51,13 @@ class InvThread(StoppableThread):
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
def run(self): # pylint: disable=too-many-branches def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(invQueue)) handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
try: try:
data = invQueue.get(False) data = self.queues.invQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
# locally generated # locally generated
if len(data) == 2 or data[2] is None: if len(data) == 2 or data[2] is None:
@ -81,7 +84,7 @@ class InvThread(StoppableThread):
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311 if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & self.protocol.NODE_DANDELION > 0:
stems.append(inv[1]) stems.append(inv[1])
else: else:
fluffs.append(inv[1]) fluffs.append(inv[1])
@ -90,20 +93,20 @@ class InvThread(StoppableThread):
if fluffs: if fluffs:
random.shuffle(fluffs) random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(self.protocol.CreatePacket(
'inv', 'inv',
addresses.encodeVarint( self.addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs))) len(fluffs)) + ''.join(fluffs)))
if stems: if stems:
random.shuffle(stems) random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(self.protocol.CreatePacket(
'dinv', 'dinv',
addresses.encodeVarint( self.addresses.encodeVarint(
len(stems)) + ''.join(stems))) len(stems)) + ''.join(stems)))
invQueue.iterate() self.queues.invQueue.iterate()
for _ in range(len(chunk)): for _ in range(len(chunk)):
invQueue.task_done() self.queues.invQueue.task_done()
dandelion_ins.reRandomiseStems() dandelion_ins.reRandomiseStems()

View File

@ -15,8 +15,7 @@ try:
except ImportError: except ImportError:
from collections import Iterable from collections import Iterable
import state from network import state, config
from bmconfigparser import config
from network.node import Peer from network.node import Peer
state.Peer = Peer state.Peer = Peer

View File

@ -3,7 +3,6 @@ A thread to handle network concerns
""" """
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import connectionpool import connectionpool
from queues import excQueue
from threads import StoppableThread from threads import StoppableThread
@ -11,12 +10,16 @@ class BMNetworkThread(StoppableThread):
"""Main network thread""" """Main network thread"""
name = "Asyncore" name = "Asyncore"
def __init__(self, queues):
self.queues = queues
StoppableThread.__init__(self)
def run(self): def run(self):
try: try:
while not self._stopped: while not self._stopped:
connectionpool.pool.loop() connectionpool.pool.loop()
except Exception as e: except Exception as e:
excQueue.put((self.name, e)) self.queues.excQueue.put((self.name, e))
raise raise
def stopThread(self): def stopThread(self):

View File

@ -8,7 +8,7 @@ import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
from advanceddispatcher import AdvancedDispatcher from advanceddispatcher import AdvancedDispatcher
from bmconfigparser import config from network import config
from node import Peer from node import Peer
logger = logging.getLogger('default') logger = logging.getLogger('default')

View File

@ -7,20 +7,20 @@ import socket
import connectionpool import connectionpool
from network.advanceddispatcher import UnknownStateError from network.advanceddispatcher import UnknownStateError
from queues import receiveDataQueue
from threads import StoppableThread from threads import StoppableThread
class ReceiveQueueThread(StoppableThread): class ReceiveQueueThread(StoppableThread):
"""This thread processes data received from the network """This thread processes data received from the network
(which is done by the asyncore thread)""" (which is done by the asyncore thread)"""
def __init__(self, num=0): def __init__(self, queues, num=0):
self.queues = queues
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
def run(self): def run(self):
while not self._stopped: while not self._stopped:
try: try:
dest = receiveDataQueue.get(block=True, timeout=1) dest = self.queues.receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty: except Queue.Empty:
continue continue
@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread):
connection = connectionpool.pool.getConnectionByAddr(dest) connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found # connection object not found
except KeyError: except KeyError:
receiveDataQueue.task_done() self.queues.receiveDataQueue.task_done()
continue continue
try: try:
connection.process() connection.process()
@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread):
self.logger.error('Socket error: %s', err) self.logger.error('Socket error: %s', err)
except: # noqa:E722 except: # noqa:E722
self.logger.error('Error processing', exc_info=True) self.logger.error('Error processing', exc_info=True)
receiveDataQueue.task_done() self.queues.receiveDataQueue.task_done()

View File

@ -10,16 +10,11 @@ import socket
import time import time
# magic imports! # magic imports!
import addresses
import helper_random import helper_random
import l10n import l10n
import protocol from network import protocol, state, config, queues, addresses, dandelion_ins
import state
import connectionpool import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes from highlevelcrypto import randomBytes
from network import dandelion_ins
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate from tr import _translate
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
@ -109,7 +104,7 @@ class TCPConnection(BMProto, TLSDispatcher):
max_known_nodes = max( max_known_nodes = max(
len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * ( delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
0.2 + invQueue.queueCount / 2.0) 0.2 + queues.invQueue.queueCount / 2.0)
# take the stream with maximum amount of nodes # take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1) # +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count # 20 is avg connected nodes count
@ -135,7 +130,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if BMProto.timeOffsetWrongCount > \ if BMProto.timeOffsetWrongCount > \
maximumTimeOffsetWrongCount and \ maximumTimeOffsetWrongCount and \
not self.fullyEstablished: not self.fullyEstablished:
UISignalQueue.put(( queues.UISignalQueue.put((
'updateStatusBar', 'updateStatusBar',
_translate( _translate(
"MainWindow", "MainWindow",
@ -158,8 +153,8 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Initiate inventory synchronisation.""" """Initiate inventory synchronisation."""
if not self.isOutbound and not self.local: if not self.isOutbound and not self.local:
state.clientHasReceivedIncomingConnections = True state.clientHasReceivedIncomingConnections = True
UISignalQueue.put(('setStatusIcon', 'green')) queues.UISignalQueue.put(('setStatusIcon', 'green'))
UISignalQueue.put(( queues.UISignalQueue.put((
'updateNetworkStatusTab', (self.isOutbound, True, self.destination) 'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
)) ))
self.antiIntersectionDelay(True) self.antiIntersectionDelay(True)
@ -169,7 +164,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination) knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
dandelion_ins.maybeAddStem(self, invQueue) dandelion_ins.maybeAddStem(self, queues.invQueue)
self.sendAddr() self.sendAddr()
self.sendBigInv() self.sendBigInv()
@ -271,12 +266,12 @@ class TCPConnection(BMProto, TLSDispatcher):
connectionpool.pool.streams, dandelion_ins.enabled, connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid)) False, nodeid=self.nodeid))
self.connectedAt = time.time() self.connectedAt = time.time()
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
def handle_read(self): def handle_read(self):
"""Callback for reading from a socket""" """Callback for reading from a socket"""
TLSDispatcher.handle_read(self) TLSDispatcher.handle_read(self)
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
def handle_write(self): def handle_write(self):
"""Callback for writing to a socket""" """Callback for writing to a socket"""
@ -286,7 +281,7 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Callback for connection being closed.""" """Callback for connection being closed."""
host_is_global = self.isOutbound or not self.local and not state.socksIP host_is_global = self.isOutbound or not self.local and not state.socksIP
if self.fullyEstablished: if self.fullyEstablished:
UISignalQueue.put(( queues.UISignalQueue.put((
'updateNetworkStatusTab', 'updateNetworkStatusTab',
(self.isOutbound, False, self.destination) (self.isOutbound, False, self.destination)
)) ))

View File

@ -10,7 +10,7 @@ import sys
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import paths import paths
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from queues import receiveDataQueue from network import queues
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -216,5 +216,5 @@ class TLSDispatcher(AdvancedDispatcher):
self.bm_proto_reset() self.bm_proto_reset()
self.set_state("connection_fully_established") self.set_state("connection_fully_established")
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
return False return False

View File

@ -6,10 +6,8 @@ import socket
import time import time
# magic imports! # magic imports!
import protocol from network import protocol, state, queues
import state
import connectionpool import connectionpool
from queues import receiveDataQueue
from bmproto import BMProto from bmproto import BMProto
from node import Peer from node import Peer
@ -138,7 +136,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
# self.local works correctly # self.local works correctly
self.read_buf[0:] = recdata self.read_buf[0:] = recdata
self.bm_proto_reset() self.bm_proto_reset()
receiveDataQueue.put(self.listening) queues.receiveDataQueue.put(self.listening)
def handle_write(self): def handle_write(self):
try: try:

View File

@ -4,8 +4,6 @@
import time import time
import helper_random import helper_random
import protocol
import state
import connectionpool import connectionpool
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins from network import dandelion_ins
@ -19,6 +17,11 @@ class UploadThread(StoppableThread):
maxBufSize = 2097152 # 2MB maxBufSize = 2097152 # 2MB
name = "Uploader" name = "Uploader"
def __init__(self, protocol, state):
self.protocol = protocol
self.state = state
StoppableThread.__init__(self)
def run(self): def run(self):
while not self._stopped: while not self._stopped:
uploaded = 0 uploaded = 0
@ -49,8 +52,8 @@ class UploadThread(StoppableThread):
i.destination) i.destination)
break break
try: try:
payload.extend(protocol.CreatePacket( payload.extend(self.protocol.CreatePacket(
'object', state.Inventory[chunk].payload)) 'object', self.state.Inventory[chunk].payload))
chunk_count += 1 chunk_count += 1
except KeyError: except KeyError:
i.antiIntersectionDelay() i.antiIntersectionDelay()

View File

@ -0,0 +1,15 @@
import state
import queues
import protocol
import paths
import randomtrackingdict
import addresses
from bmconfigparser import config
def importParentPackageDepsToNetwork():
"""
Exports parent package dependencies to the network.
"""
return (state, queues, config, protocol, randomtrackingdict, addresses, paths)

View File

@ -74,7 +74,7 @@ class TestNetwork(TestPartialRun):
for _ in range(10): for _ in range(10):
try: try:
self.state.announceThread.announceSelf() self.state.announceThread.announceSelf(self.config)
except AttributeError: except AttributeError:
self.fail('state.announceThread is not set properly') self.fail('state.announceThread is not set properly')
time.sleep(1) time.sleep(1)