Refactor parent package dependency in network module #2255
|
@ -414,7 +414,7 @@ class SettingsDialog(QtGui.QDialog):
|
||||||
'bitmessagesettings', 'udp'):
|
'bitmessagesettings', 'udp'):
|
||||||
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
|
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
|
||||||
if udp_enabled:
|
if udp_enabled:
|
||||||
announceThread = AnnounceThread()
|
announceThread = AnnounceThread(self.config)
|
||||||
announceThread.daemon = True
|
announceThread.daemon = True
|
||||||
announceThread.start()
|
announceThread.start()
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -1,9 +1,18 @@
|
||||||
"""
|
"""
|
||||||
Network subsystem package
|
Network subsystem package
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
|
import networkdepsinterface
|
||||||
|
except ImportError:
|
||||||
|
from pybitmessage import networkdepsinterface
|
||||||
|
|
||||||
from .dandelion import Dandelion
|
from .dandelion import Dandelion
|
||||||
from .threads import StoppableThread
|
from .threads import StoppableThread
|
||||||
|
|
||||||
|
(
|
||||||
|
state, queues, config, protocol,
|
||||||
|
randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork()
|
||||||
|
|
||||||
dandelion_ins = Dandelion()
|
dandelion_ins = Dandelion()
|
||||||
|
|
||||||
__all__ = ["StoppableThread"]
|
__all__ = ["StoppableThread"]
|
||||||
|
@ -11,7 +20,6 @@ __all__ = ["StoppableThread"]
|
||||||
|
|
||||||
def start(config, state):
|
def start(config, state):
|
||||||
"""Start network threads"""
|
"""Start network threads"""
|
||||||
import state
|
|
||||||
from .announcethread import AnnounceThread
|
from .announcethread import AnnounceThread
|
||||||
import connectionpool # pylint: disable=relative-import
|
import connectionpool # pylint: disable=relative-import
|
||||||
from .addrthread import AddrThread
|
from .addrthread import AddrThread
|
||||||
|
@ -30,18 +38,19 @@ def start(config, state):
|
||||||
readKnownNodes()
|
readKnownNodes()
|
||||||
connectionpool.pool.connectToStream(1)
|
connectionpool.pool.connectToStream(1)
|
||||||
for thread in (
|
for thread in (
|
||||||
BMNetworkThread(), InvThread(), AddrThread(),
|
BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
|
||||||
DownloadThread(), UploadThread()
|
AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
|
||||||
|
UploadThread(protocol, state)
|
||||||
):
|
):
|
||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
# Optional components
|
# Optional components
|
||||||
for i in range(config.getint('threads', 'receive')):
|
for i in range(config.getint('threads', 'receive')):
|
||||||
thread = ReceiveQueueThread(i)
|
thread = ReceiveQueueThread(queues, i)
|
||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
thread.start()
|
thread.start()
|
||||||
if config.safeGetBoolean('bitmessagesettings', 'udp'):
|
if config.safeGetBoolean('bitmessagesettings', 'udp'):
|
||||||
state.announceThread = AnnounceThread()
|
state.announceThread = AnnounceThread(config)
|
||||||
state.announceThread.daemon = True
|
state.announceThread.daemon = True
|
||||||
state.announceThread.start()
|
state.announceThread.start()
|
||||||
|
|
|
@ -6,8 +6,6 @@ from six.moves import queue
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from helper_random import randomshuffle
|
from helper_random import randomshuffle
|
||||||
from protocol import assembleAddrMessage
|
|
||||||
from queues import addrQueue # FIXME: init with queue
|
|
||||||
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
|
||||||
"""(Node) address broadcasting thread"""
|
"""(Node) address broadcasting thread"""
|
||||||
name = "AddrBroadcaster"
|
name = "AddrBroadcaster"
|
||||||
|
|
||||||
|
def __init__(self, protocol, queues):
|
||||||
|
self.protocol = protocol
|
||||||
|
self.queues = queues
|
||||||
|
StoppableThread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
chunk = []
|
chunk = []
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
data = addrQueue.get(False)
|
data = self.queues.addrQueue.get(False)
|
||||||
chunk.append(data)
|
chunk.append(data)
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
break
|
break
|
||||||
|
@ -41,9 +44,9 @@ class AddrThread(StoppableThread):
|
||||||
continue
|
continue
|
||||||
filtered.append((stream, peer, seen))
|
filtered.append((stream, peer, seen))
|
||||||
if filtered:
|
if filtered:
|
||||||
i.append_write_buf(assembleAddrMessage(filtered))
|
i.append_write_buf(self.protocol.assembleAddrMessage(filtered))
|
||||||
|
|
||||||
addrQueue.iterate()
|
self.queues.addrQueue.iterate()
|
||||||
for i in range(len(chunk)):
|
for i in range(len(chunk)):
|
||||||
addrQueue.task_done()
|
self.queues.addrQueue.task_done()
|
||||||
self.stop.wait(1)
|
self.stop.wait(1)
|
||||||
|
|
|
@ -6,7 +6,7 @@ import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import network.asyncore_pollchoose as asyncore
|
import network.asyncore_pollchoose as asyncore
|
||||||
import state
|
from network import state
|
||||||
from threads import BusyError, nonBlocking
|
from threads import BusyError, nonBlocking
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from bmconfigparser import config
|
|
||||||
from protocol import assembleAddrMessage
|
from protocol import assembleAddrMessage
|
||||||
|
|
||||||
from node import Peer
|
from node import Peer
|
||||||
|
@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
|
||||||
name = "Announcer"
|
name = "Announcer"
|
||||||
announceInterval = 60
|
announceInterval = 60
|
||||||
|
|
||||||
|
def __init__(self, config):
|
||||||
|
self.config = config
|
||||||
|
StoppableThread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
lastSelfAnnounced = 0
|
lastSelfAnnounced = 0
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
processed = 0
|
processed = 0
|
||||||
if lastSelfAnnounced < time.time() - self.announceInterval:
|
if lastSelfAnnounced < time.time() - self.announceInterval:
|
||||||
self.announceSelf()
|
self.announceSelf(self.config)
|
||||||
lastSelfAnnounced = time.time()
|
lastSelfAnnounced = time.time()
|
||||||
if processed == 0:
|
if processed == 0:
|
||||||
self.stop.wait(10)
|
self.stop.wait(10)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def announceSelf():
|
def announceSelf(config):
|
||||||
"""Announce our presence"""
|
"""Announce our presence"""
|
||||||
for connection in connectionpool.pool.udpSockets.values():
|
for connection in connectionpool.pool.udpSockets.values():
|
||||||
if not connection.announcing:
|
if not connection.announcing:
|
||||||
|
|
|
@ -4,8 +4,7 @@ BMObject and it's exceptions.
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import protocol
|
from network import state, protocol
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins
|
||||||
from highlevelcrypto import calculateInventoryHash
|
from highlevelcrypto import calculateInventoryHash
|
||||||
|
|
|
@ -11,13 +11,9 @@ import struct
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import addresses
|
|
||||||
import knownnodes
|
import knownnodes
|
||||||
import protocol
|
from network import protocol, state, config, queues, addresses, dandelion_ins
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from bmconfigparser import config
|
|
||||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
from network.advanceddispatcher import AdvancedDispatcher
|
from network.advanceddispatcher import AdvancedDispatcher
|
||||||
from network.bmobject import (
|
from network.bmobject import (
|
||||||
|
@ -26,7 +22,6 @@ from network.bmobject import (
|
||||||
BMObjectUnwantedStreamError
|
BMObjectUnwantedStreamError
|
||||||
)
|
)
|
||||||
from network.proxy import ProxyError
|
from network.proxy import ProxyError
|
||||||
from network import dandelion_ins
|
|
||||||
from node import Node, Peer
|
from node import Node, Peer
|
||||||
from objectracker import ObjectTracker, missingObjects
|
from objectracker import ObjectTracker, missingObjects
|
||||||
|
|
||||||
|
@ -409,7 +404,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.object.checkObjectByType()
|
self.object.checkObjectByType()
|
||||||
objectProcessorQueue.put((
|
queues.objectProcessorQueue.put((
|
||||||
self.object.objectType, buffer(self.object.data))) # noqa: F821
|
self.object.objectType, buffer(self.object.data))) # noqa: F821
|
||||||
except BMObjectInvalidError:
|
except BMObjectInvalidError:
|
||||||
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
||||||
|
@ -431,7 +426,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
)
|
)
|
||||||
self.handleReceivedObject(
|
self.handleReceivedObject(
|
||||||
self.object.streamNumber, self.object.inventoryHash)
|
self.object.streamNumber, self.object.inventoryHash)
|
||||||
invQueue.put((
|
queues.invQueue.put((
|
||||||
self.object.streamNumber, self.object.inventoryHash,
|
self.object.streamNumber, self.object.inventoryHash,
|
||||||
self.destination))
|
self.destination))
|
||||||
return True
|
return True
|
||||||
|
@ -472,7 +467,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
def bm_command_portcheck(self):
|
def bm_command_portcheck(self):
|
||||||
"""Incoming port check request, queue it."""
|
"""Incoming port check request, queue it."""
|
||||||
portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
|
queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_ping(self):
|
def bm_command_ping(self):
|
||||||
|
|
|
@ -6,10 +6,7 @@ import logging
|
||||||
import random
|
import random
|
||||||
|
|
||||||
import knownnodes
|
import knownnodes
|
||||||
import protocol
|
from network import protocol, state, config, queues
|
||||||
import state
|
|
||||||
from bmconfigparser import config
|
|
||||||
from queues import queue, portCheckerQueue
|
|
||||||
|
|
||||||
logger = logging.getLogger('default')
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
|
@ -34,10 +31,10 @@ def chooseConnection(stream):
|
||||||
onionOnly = config.safeGetBoolean(
|
onionOnly = config.safeGetBoolean(
|
||||||
"bitmessagesettings", "onionservicesonly")
|
"bitmessagesettings", "onionservicesonly")
|
||||||
try:
|
try:
|
||||||
retval = portCheckerQueue.get(False)
|
retval = queues.portCheckerQueue.get(False)
|
||||||
portCheckerQueue.task_done()
|
queues.portCheckerQueue.task_done()
|
||||||
return retval
|
return retval
|
||||||
except queue.Empty:
|
except queues.queue.Empty:
|
||||||
pass
|
pass
|
||||||
# with a probability of 0.5, connect to a discovered peer
|
# with a probability of 0.5, connect to a discovered peer
|
||||||
if random.choice((False, True)) and not haveOnion: # nosec B311
|
if random.choice((False, True)) and not haveOnion: # nosec B311
|
||||||
|
|
|
@ -11,9 +11,7 @@ import time
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
import helper_random
|
import helper_random
|
||||||
import knownnodes
|
import knownnodes
|
||||||
import protocol
|
from network import protocol, state, config
|
||||||
import state
|
|
||||||
from bmconfigparser import config
|
|
||||||
from connectionchooser import chooseConnection
|
from connectionchooser import chooseConnection
|
||||||
from node import Peer
|
from node import Peer
|
||||||
from proxy import Proxy
|
from proxy import Proxy
|
||||||
|
|
|
@ -2,12 +2,9 @@
|
||||||
`DownloadThread` class definition
|
`DownloadThread` class definition
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
import state
|
|
||||||
import addresses
|
|
||||||
import helper_random
|
|
||||||
import protocol
|
|
||||||
import connectionpool
|
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins
|
||||||
|
import helper_random
|
||||||
|
import connectionpool
|
||||||
from objectracker import missingObjects
|
from objectracker import missingObjects
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -20,8 +17,11 @@ class DownloadThread(StoppableThread):
|
||||||
cleanInterval = 60
|
cleanInterval = 60
|
||||||
requestExpires = 3600
|
requestExpires = 3600
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, state, protocol, addresses):
|
||||||
super(DownloadThread, self).__init__(name="Downloader")
|
super(DownloadThread, self).__init__(name="Downloader")
|
||||||
|
self.state = state
|
||||||
|
self.protocol = protocol
|
||||||
|
self.addresses = addresses
|
||||||
self.lastCleaned = time.time()
|
self.lastCleaned = time.time()
|
||||||
|
|
||||||
def cleanPending(self):
|
def cleanPending(self):
|
||||||
|
@ -60,7 +60,7 @@ class DownloadThread(StoppableThread):
|
||||||
payload = bytearray()
|
payload = bytearray()
|
||||||
chunkCount = 0
|
chunkCount = 0
|
||||||
for chunk in request:
|
for chunk in request:
|
||||||
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
|
if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
|
||||||
try:
|
try:
|
||||||
del i.objectsNewToMe[chunk]
|
del i.objectsNewToMe[chunk]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -71,8 +71,8 @@ class DownloadThread(StoppableThread):
|
||||||
missingObjects[chunk] = now
|
missingObjects[chunk] = now
|
||||||
if not chunkCount:
|
if not chunkCount:
|
||||||
continue
|
continue
|
||||||
payload[0:0] = addresses.encodeVarint(chunkCount)
|
payload[0:0] = self.addresses.encodeVarint(chunkCount)
|
||||||
i.append_write_buf(protocol.CreatePacket('getdata', payload))
|
i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'%s:%i Requesting %i objects',
|
'%s:%i Requesting %i objects',
|
||||||
i.destination.host, i.destination.port, chunkCount)
|
i.destination.host, i.destination.port, chunkCount)
|
||||||
|
|
|
@ -5,12 +5,8 @@ import Queue
|
||||||
import random
|
import random
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
import addresses
|
|
||||||
import protocol
|
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins
|
||||||
from queues import invQueue
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,6 +33,13 @@ class InvThread(StoppableThread):
|
||||||
|
|
||||||
name = "InvBroadcaster"
|
name = "InvBroadcaster"
|
||||||
|
|
||||||
|
def __init__(self, protocol, state, queues, addresses):
|
||||||
|
self.protocol = protocol
|
||||||
|
self.state = state
|
||||||
|
self.queues = queues
|
||||||
|
self.addresses = addresses
|
||||||
|
StoppableThread.__init__(self)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handleLocallyGenerated(stream, hashId):
|
def handleLocallyGenerated(stream, hashId):
|
||||||
"""Locally generated inventory items require special handling"""
|
"""Locally generated inventory items require special handling"""
|
||||||
|
@ -48,13 +51,13 @@ class InvThread(StoppableThread):
|
||||||
connection.objectsNewToThem[hashId] = time()
|
connection.objectsNewToThem[hashId] = time()
|
||||||
|
|
||||||
def run(self): # pylint: disable=too-many-branches
|
def run(self): # pylint: disable=too-many-branches
|
||||||
while not state.shutdown: # pylint: disable=too-many-nested-blocks
|
while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
|
||||||
chunk = []
|
chunk = []
|
||||||
while True:
|
while True:
|
||||||
# Dandelion fluff trigger by expiration
|
# Dandelion fluff trigger by expiration
|
||||||
handleExpiredDandelion(dandelion_ins.expire(invQueue))
|
handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
|
||||||
try:
|
try:
|
||||||
data = invQueue.get(False)
|
data = self.queues.invQueue.get(False)
|
||||||
chunk.append((data[0], data[1]))
|
chunk.append((data[0], data[1]))
|
||||||
# locally generated
|
# locally generated
|
||||||
if len(data) == 2 or data[2] is None:
|
if len(data) == 2 or data[2] is None:
|
||||||
|
@ -81,7 +84,7 @@ class InvThread(StoppableThread):
|
||||||
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
|
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
|
||||||
fluffs.append(inv[1])
|
fluffs.append(inv[1])
|
||||||
# send a dinv only if the stem node supports dandelion
|
# send a dinv only if the stem node supports dandelion
|
||||||
elif connection.services & protocol.NODE_DANDELION > 0:
|
elif connection.services & self.protocol.NODE_DANDELION > 0:
|
||||||
stems.append(inv[1])
|
stems.append(inv[1])
|
||||||
else:
|
else:
|
||||||
fluffs.append(inv[1])
|
fluffs.append(inv[1])
|
||||||
|
@ -90,20 +93,20 @@ class InvThread(StoppableThread):
|
||||||
|
|
||||||
if fluffs:
|
if fluffs:
|
||||||
random.shuffle(fluffs)
|
random.shuffle(fluffs)
|
||||||
connection.append_write_buf(protocol.CreatePacket(
|
connection.append_write_buf(self.protocol.CreatePacket(
|
||||||
'inv',
|
'inv',
|
||||||
addresses.encodeVarint(
|
self.addresses.encodeVarint(
|
||||||
len(fluffs)) + ''.join(fluffs)))
|
len(fluffs)) + ''.join(fluffs)))
|
||||||
if stems:
|
if stems:
|
||||||
random.shuffle(stems)
|
random.shuffle(stems)
|
||||||
connection.append_write_buf(protocol.CreatePacket(
|
connection.append_write_buf(self.protocol.CreatePacket(
|
||||||
'dinv',
|
'dinv',
|
||||||
addresses.encodeVarint(
|
self.addresses.encodeVarint(
|
||||||
len(stems)) + ''.join(stems)))
|
len(stems)) + ''.join(stems)))
|
||||||
|
|
||||||
invQueue.iterate()
|
self.queues.invQueue.iterate()
|
||||||
for _ in range(len(chunk)):
|
for _ in range(len(chunk)):
|
||||||
invQueue.task_done()
|
self.queues.invQueue.task_done()
|
||||||
|
|
||||||
dandelion_ins.reRandomiseStems()
|
dandelion_ins.reRandomiseStems()
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from collections import Iterable
|
from collections import Iterable
|
||||||
|
|
||||||
import state
|
from network import state, config
|
||||||
from bmconfigparser import config
|
|
||||||
from network.node import Peer
|
from network.node import Peer
|
||||||
|
|
||||||
state.Peer = Peer
|
state.Peer = Peer
|
||||||
|
|
|
@ -3,7 +3,6 @@ A thread to handle network concerns
|
||||||
"""
|
"""
|
||||||
import network.asyncore_pollchoose as asyncore
|
import network.asyncore_pollchoose as asyncore
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from queues import excQueue
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,12 +10,16 @@ class BMNetworkThread(StoppableThread):
|
||||||
"""Main network thread"""
|
"""Main network thread"""
|
||||||
name = "Asyncore"
|
name = "Asyncore"
|
||||||
|
|
||||||
|
def __init__(self, queues):
|
||||||
|
self.queues = queues
|
||||||
|
StoppableThread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
connectionpool.pool.loop()
|
connectionpool.pool.loop()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
excQueue.put((self.name, e))
|
self.queues.excQueue.put((self.name, e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def stopThread(self):
|
def stopThread(self):
|
||||||
|
|
|
@ -8,7 +8,7 @@ import time
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
from advanceddispatcher import AdvancedDispatcher
|
from advanceddispatcher import AdvancedDispatcher
|
||||||
from bmconfigparser import config
|
from network import config
|
||||||
from node import Peer
|
from node import Peer
|
||||||
|
|
||||||
logger = logging.getLogger('default')
|
logger = logging.getLogger('default')
|
||||||
|
|
|
@ -7,20 +7,20 @@ import socket
|
||||||
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from network.advanceddispatcher import UnknownStateError
|
from network.advanceddispatcher import UnknownStateError
|
||||||
from queues import receiveDataQueue
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
|
||||||
class ReceiveQueueThread(StoppableThread):
|
class ReceiveQueueThread(StoppableThread):
|
||||||
"""This thread processes data received from the network
|
"""This thread processes data received from the network
|
||||||
(which is done by the asyncore thread)"""
|
(which is done by the asyncore thread)"""
|
||||||
def __init__(self, num=0):
|
def __init__(self, queues, num=0):
|
||||||
|
self.queues = queues
|
||||||
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
|
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
try:
|
try:
|
||||||
dest = receiveDataQueue.get(block=True, timeout=1)
|
dest = self.queues.receiveDataQueue.get(block=True, timeout=1)
|
||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread):
|
||||||
connection = connectionpool.pool.getConnectionByAddr(dest)
|
connection = connectionpool.pool.getConnectionByAddr(dest)
|
||||||
# connection object not found
|
# connection object not found
|
||||||
except KeyError:
|
except KeyError:
|
||||||
receiveDataQueue.task_done()
|
self.queues.receiveDataQueue.task_done()
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
connection.process()
|
connection.process()
|
||||||
|
@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread):
|
||||||
self.logger.error('Socket error: %s', err)
|
self.logger.error('Socket error: %s', err)
|
||||||
except: # noqa:E722
|
except: # noqa:E722
|
||||||
self.logger.error('Error processing', exc_info=True)
|
self.logger.error('Error processing', exc_info=True)
|
||||||
receiveDataQueue.task_done()
|
self.queues.receiveDataQueue.task_done()
|
||||||
|
|
|
@ -10,16 +10,11 @@ import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import addresses
|
|
||||||
import helper_random
|
import helper_random
|
||||||
import l10n
|
import l10n
|
||||||
import protocol
|
from network import protocol, state, config, queues, addresses, dandelion_ins
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from bmconfigparser import config
|
|
||||||
from highlevelcrypto import randomBytes
|
from highlevelcrypto import randomBytes
|
||||||
from network import dandelion_ins
|
|
||||||
from queues import invQueue, receiveDataQueue, UISignalQueue
|
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
|
@ -109,7 +104,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
max_known_nodes = max(
|
max_known_nodes = max(
|
||||||
len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
|
len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
|
||||||
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
|
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
|
||||||
0.2 + invQueue.queueCount / 2.0)
|
0.2 + queues.invQueue.queueCount / 2.0)
|
||||||
# take the stream with maximum amount of nodes
|
# take the stream with maximum amount of nodes
|
||||||
# +2 is to avoid problems with log(0) and log(1)
|
# +2 is to avoid problems with log(0) and log(1)
|
||||||
# 20 is avg connected nodes count
|
# 20 is avg connected nodes count
|
||||||
|
@ -135,7 +130,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
if BMProto.timeOffsetWrongCount > \
|
if BMProto.timeOffsetWrongCount > \
|
||||||
maximumTimeOffsetWrongCount and \
|
maximumTimeOffsetWrongCount and \
|
||||||
not self.fullyEstablished:
|
not self.fullyEstablished:
|
||||||
UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'updateStatusBar',
|
'updateStatusBar',
|
||||||
_translate(
|
_translate(
|
||||||
"MainWindow",
|
"MainWindow",
|
||||||
|
@ -158,8 +153,8 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
"""Initiate inventory synchronisation."""
|
"""Initiate inventory synchronisation."""
|
||||||
if not self.isOutbound and not self.local:
|
if not self.isOutbound and not self.local:
|
||||||
state.clientHasReceivedIncomingConnections = True
|
state.clientHasReceivedIncomingConnections = True
|
||||||
UISignalQueue.put(('setStatusIcon', 'green'))
|
queues.UISignalQueue.put(('setStatusIcon', 'green'))
|
||||||
UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
|
'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
|
||||||
))
|
))
|
||||||
self.antiIntersectionDelay(True)
|
self.antiIntersectionDelay(True)
|
||||||
|
@ -169,7 +164,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
knownnodes.increaseRating(self.destination)
|
knownnodes.increaseRating(self.destination)
|
||||||
knownnodes.addKnownNode(
|
knownnodes.addKnownNode(
|
||||||
self.streams, self.destination, time.time())
|
self.streams, self.destination, time.time())
|
||||||
dandelion_ins.maybeAddStem(self, invQueue)
|
dandelion_ins.maybeAddStem(self, queues.invQueue)
|
||||||
self.sendAddr()
|
self.sendAddr()
|
||||||
self.sendBigInv()
|
self.sendBigInv()
|
||||||
|
|
||||||
|
@ -271,12 +266,12 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
connectionpool.pool.streams, dandelion_ins.enabled,
|
connectionpool.pool.streams, dandelion_ins.enabled,
|
||||||
False, nodeid=self.nodeid))
|
False, nodeid=self.nodeid))
|
||||||
self.connectedAt = time.time()
|
self.connectedAt = time.time()
|
||||||
receiveDataQueue.put(self.destination)
|
queues.receiveDataQueue.put(self.destination)
|
||||||
|
|
||||||
def handle_read(self):
|
def handle_read(self):
|
||||||
"""Callback for reading from a socket"""
|
"""Callback for reading from a socket"""
|
||||||
TLSDispatcher.handle_read(self)
|
TLSDispatcher.handle_read(self)
|
||||||
receiveDataQueue.put(self.destination)
|
queues.receiveDataQueue.put(self.destination)
|
||||||
|
|
||||||
def handle_write(self):
|
def handle_write(self):
|
||||||
"""Callback for writing to a socket"""
|
"""Callback for writing to a socket"""
|
||||||
|
@ -286,7 +281,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
"""Callback for connection being closed."""
|
"""Callback for connection being closed."""
|
||||||
host_is_global = self.isOutbound or not self.local and not state.socksIP
|
host_is_global = self.isOutbound or not self.local and not state.socksIP
|
||||||
if self.fullyEstablished:
|
if self.fullyEstablished:
|
||||||
UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'updateNetworkStatusTab',
|
'updateNetworkStatusTab',
|
||||||
(self.isOutbound, False, self.destination)
|
(self.isOutbound, False, self.destination)
|
||||||
))
|
))
|
||||||
|
|
|
@ -10,7 +10,7 @@ import sys
|
||||||
import network.asyncore_pollchoose as asyncore
|
import network.asyncore_pollchoose as asyncore
|
||||||
import paths
|
import paths
|
||||||
from network.advanceddispatcher import AdvancedDispatcher
|
from network.advanceddispatcher import AdvancedDispatcher
|
||||||
from queues import receiveDataQueue
|
from network import queues
|
||||||
|
|
||||||
logger = logging.getLogger('default')
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
|
@ -216,5 +216,5 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
|
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
self.set_state("connection_fully_established")
|
self.set_state("connection_fully_established")
|
||||||
receiveDataQueue.put(self.destination)
|
queues.receiveDataQueue.put(self.destination)
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -6,10 +6,8 @@ import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import protocol
|
from network import protocol, state, queues
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from queues import receiveDataQueue
|
|
||||||
|
|
||||||
from bmproto import BMProto
|
from bmproto import BMProto
|
||||||
from node import Peer
|
from node import Peer
|
||||||
|
@ -138,7 +136,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
|
||||||
# self.local works correctly
|
# self.local works correctly
|
||||||
self.read_buf[0:] = recdata
|
self.read_buf[0:] = recdata
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
receiveDataQueue.put(self.listening)
|
queues.receiveDataQueue.put(self.listening)
|
||||||
|
|
||||||
def handle_write(self):
|
def handle_write(self):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -4,8 +4,6 @@
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import helper_random
|
import helper_random
|
||||||
import protocol
|
|
||||||
import state
|
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins
|
||||||
|
@ -19,6 +17,11 @@ class UploadThread(StoppableThread):
|
||||||
maxBufSize = 2097152 # 2MB
|
maxBufSize = 2097152 # 2MB
|
||||||
name = "Uploader"
|
name = "Uploader"
|
||||||
|
|
||||||
|
def __init__(self, protocol, state):
|
||||||
|
self.protocol = protocol
|
||||||
|
self.state = state
|
||||||
|
StoppableThread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
uploaded = 0
|
uploaded = 0
|
||||||
|
@ -49,8 +52,8 @@ class UploadThread(StoppableThread):
|
||||||
i.destination)
|
i.destination)
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
payload.extend(protocol.CreatePacket(
|
payload.extend(self.protocol.CreatePacket(
|
||||||
'object', state.Inventory[chunk].payload))
|
'object', self.state.Inventory[chunk].payload))
|
||||||
chunk_count += 1
|
chunk_count += 1
|
||||||
except KeyError:
|
except KeyError:
|
||||||
i.antiIntersectionDelay()
|
i.antiIntersectionDelay()
|
||||||
|
|
15
src/networkdepsinterface.py
Normal file
15
src/networkdepsinterface.py
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
|
||||||
|
import state
|
||||||
|
import queues
|
||||||
|
import protocol
|
||||||
|
import paths
|
||||||
|
import randomtrackingdict
|
||||||
|
import addresses
|
||||||
|
from bmconfigparser import config
|
||||||
|
|
||||||
|
|
||||||
|
def importParentPackageDepsToNetwork():
|
||||||
|
"""
|
||||||
|
Exports parent package dependencies to the network.
|
||||||
|
"""
|
||||||
|
return (state, queues, config, protocol, randomtrackingdict, addresses, paths)
|
|
@ -74,7 +74,7 @@ class TestNetwork(TestPartialRun):
|
||||||
|
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
try:
|
try:
|
||||||
self.state.announceThread.announceSelf()
|
self.state.announceThread.announceSelf(self.config)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
self.fail('state.announceThread is not set properly')
|
self.fail('state.announceThread is not set properly')
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
Reference in New Issue
Block a user