From 6c695c8ac77842f02af25162b89ed250e725a584 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 9 Aug 2017 17:36:52 +0200 Subject: [PATCH] Remove non-asyncore network code (partial) --- src/api.py | 12 +----- src/bitmessagemain.py | 55 ++++++++++--------------- src/bmconfigparser.py | 1 - src/class_singleWorker.py | 36 +++------------- src/network/stats.py | 86 ++++++++++++++++----------------------- 5 files changed, 65 insertions(+), 125 deletions(-) diff --git a/src/api.py b/src/api.py index 7c67f89f..dcc83538 100644 --- a/src/api.py +++ b/src/api.py @@ -864,11 +864,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'') with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', hexlify(inventoryHash) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((toStreamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - toStreamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((toStreamNumber, inventoryHash)) def HandleTrashSentMessageByAckDAta(self, params): # This API method should only be used when msgid is not available @@ -914,11 +910,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'') with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', hexlify(inventoryHash) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - pubkeyStreamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) def HandleGetMessageDataByDestinationHash(self, params): # Method will eventually be used by a particular Android app to diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 17e97ffd..eed725d3 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -92,13 +92,7 @@ def connectToStream(streamNumber): if streamNumber*2+1 not in knownnodes.knownNodes: knownnodes.knownNodes[streamNumber*2+1] = {} - if BMConfigParser().get("network", "asyncore"): - BMConnectionPool().connectToStream(streamNumber) - else: - for i in range(state.maximumNumberOfHalfOpenConnections): - a = outgoingSynSender() - a.setup(streamNumber, selfInitiatedConnections) - a.start() + BMConnectionPool().connectToStream(streamNumber) def _fixSocket(): if sys.platform.startswith('linux'): @@ -281,36 +275,29 @@ class Main: singleAPIThread.daemon = True # close the main program even if there are threads left singleAPIThread.start() - if BMConfigParser().get("network", "asyncore"): - BMConnectionPool() - asyncoreThread = BMNetworkThread() - asyncoreThread.daemon = True - asyncoreThread.start() - for i in range(BMConfigParser().getint("threads", "receive")): - receiveQueueThread = ReceiveQueueThread(i) - receiveQueueThread.daemon = True - receiveQueueThread.start() - announceThread = AnnounceThread() - announceThread.daemon = True - announceThread.start() - state.invThread = InvThread() - state.invThread.daemon = True - state.invThread.start() - state.addrThread = AddrThread() - state.addrThread.daemon = True - state.addrThread.start() - state.downloadThread = DownloadThread() - state.downloadThread.daemon = True - state.downloadThread.start() + BMConnectionPool() + asyncoreThread = BMNetworkThread() + asyncoreThread.daemon = True + asyncoreThread.start() + for i in range(BMConfigParser().getint("threads", "receive")): + receiveQueueThread = ReceiveQueueThread(i) + receiveQueueThread.daemon = True + receiveQueueThread.start() + announceThread = AnnounceThread() + announceThread.daemon = True + announceThread.start() + state.invThread = InvThread() + state.invThread.daemon = True + state.invThread.start() + state.addrThread = AddrThread() + state.addrThread.daemon = True + state.addrThread.start() + state.downloadThread = DownloadThread() + state.downloadThread.daemon = True + state.downloadThread.start() connectToStream(1) - if not BMConfigParser().get("network", "asyncore"): - singleListenerThread = singleListener() - singleListenerThread.setup(selfInitiatedConnections) - singleListenerThread.daemon = True # close the main program even if there are threads left - singleListenerThread.start() - if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): import upnp upnpThread = upnp.uPnPThread() diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index b8bf790f..acc4476a 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -19,7 +19,6 @@ BMConfigDefaults = { "receive": 3, }, "network": { - "asyncore": True, "bind": '', }, "inventory": { diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 07c768a4..e7f58f1b 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -192,11 +192,7 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((streamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -286,11 +282,7 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((streamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -380,11 +372,7 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((streamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -513,11 +501,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType, streamNumber, payload, embeddedTime, tag) PendingUpload().add(inventoryHash) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((streamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp())))) @@ -846,11 +830,7 @@ class singleWorker(threading.Thread, StoppableThread): # not sending to a chan or one of my addresses queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp())))) logger.info('Broadcasting inv for my msg(within sendmsg function):' + hexlify(inventoryHash)) - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((toStreamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - toStreamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((toStreamNumber, inventoryHash)) # Update the sent message in the sent table with the necessary information. if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): @@ -952,11 +932,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType, streamNumber, payload, embeddedTime, '') PendingUpload().add(inventoryHash) logger.info('sending inv (for the getpubkey message)') - if BMConfigParser().get("network", "asyncore"): - queues.invQueue.put((streamNumber, inventoryHash)) - else: - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/network/stats.py b/src/network/stats.py index 7f7c83ac..45961ac1 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -15,67 +15,53 @@ lastSentBytes = 0 currentSentSpeed = 0 def connectedHostsList(): - if BMConfigParser().get("network", "asyncore"): - retval = [] - for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): - if not i.fullyEstablished: - continue - try: - retval.append(i) - except AttributeError: - pass - return retval - return shared.connectedHostsList.items() + retval = [] + for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + if not i.fullyEstablished: + continue + try: + retval.append(i) + except AttributeError: + pass + return retval def sentBytes(): - if BMConfigParser().get("network", "asyncore"): - return asyncore.sentBytes - return throttle.SendThrottle().total + return asyncore.sentBytes def uploadSpeed(): global lastSentTimestamp, lastSentBytes, currentSentSpeed - if BMConfigParser().get("network", "asyncore"): - currentTimestamp = time.time() - if int(lastSentTimestamp) < int(currentTimestamp): - currentSentBytes = asyncore.sentBytes - currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp)) - lastSentBytes = currentSentBytes - lastSentTimestamp = currentTimestamp - return currentSentSpeed - return throttle.sendThrottle().getSpeed() + currentTimestamp = time.time() + if int(lastSentTimestamp) < int(currentTimestamp): + currentSentBytes = asyncore.sentBytes + currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp)) + lastSentBytes = currentSentBytes + lastSentTimestamp = currentTimestamp + return currentSentSpeed def receivedBytes(): - if BMConfigParser().get("network", "asyncore"): - return asyncore.receivedBytes - return throttle.ReceiveThrottle().total + return asyncore.receivedBytes def downloadSpeed(): global lastReceivedTimestamp, lastReceivedBytes, currentReceivedSpeed - if BMConfigParser().get("network", "asyncore"): - currentTimestamp = time.time() - if int(lastReceivedTimestamp) < int(currentTimestamp): - currentReceivedBytes = asyncore.receivedBytes - currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp)) - lastReceivedBytes = currentReceivedBytes - lastReceivedTimestamp = currentTimestamp - return currentReceivedSpeed - return throttle.ReceiveThrottle().getSpeed() + currentTimestamp = time.time() + if int(lastReceivedTimestamp) < int(currentTimestamp): + currentReceivedBytes = asyncore.receivedBytes + currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp)) + lastReceivedBytes = currentReceivedBytes + lastReceivedTimestamp = currentTimestamp + return currentReceivedSpeed def pendingDownload(): - if BMConfigParser().get("network", "asyncore"): - tmp = {} - for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): - for k in connection.objectsNewToMe.keys(): - tmp[k] = True - return len(tmp) - return PendingDownloadQueue.totalSize() + tmp = {} + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + for k in connection.objectsNewToMe.keys(): + tmp[k] = True + return len(tmp) def pendingUpload(): - if BMConfigParser().get("network", "asyncore"): - return 0 - tmp = {} - for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): - for k in connection.objectsNewToThem.keys(): - tmp[k] = True - return len(tmp) - return PendingUpload().len() + return 0 + tmp = {} + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + for k in connection.objectsNewToThem.keys(): + tmp[k] = True + return len(tmp)