Refactor parent package dependency in network module #2255

Open
anand-skss wants to merge 2 commits from test into v0.6
19 changed files with 68 additions and 71 deletions
Showing only changes of commit f2bb4d1fd1 - Show all commits

View File

@ -1,9 +1,18 @@
""" """
Network subsystem package Network subsystem package
""" """
try:
import networkdepsinterface
except ImportError:
from pybitmessage import networkdepsinterface
from .dandelion import Dandelion from .dandelion import Dandelion
from .threads import StoppableThread from .threads import StoppableThread
(
state, queues, config, protocol,
randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork()
dandelion_ins = Dandelion() dandelion_ins = Dandelion()
__all__ = ["StoppableThread"] __all__ = ["StoppableThread"]
@ -11,7 +20,6 @@ __all__ = ["StoppableThread"]
def start(config, state): def start(config, state):
"""Start network threads""" """Start network threads"""
import state
from .announcethread import AnnounceThread from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread from .addrthread import AddrThread

View File

@ -7,7 +7,7 @@ from six.moves import queue
import connectionpool import connectionpool
from helper_random import randomshuffle from helper_random import randomshuffle
from protocol import assembleAddrMessage from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue from network import queues # FIXME: init with queue
from threads import StoppableThread from threads import StoppableThread
@ -21,7 +21,7 @@ class AddrThread(StoppableThread):
chunk = [] chunk = []
while True: while True:
try: try:
data = addrQueue.get(False) data = queues.addrQueue.get(False)
chunk.append(data) chunk.append(data)
except queue.Empty: except queue.Empty:
break break
@ -43,7 +43,7 @@ class AddrThread(StoppableThread):
if filtered: if filtered:
i.append_write_buf(assembleAddrMessage(filtered)) i.append_write_buf(assembleAddrMessage(filtered))
addrQueue.iterate() queues.addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):
addrQueue.task_done() queues.addrQueue.task_done()
self.stop.wait(1) self.stop.wait(1)

View File

@ -6,7 +6,7 @@ import threading
import time import time
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import state from network import state
from threads import BusyError, nonBlocking from threads import BusyError, nonBlocking

View File

@ -5,7 +5,7 @@ import time
# magic imports! # magic imports!
import connectionpool import connectionpool
from bmconfigparser import config from network import config
from protocol import assembleAddrMessage from protocol import assembleAddrMessage
from node import Peer from node import Peer

View File

@ -4,8 +4,7 @@ BMObject and it's exceptions.
import logging import logging
import time import time
import protocol from network import state, protocol
import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash from highlevelcrypto import calculateInventoryHash

View File

@ -11,13 +11,9 @@ import struct
import time import time
# magic imports! # magic imports!
import addresses
import knownnodes import knownnodes
import protocol from network import protocol, state, config, queues, addresses, dandelion_ins
import state
import connectionpool import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import ( from network.bmobject import (
@ -26,7 +22,6 @@ from network.bmobject import (
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.proxy import ProxyError from network.proxy import ProxyError
from network import dandelion_ins
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects
@ -409,7 +404,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
try: try:
self.object.checkObjectByType() self.object.checkObjectByType()
objectProcessorQueue.put(( queues.objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data))) # noqa: F821 self.object.objectType, buffer(self.object.data))) # noqa: F821
except BMObjectInvalidError: except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True) BMProto.stopDownloadingObject(self.object.inventoryHash, True)
@ -431,7 +426,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
) )
self.handleReceivedObject( self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash) self.object.streamNumber, self.object.inventoryHash)
invQueue.put(( queues.invQueue.put((
self.object.streamNumber, self.object.inventoryHash, self.object.streamNumber, self.object.inventoryHash,
self.destination)) self.destination))
return True return True
@ -472,7 +467,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_portcheck(self): def bm_command_portcheck(self):
"""Incoming port check request, queue it.""" """Incoming port check request, queue it."""
portCheckerQueue.put(Peer(self.destination, self.peerNode.port)) queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True return True
def bm_command_ping(self): def bm_command_ping(self):

View File

@ -6,10 +6,7 @@ import logging
import random import random
import knownnodes import knownnodes
import protocol from network import protocol, state, config, queues
import state
from bmconfigparser import config
from queues import queue, portCheckerQueue
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -34,10 +31,10 @@ def chooseConnection(stream):
onionOnly = config.safeGetBoolean( onionOnly = config.safeGetBoolean(
"bitmessagesettings", "onionservicesonly") "bitmessagesettings", "onionservicesonly")
try: try:
retval = portCheckerQueue.get(False) retval = queues.portCheckerQueue.get(False)
portCheckerQueue.task_done() queues.portCheckerQueue.task_done()
return retval return retval
except queue.Empty: except queues.queue.Empty:
pass pass
# with a probability of 0.5, connect to a discovered peer # with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion: # nosec B311 if random.choice((False, True)) and not haveOnion: # nosec B311

View File

@ -11,9 +11,7 @@ import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
import helper_random import helper_random
import knownnodes import knownnodes
import protocol from network import protocol, state, config
import state
from bmconfigparser import config
from connectionchooser import chooseConnection from connectionchooser import chooseConnection
from node import Peer from node import Peer
from proxy import Proxy from proxy import Proxy

View File

@ -2,12 +2,9 @@
`DownloadThread` class definition `DownloadThread` class definition
""" """
import time import time
import state from network import state, protocol, addresses, dandelion_ins
import addresses
import helper_random import helper_random
import protocol
import connectionpool import connectionpool
from network import dandelion_ins
from objectracker import missingObjects from objectracker import missingObjects
from threads import StoppableThread from threads import StoppableThread

View File

@ -5,12 +5,9 @@ import Queue
import random import random
from time import time from time import time
import addresses from network import protocol, state, queues, addresses
import protocol
import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins
from queues import invQueue
from threads import StoppableThread from threads import StoppableThread
@ -52,9 +49,9 @@ class InvThread(StoppableThread):
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(invQueue)) handleExpiredDandelion(dandelion_ins.expire(queues.invQueue))
try: try:
data = invQueue.get(False) data = queues.invQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
# locally generated # locally generated
if len(data) == 2 or data[2] is None: if len(data) == 2 or data[2] is None:
@ -101,9 +98,9 @@ class InvThread(StoppableThread):
addresses.encodeVarint( addresses.encodeVarint(
len(stems)) + ''.join(stems))) len(stems)) + ''.join(stems)))
invQueue.iterate() queues.invQueue.iterate()
for _ in range(len(chunk)): for _ in range(len(chunk)):
invQueue.task_done() queues.invQueue.task_done()
dandelion_ins.reRandomiseStems() dandelion_ins.reRandomiseStems()

View File

@ -15,8 +15,7 @@ try:
except ImportError: except ImportError:
from collections import Iterable from collections import Iterable
import state from network import state, config
from bmconfigparser import config
from network.node import Peer from network.node import Peer
state.Peer = Peer state.Peer = Peer

View File

@ -3,7 +3,7 @@ A thread to handle network concerns
""" """
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import connectionpool import connectionpool
from queues import excQueue from network import queues
from threads import StoppableThread from threads import StoppableThread
@ -16,7 +16,7 @@ class BMNetworkThread(StoppableThread):
while not self._stopped: while not self._stopped:
connectionpool.pool.loop() connectionpool.pool.loop()
except Exception as e: except Exception as e:
excQueue.put((self.name, e)) queues.excQueue.put((self.name, e))
raise raise
def stopThread(self): def stopThread(self):

View File

@ -8,7 +8,7 @@ import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
from advanceddispatcher import AdvancedDispatcher from advanceddispatcher import AdvancedDispatcher
from bmconfigparser import config from network import config
from node import Peer from node import Peer
logger = logging.getLogger('default') logger = logging.getLogger('default')

View File

@ -7,7 +7,7 @@ import socket
import connectionpool import connectionpool
from network.advanceddispatcher import UnknownStateError from network.advanceddispatcher import UnknownStateError
from queues import receiveDataQueue from network import queues
from threads import StoppableThread from threads import StoppableThread
@ -20,7 +20,7 @@ class ReceiveQueueThread(StoppableThread):
def run(self): def run(self):
while not self._stopped: while not self._stopped:
try: try:
dest = receiveDataQueue.get(block=True, timeout=1) dest = queues.receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty: except Queue.Empty:
continue continue
@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread):
connection = connectionpool.pool.getConnectionByAddr(dest) connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found # connection object not found
except KeyError: except KeyError:
receiveDataQueue.task_done() queues.receiveDataQueue.task_done()
continue continue
try: try:
connection.process() connection.process()
@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread):
self.logger.error('Socket error: %s', err) self.logger.error('Socket error: %s', err)
except: # noqa:E722 except: # noqa:E722
self.logger.error('Error processing', exc_info=True) self.logger.error('Error processing', exc_info=True)
receiveDataQueue.task_done() queues.receiveDataQueue.task_done()

View File

@ -10,16 +10,11 @@ import socket
import time import time
# magic imports! # magic imports!
import addresses
import helper_random import helper_random
import l10n import l10n
import protocol from network import protocol, state, config, queues, addresses, dandelion_ins
import state
import connectionpool import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes from highlevelcrypto import randomBytes
from network import dandelion_ins
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate from tr import _translate
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
@ -109,7 +104,7 @@ class TCPConnection(BMProto, TLSDispatcher):
max_known_nodes = max( max_known_nodes = max(
len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * ( delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
0.2 + invQueue.queueCount / 2.0) 0.2 + queues.invQueue.queueCount / 2.0)
# take the stream with maximum amount of nodes # take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1) # +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count # 20 is avg connected nodes count
@ -135,7 +130,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if BMProto.timeOffsetWrongCount > \ if BMProto.timeOffsetWrongCount > \
maximumTimeOffsetWrongCount and \ maximumTimeOffsetWrongCount and \
not self.fullyEstablished: not self.fullyEstablished:
UISignalQueue.put(( queues.UISignalQueue.put((
'updateStatusBar', 'updateStatusBar',
_translate( _translate(
"MainWindow", "MainWindow",
@ -158,8 +153,8 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Initiate inventory synchronisation.""" """Initiate inventory synchronisation."""
if not self.isOutbound and not self.local: if not self.isOutbound and not self.local:
state.clientHasReceivedIncomingConnections = True state.clientHasReceivedIncomingConnections = True
UISignalQueue.put(('setStatusIcon', 'green')) queues.UISignalQueue.put(('setStatusIcon', 'green'))
UISignalQueue.put(( queues.UISignalQueue.put((
'updateNetworkStatusTab', (self.isOutbound, True, self.destination) 'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
)) ))
self.antiIntersectionDelay(True) self.antiIntersectionDelay(True)
@ -169,7 +164,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination) knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
dandelion_ins.maybeAddStem(self, invQueue) dandelion_ins.maybeAddStem(self, queues.invQueue)
self.sendAddr() self.sendAddr()
self.sendBigInv() self.sendBigInv()
@ -271,12 +266,12 @@ class TCPConnection(BMProto, TLSDispatcher):
connectionpool.pool.streams, dandelion_ins.enabled, connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid)) False, nodeid=self.nodeid))
self.connectedAt = time.time() self.connectedAt = time.time()
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
def handle_read(self): def handle_read(self):
"""Callback for reading from a socket""" """Callback for reading from a socket"""
TLSDispatcher.handle_read(self) TLSDispatcher.handle_read(self)
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
def handle_write(self): def handle_write(self):
"""Callback for writing to a socket""" """Callback for writing to a socket"""
@ -286,7 +281,7 @@ class TCPConnection(BMProto, TLSDispatcher):
"""Callback for connection being closed.""" """Callback for connection being closed."""
host_is_global = self.isOutbound or not self.local and not state.socksIP host_is_global = self.isOutbound or not self.local and not state.socksIP
if self.fullyEstablished: if self.fullyEstablished:
UISignalQueue.put(( queues.UISignalQueue.put((
'updateNetworkStatusTab', 'updateNetworkStatusTab',
(self.isOutbound, False, self.destination) (self.isOutbound, False, self.destination)
)) ))

View File

@ -10,7 +10,7 @@ import sys
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import paths import paths
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from queues import receiveDataQueue from network import queues
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -216,5 +216,5 @@ class TLSDispatcher(AdvancedDispatcher):
self.bm_proto_reset() self.bm_proto_reset()
self.set_state("connection_fully_established") self.set_state("connection_fully_established")
receiveDataQueue.put(self.destination) queues.receiveDataQueue.put(self.destination)
return False return False

View File

@ -6,10 +6,8 @@ import socket
import time import time
# magic imports! # magic imports!
import protocol from network import protocol, state, queues
import state
import connectionpool import connectionpool
from queues import receiveDataQueue
from bmproto import BMProto from bmproto import BMProto
from node import Peer from node import Peer
@ -138,7 +136,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
# self.local works correctly # self.local works correctly
self.read_buf[0:] = recdata self.read_buf[0:] = recdata
self.bm_proto_reset() self.bm_proto_reset()
receiveDataQueue.put(self.listening) queues.receiveDataQueue.put(self.listening)
def handle_write(self): def handle_write(self):
try: try:

View File

@ -4,8 +4,7 @@
import time import time
import helper_random import helper_random
import protocol from network import protocol, state
import state
import connectionpool import connectionpool
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins from network import dandelion_ins

View File

@ -0,0 +1,15 @@
import state
import queues
import protocol
import paths
import randomtrackingdict
import addresses
from bmconfigparser import config
def importParentPackageDepsToNetwork():
"""
Exports parent package dependencies to the network.
"""
return (state, queues, config, protocol, randomtrackingdict, addresses, paths)