From c85d52b8e8e8d6dfd9e67becab4cd53e8630b607 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 29 May 2017 00:24:07 +0200 Subject: [PATCH] Asyncore updates - asyncore is now on by default - inv announcements implemented - bandwidth limit implemented / fixed - stats on download / upload speed now work - make prints into logger - limit knownNodes to 20k as it was before - green light fixed - other minor fixes --- src/api.py | 14 +++-- src/bitmessagemain.py | 4 ++ src/bitmessageqt/networkstatus.py | 2 +- src/bmconfigparser.py | 2 +- src/class_singleCleaner.py | 9 ++- src/class_singleWorker.py | 42 ++++++++++---- src/network/advanceddispatcher.py | 36 ++++++------ src/network/announcethread.py | 2 +- src/network/asyncore_pollchoose.py | 71 ++++++++++++----------- src/network/bmproto.py | 91 ++++++++++++------------------ src/network/connectionpool.py | 5 +- src/network/invthread.py | 82 +++++++++++++++++++++++++++ src/network/networkthread.py | 2 +- src/network/receivequeuethread.py | 2 +- src/network/stats.py | 49 +++++++++++++++- src/network/tcp.py | 12 +++- src/network/tls.py | 7 ++- src/network/udp.py | 12 ++-- src/protocol.py | 23 +++++--- src/queues.py | 1 + src/state.py | 2 + src/storage/filesystem.py | 4 +- 22 files changed, 316 insertions(+), 158 deletions(-) create mode 100644 src/network/invthread.py diff --git a/src/api.py b/src/api.py index 24c0fc12..f2334484 100644 --- a/src/api.py +++ b/src/api.py @@ -858,8 +858,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'') with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', hexlify(inventoryHash) - protocol.broadcastToSendDataQueues(( - toStreamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((toStreamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + toStreamNumber, 'advertiseobject', inventoryHash)) def HandleTrashSentMessageByAckDAta(self, params): # This API method should only be used when msgid is not available @@ -905,8 +908,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'') with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', hexlify(inventoryHash) - protocol.broadcastToSendDataQueues(( - pubkeyStreamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + pubkeyStreamNumber, 'advertiseobject', inventoryHash)) def HandleGetMessageDataByDestinationHash(self, params): # Method will eventually be used by a particular Android app to diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index d566852c..ca578c43 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -57,6 +57,7 @@ from network.connectionpool import BMConnectionPool from network.networkthread import BMNetworkThread from network.receivequeuethread import ReceiveQueueThread from network.announcethread import AnnounceThread +from network.invthread import InvThread #from network.downloadthread import DownloadThread # Helper Functions @@ -275,6 +276,9 @@ class Main: announceThread = AnnounceThread() announceThread.daemon = True announceThread.start() + state.invThread = InvThread() + state.invThread.daemon = True + state.invThread.start() connectToStream(1) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 158f02fa..ef9e4c42 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -46,7 +46,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): return "%4.0f kB" % num def updateNumberOfObjectsToBeSynced(self): - self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize() + PendingUpload().len())) + self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, network.stats.pendingDownload() + network.stats.pendingUpload())) def updateNumberOfMessagesProcessed(self): self.updateNumberOfObjectsToBeSynced() diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index 8c4d8b3c..b727ad65 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -16,7 +16,7 @@ BMConfigDefaults = { "maxuploadrate": 0, }, "network": { - "asyncore": False, + "asyncore": True, "bind": None, }, "inventory": { diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 6c0a62f6..22b83079 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -1,4 +1,5 @@ import threading +import resource import shared import time import sys @@ -9,6 +10,7 @@ from bmconfigparser import BMConfigParser from helper_sql import * from helper_threading import * from inventory import Inventory +from network.connectionpool import BMConnectionPool from debug import logger import knownnodes import queues @@ -36,6 +38,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...) class singleCleaner(threading.Thread, StoppableThread): + cycleLength = 300 def __init__(self): threading.Thread.__init__(self, name="singleCleaner") @@ -51,7 +54,7 @@ class singleCleaner(threading.Thread, StoppableThread): # initial wait if state.shutdown == 0: - self.stop.wait(300) + self.stop.wait(singleCleaner.cycleLength) while state.shutdown == 0: queues.UISignalQueue.put(( @@ -119,8 +122,10 @@ class singleCleaner(threading.Thread, StoppableThread): # TODO: cleanup pending upload / download + logger.info("Memory usage %s (kB)", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) + if state.shutdown == 0: - self.stop.wait(300) + self.stop.wait(singleCleaner.cycleLength) def resendPubkeyRequest(address): diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index c2d16de4..ff357b70 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -192,8 +192,11 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((streamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + streamNumber, 'advertiseobject', inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -283,8 +286,11 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((streamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + streamNumber, 'advertiseobject', inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -374,8 +380,11 @@ class singleWorker(threading.Thread, StoppableThread): logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((streamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + streamNumber, 'advertiseobject', inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -504,8 +513,11 @@ class singleWorker(threading.Thread, StoppableThread): objectType, streamNumber, payload, embeddedTime, tag) PendingUpload().add(inventoryHash) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((streamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + streamNumber, 'advertiseobject', inventoryHash)) queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp())))) @@ -834,8 +846,11 @@ class singleWorker(threading.Thread, StoppableThread): # not sending to a chan or one of my addresses queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp())))) logger.info('Broadcasting inv for my msg(within sendmsg function):' + hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues(( - toStreamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((toStreamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + toStreamNumber, 'advertiseobject', inventoryHash)) # Update the sent message in the sent table with the necessary information. if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): @@ -937,8 +952,11 @@ class singleWorker(threading.Thread, StoppableThread): objectType, streamNumber, payload, embeddedTime, '') PendingUpload().add(inventoryHash) logger.info('sending inv (for the getpubkey message)') - protocol.broadcastToSendDataQueues(( - streamNumber, 'advertiseobject', inventoryHash)) + if BMConfigParser.safeGetBoolean("network", "asyncore"): + queues.invQueue.put((streamNumber, inventoryHash)) + else: + protocol.broadcastToSendDataQueues(( + streamNumber, 'advertiseobject', inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 96b206cf..5f8a94aa 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -2,6 +2,7 @@ import Queue import time import asyncore_pollchoose as asyncore +from debug import logger from bmconfigparser import BMConfigParser class AdvancedDispatcher(asyncore.dispatcher): @@ -56,44 +57,45 @@ class AdvancedDispatcher(asyncore.dispatcher): self.state = state def writable(self): - return self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty() + return asyncore.dispatcher.writable(self) and \ + (self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty()) def readable(self): - return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len + return asyncore.dispatcher.readable(self) and \ + (self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len) def handle_read(self): self.lastTx = time.time() downloadBytes = AdvancedDispatcher._buf_len if asyncore.maxDownloadRate > 0: - downloadBytes = asyncore.downloadChunk + downloadBytes = asyncore.downloadBucket if self.expectBytes > 0 and downloadBytes > self.expectBytes: downloadBytes = self.expectBytes - newData = self.recv(downloadBytes) - if asyncore.maxDownloadRate > 0: - asyncore.downloadBucket -= len(newData) - self.receivedBytes += len(newData) - if self.expectBytes > 0: - self.expectBytes -= len(newData) - asyncore.updateReceived(len(newData)) - self.read_buf += newData + if downloadBytes > 0: + newData = self.recv(downloadBytes) + self.receivedBytes += len(newData) + if self.expectBytes > 0: + self.expectBytes -= len(newData) + asyncore.update_received(len(newData)) + self.read_buf += newData self.process() def handle_write(self): self.lastTx = time.time() + bufSize = AdvancedDispatcher._buf_len if asyncore.maxUploadRate > 0: - bufSize = asyncore.uploadChunk - else: - bufSize = self._buf_len + bufSize = asyncore.uploadBucket while len(self.write_buf) < bufSize: try: self.write_buf += self.writeQueue.get(False) self.writeQueue.task_done() except Queue.Empty: break + if bufSize <= 0: + return if len(self.write_buf) > 0: written = self.send(self.write_buf[0:bufSize]) - asyncore.uploadBucket -= written - asyncore.updateSent(written) + asyncore.update_sent(written) self.sentBytes += written self.slice_write_buf(written) @@ -107,7 +109,7 @@ class AdvancedDispatcher(asyncore.dispatcher): def close(self): self.read_buf = b"" self.write_buf = b"" - self.state = "shutdown" + self.state = "close" while True: try: self.writeQueue.get(False) diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 0ba93d7a..29ed301e 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -17,7 +17,7 @@ class AnnounceThread(threading.Thread, StoppableThread): self.initStop() self.name = "AnnounceThread" BMConnectionPool() - logger.error("init announce thread") + logger.info("init announce thread") def run(self): lastSelfAnnounced = 0 diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index dda6d7c2..25a5b3fb 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -90,12 +90,10 @@ class ExitNow(Exception): _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) maxDownloadRate = 0 -downloadChunk = 0 downloadTimestamp = 0 downloadBucket = 0 receivedBytes = 0 maxUploadRate = 0 -uploadChunk = 0 uploadTimestamp = 0 uploadBucket = 0 sentBytes = 0 @@ -117,48 +115,37 @@ def write(obj): obj.handle_error() def set_rates(download, upload): - global maxDownloadRate, maxUploadRate, downloadChunk, uploadChunk, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp maxDownloadRate = float(download) - if maxDownloadRate > 0: - downloadChunk = 1400 maxUploadRate = float(upload) - if maxUploadRate > 0: - uploadChunk = 1400 downloadBucket = maxDownloadRate uploadBucket = maxUploadRate downloadTimestamp = time.time() uploadTimestamp = time.time() -def updateReceived(download=0): - global receivedBytes +def update_received(download=0): + global receivedBytes, maxDownloadRate, downloadBucket, downloadTimestamp + currentTimestamp = time.time() receivedBytes += download + if maxDownloadRate > 0: + bucketIncrease = int(maxDownloadRate * (currentTimestamp - downloadTimestamp)) + downloadBucket += bucketIncrease + if downloadBucket > maxDownloadRate: + downloadBucket = int(maxDownloadRate) + downloadBucket -= download + downloadTimestamp = currentTimestamp -def updateSent(upload=0): - global sentBytes +def update_sent(upload=0): + global sentBytes, maxUploadRate, uploadBucket, uploadTimestamp + currentTimestamp = time.time() sentBytes += upload - -def wait_tx_buckets(): - global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp - if maxDownloadRate > 0 and maxUploadRate > 0: - wait_for_this_long = min(maxDownloadRate / downloadChunk, maxUploadRate / uploadChunk) - elif maxDownloadRate > 0: - wait_for_this_long = maxDownloadRate / downloadChunk - elif maxUploadRate > 0: - wait_for_this_long = maxUploadRate / uploadChunk - else: - return - wait_for_this_long /= 2 - if wait_for_this_long > 1: - wait_for_this_long = 1 - elif wait_for_this_long < 0.1: - wait_for_this_long = 0.1 - - while downloadBucket < downloadChunk and uploadBucket < uploadChunk: - time.sleep(wait_for_this_long) - downloadBucket += (time.time() - downloadTimestamp) * maxDownloadRate - downloadTimestamp = time.time() - uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate - uploadTimestamp = time.time() + if maxUploadRate > 0: + bucketIncrease = int(maxUploadRate * (currentTimestamp - uploadTimestamp)) + uploadBucket += bucketIncrease + if uploadBucket > maxUploadRate: + uploadBucket = int(maxUploadRate) + uploadBucket -= upload + uploadTimestamp = currentTimestamp def _exception(obj): try: @@ -376,13 +363,19 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, if count is None: while map: - wait_tx_buckets() + # fill buckets first + update_sent() + update_received() + # then poll poller(timeout, map) else: timeout /= count while map and count > 0: - wait_tx_buckets() + # fill buckets first + update_sent() + update_received() poller(timeout, map) + # then poll count = count - 1 class dispatcher: @@ -396,6 +389,8 @@ class dispatcher: ignore_log_types = frozenset(['warning']) poller_registered = False flags = 0 + # don't do network IO with a smaller bucket than this + minTx = 1500 def __init__(self, sock=None, map=None): if map is None: @@ -499,9 +494,13 @@ class dispatcher: # ================================================== def readable(self): + if maxDownloadRate > 0: + return downloadBucket > dispatcher.minTx return True def writable(self): + if maxUploadRate > 0: + return uploadBucket > dispatcher.minTx return True # ================================================== diff --git a/src/network/bmproto.py b/src/network/bmproto.py index d3f02568..a99bdfb2 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -3,7 +3,6 @@ from binascii import hexlify import hashlib import math import time -from pprint import pprint import socket import struct import random @@ -25,7 +24,7 @@ from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUpl import addresses from bmconfigparser import BMConfigParser -from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue +from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue, invQueue import shared import state import protocol @@ -53,35 +52,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # maximum time offset maxTimeOffset = 3600 -# def __init__(self, address=None, sock=None): -# AdvancedDispatcher.__init__(self, sock) -# self.verackReceived = False -# self.verackSent = False -# self.lastTx = time.time() -# self.streams = [0] -# self.fullyEstablished = False -# self.connectedAt = 0 -# self.skipUntil = 0 -# if address is None and sock is not None: -# self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) -# self.isOutbound = False -# TLSDispatcher.__init__(self, sock, server_side=True) -# self.connectedAt = time.time() -# #print "received connection in background from %s:%i" % (self.destination.host, self.destination.port) -# else: -# self.destination = address -# self.isOutbound = True -# if ":" in address.host: -# self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) -# else: -# self.create_socket(socket.AF_INET, socket.SOCK_STREAM) -# self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -# TLSDispatcher.__init__(self, sock, server_side=False) -# self.connect(self.destination) -# #print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) -# shared.connectedHostsList[self.destination] = 0 -# ObjectTracker.__init__(self) -# UISignalQueue.put(('updateNetworkStatusTab', 'no data')) + def __init__(self, address=None, sock=None): + AdvancedDispatcher.__init__(self, sock) + self.isOutbound = False + # packet/connection from a local IP + self.local = False def bm_proto_reset(self): self.magic = None @@ -95,7 +70,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.object = None def state_bm_header(self): - #print "%s:%i: header" % (self.destination.host, self.destination.port) if len(self.read_buf) < protocol.Header.size: #print "Length below header size" return False @@ -105,7 +79,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # skip 1 byte in order to sync self.bm_proto_reset() self.set_state("bm_header", 1) - print "Bad magic" + logger.debug("Bad magic") self.close() return False if self.payloadLength > BMProto.maxMessageSize: @@ -117,10 +91,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if len(self.read_buf) < self.payloadLength: #print "Length below announced object length" return False - print "%s:%i: command %s (%ib)" % (self.destination.host, self.destination.port, self.command, self.payloadLength) + #logger.debug("%s:%i: command %s (%ib)", self.destination.host, self.destination.port, self.command, self.payloadLength) self.payload = self.read_buf[:self.payloadLength] if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: - print "Bad checksum, ignoring" + logger.debug("Bad checksum, ignoring") self.invalid = True retval = True if not self.fullyEstablished and self.command not in ("version", "verack"): @@ -131,28 +105,28 @@ class BMProto(AdvancedDispatcher, ObjectTracker): retval = getattr(self, "bm_command_" + str(self.command).lower())() except AttributeError: # unimplemented command - print "unimplemented command %s" % (self.command) + logger.debug("unimplemented command %s", self.command) except BMProtoInsufficientDataError: - print "packet length too short, skipping" + logger.debug("packet length too short, skipping") except BMProtoExcessiveDataError: - print "too much data, skipping" + logger.debug("too much data, skipping") except BMObjectInsufficientPOWError: - print "insufficient PoW, skipping" + logger.debug("insufficient PoW, skipping") except BMObjectInvalidDataError: - print "object invalid data, skipping" + logger.debug("object invalid data, skipping") except BMObjectExpiredError: - print "object expired, skipping" + logger.debug("object expired, skipping") except BMObjectUnwantedStreamError: - print "object not in wanted stream, skipping" + logger.debug("object not in wanted stream, skipping") except BMObjectInvalidError: - print "object invalid, skipping" + logger.debug("object invalid, skipping") except BMObjectAlreadyHaveError: - print "already got object, skipping" + logger.debug("already got object, skipping") except struct.error: - print "decoding error, skipping" + logger.debug("decoding error, skipping") else: #print "Skipping command %s due to invalid data" % (self.command) - print "Closing due to invalid data" % (self.command) + logger.debug("Closing due to invalid command %s", self.command) self.close() return False if retval: @@ -253,13 +227,13 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.payloadOffset += 8 i += 1 if self.payloadOffset > self.payloadLength: - print "Insufficient data %i/%i" % (self.payloadOffset, self.payloadLength) + logger.debug("Insufficient data %i/%i", self.payloadOffset, self.payloadLength) raise BMProtoInsufficientDataError() return retval def bm_command_error(self): fatalStatus, banTime, inventoryVector, errorText = self.decode_payload_content("vvlsls") - print "%s:%i error: %i, %s" % (self.destination.host, self.destination.port, fatalStatus, errorText) + logger.error("%s:%i error: %i, %s", self.destination.host, self.destination.port, fatalStatus, errorText) return True def bm_command_getdata(self): @@ -325,6 +299,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): objectProcessorQueue.put((self.object.objectType,self.object.data)) #DownloadQueue().task_done(self.object.inventoryHash) network.connectionpool.BMConnectionPool().handleReceivedObject(self, self.object.streamNumber, self.object.inventoryHash) + invQueue.put((self.object.streamNumber, self.object.inventoryHash)) #ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash)) #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) return True @@ -344,8 +319,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): peer = state.Peer(decodedIP, port) if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime: continue - knownnodes.knownNodes[stream][peer] = seenTime - AddrUploadQueue().put((stream, peer)) + if len(knownnodes.knownNodes[stream]) < 20000: + with knownnodes.knownNodesLock: + knownnodes.knownNodes[stream][peer] = seenTime + #knownnodes.knownNodes[stream][peer] = seenTime + #AddrUploadQueue().put((stream, peer)) return True def bm_command_portcheck(self): @@ -392,7 +370,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.verackSent = True if not self.isOutbound: self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True)) - print "%s:%i: Sending version" % (self.destination.host, self.destination.port) + #print "%s:%i: Sending version" % (self.destination.host, self.destination.port) if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.isOutbound)): self.isSSL = True @@ -472,10 +450,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def close(self, reason=None): self.set_state("close") -# if reason is None: -# print "%s:%i: closing" % (self.destination.host, self.destination.port) -# #traceback.print_stack() -# else: -# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) + if reason is None: + #logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, ''.join(traceback.format_stack())) + logger.debug("%s:%i: closing", self.destination.host, self.destination.port) + #traceback.print_stack() + else: + logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason) network.connectionpool.BMConnectionPool().removeConnection(self) AdvancedDispatcher.close(self) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index a75b62aa..d1a6b6ee 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -21,8 +21,8 @@ import state class BMConnectionPool(object): def __init__(self): asyncore.set_rates( - BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate"), - BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate")) + BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024, + BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate") * 1024) self.outboundConnections = {} self.inboundConnections = {} self.listeningSockets = {} @@ -117,7 +117,6 @@ class BMConnectionPool(object): if spawnConnections: if not self.bootstrapped: - print "bootstrapping dns" helper_bootstrap.dns() self.bootstrapped = True established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished)) diff --git a/src/network/invthread.py b/src/network/invthread.py new file mode 100644 index 00000000..75d53a20 --- /dev/null +++ b/src/network/invthread.py @@ -0,0 +1,82 @@ +import collections +import Queue +import random +import threading +import time + +import addresses +from bmconfigparser import BMConfigParser +from debug import logger +from helper_threading import StoppableThread +from network.bmproto import BMProto +from network.connectionpool import BMConnectionPool +from queues import invQueue +import protocol +import state + +class InvThread(threading.Thread, StoppableThread): + size = 10 + + def __init__(self): + threading.Thread.__init__(self, name="InvThread") + self.initStop() + self.name = "InvThread" + + self.shutdown = False + + self.collectionOfInvs = [] + for i in range(InvThread.size): + self.collectionOfInvs.append({}) + + def run(self): + iterator = 0 + while not state.shutdown: + while True: + try: + (stream, hash) = invQueue.get(False) + self.holdHash (stream, hash) + except Queue.Empty: + break + + if len(self.collectionOfInvs[iterator]) > 0: + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + hashes = [] + for stream in connection.streams: + try: + for hashId in self.collectionOfInvs[iterator][stream]: + if hashId in connection.objectsNewToThem: + hashes.append(hashId) + del connection.objectsNewToThem[hashId] + except KeyError: + continue + if len(hashes) > 0: + connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes))) + self.collectionOfInvs[iterator] = [] + iterator += 1 + iterator %= InvThread.size + self.stop.wait(1) + + def holdHash(self, stream, hash): + iter = random.randrange(0, InvThread.size) + try: + self.collectionOfInvs[iter][stream].append(hash) + except KeyError, IndexError: + self.collectionOfInvs[iter][stream] = [] + self.collectionOfInvs[iter][stream].append(hash) + + def hasHash(self, hash): + for streamlist in self.collectionOfInvs: + for stream in streamlist: + if hash in streamlist[stream]: + return True + return False + + def hashCount(self): + retval = 0 + for streamlist in self.collectionOfInvs: + for stream in streamlist: + retval += len(streamlist[stream]) + return retval + + def close(self): + self.shutdown = True diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 498bd340..bb6c0301 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -12,7 +12,7 @@ class BMNetworkThread(threading.Thread, StoppableThread): self.initStop() self.name = "AsyncoreThread" BMConnectionPool() - logger.error("init asyncore thread") + logger.info("init asyncore thread") def run(self): while not self._stopped: diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index c5509b65..a0a2c4b8 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -17,7 +17,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): self.initStop() self.name = "ReceiveQueueThread" BMConnectionPool() - logger.error("init receive queue thread") + logger.info("init receive queue thread") def run(self): lastprinted = int(time.time()) diff --git a/src/network/stats.py b/src/network/stats.py index 838ef23a..36baf2cb 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -1,9 +1,19 @@ +import time + from bmconfigparser import BMConfigParser from network.connectionpool import BMConnectionPool +from inventory import PendingDownloadQueue, PendingUpload import asyncore_pollchoose as asyncore import shared import throttle +lastReceivedTimestamp = time.time() +lastReceivedBytes = 0 +currentReceivedSpeed = 0 +lastSentTimestamp = time.time() +lastSentBytes = 0 +currentSentSpeed = 0 + def connectedHostsList(): if BMConfigParser().safeGetBoolean("network", "asyncore"): retval = [] @@ -25,8 +35,15 @@ def sentBytes(): return throttle.SendThrottle().total def uploadSpeed(): + global lastSentTimestamp, lastSentBytes, currentSentSpeed if BMConfigParser().safeGetBoolean("network", "asyncore"): - return 0 + currentTimestamp = time.time() + if int(lastSentTimestamp) < int(currentTimestamp): + currentSentBytes = asyncore.sentBytes + currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp)) + lastSentBytes = currentSentBytes + lastSentTimestamp = currentTimestamp + return currentSentSpeed else: return throttle.sendThrottle().getSpeed() @@ -37,7 +54,35 @@ def receivedBytes(): return throttle.ReceiveThrottle().total def downloadSpeed(): + global lastReceivedTimestamp, lastReceivedBytes, currentReceivedSpeed if BMConfigParser().safeGetBoolean("network", "asyncore"): - return 0 + currentTimestamp = time.time() + if int(lastReceivedTimestamp) < int(currentTimestamp): + currentReceivedBytes = asyncore.receivedBytes + currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp)) + lastReceivedBytes = currentReceivedBytes + lastReceivedTimestamp = currentTimestamp + return currentReceivedSpeed else: return throttle.ReceiveThrottle().getSpeed() + +def pendingDownload(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + tmp = {} + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + for k in connection.objectsNewToMe.keys(): + tmp[k] = True + return len(tmp) + else: + return PendingDownloadQueue.totalSize() + +def pendingUpload(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + return 0 + tmp = {} + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + for k in connection.objectsNewToThem.keys(): + tmp[k] = True + return len(tmp) + else: + return PendingUpload().len() diff --git a/src/network/tcp.py b/src/network/tcp.py index 8c5fb968..42d5a831 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -36,7 +36,7 @@ import protocol class TCPConnection(BMProto, TLSDispatcher): def __init__(self, address=None, sock=None): - AdvancedDispatcher.__init__(self, sock) + BMProto.__init__(self, address=address, sock=sock) self.verackReceived = False self.verackSent = False self.streams = [0] @@ -60,7 +60,12 @@ class TCPConnection(BMProto, TLSDispatcher): TLSDispatcher.__init__(self, sock, server_side=False) self.connect(self.destination) logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port) - shared.connectedHostsList[self.destination] = 0 + encodedAddr = protocol.encodeHost(self.destination.host) + if protocol.checkIPAddress(encodedAddr, True) and not protocol.checkSocksIP(self.destination.host): + self.local = True + else: + self.local = False + #shared.connectedHostsList[self.destination] = 0 ObjectTracker.__init__(self) UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.bm_proto_reset() @@ -83,6 +88,9 @@ class TCPConnection(BMProto, TLSDispatcher): self.skipUntil = time.time() + delay def set_connection_fully_established(self): + if not self.isOutbound and not self.local: + shared.clientHasReceivedIncomingConnections = True + UISignalQueue.put(('setStatusIcon', 'green')) UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.antiIntersectionDelay(True) self.fullyEstablished = True diff --git a/src/network/tls.py b/src/network/tls.py index f813e3be..115f3faa 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -7,6 +7,7 @@ import socket import ssl import sys +from debug import logger from network.advanceddispatcher import AdvancedDispatcher import network.asyncore_pollchoose as asyncore import paths @@ -108,10 +109,10 @@ class TLSDispatcher(AdvancedDispatcher): return False # Perform the handshake. try: - print "handshaking (internal)" + #print "handshaking (internal)" self.sslSocket.do_handshake() except ssl.SSLError, err: - print "%s:%i: handshake fail" % (self.destination.host, self.destination.port) + #print "%s:%i: handshake fail" % (self.destination.host, self.destination.port) self.want_read = self.want_write = False if err.args[0] == ssl.SSL_ERROR_WANT_READ: #print "want read" @@ -122,7 +123,7 @@ class TLSDispatcher(AdvancedDispatcher): if not (self.want_write or self.want_read): raise else: - print "%s:%i: TLS handshake success%s" % (self.destination.host, self.destination.port, ", TLS protocol version: %s" % (self.sslSocket.version()) if sys.version_info >= (2, 7, 9) else "") + logger.debug("%s:%i: TLS handshake success%s", self.destination.host, self.destination.port, ", TLS protocol version: %s" % (self.sslSocket.version()) if sys.version_info >= (2, 7, 9) else "") # The handshake has completed, so remove this channel and... self.del_channel() self.set_socket(self.sslSocket) diff --git a/src/network/udp.py b/src/network/udp.py index 9e687603..29e434a2 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -35,7 +35,7 @@ class UDPSocket(BMProto): announceInterval = 60 def __init__(self, host=None, sock=None): - AdvancedDispatcher.__init__(self, sock) + BMProto.__init__(self, sock) self.verackReceived = True self.verackSent = True # TODO sort out streams @@ -43,7 +43,6 @@ class UDPSocket(BMProto): self.fullyEstablished = True self.connectedAt = 0 self.skipUntil = 0 - self.isOutbound = False if sock is None: if host is None: host = '' @@ -51,7 +50,7 @@ class UDPSocket(BMProto): self.create_socket(socket.AF_INET6, socket.SOCK_DGRAM) else: self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) - print "binding to %s" % (host) + logger.info("Binding UDP socket to %s:%i", host, UDPSocket.port) self.socket.bind((host, UDPSocket.port)) #BINDTODEVICE is only available on linux and requires root #try: @@ -67,10 +66,11 @@ class UDPSocket(BMProto): ObjectTracker.__init__(self) self.connecting = False self.connected = True - # packet was received from a local IP - self.local = False self.set_state("bm_header") + def state_bm_command(self): + BMProto.state_bm_command(self) + # disable most commands before doing research / testing # only addr (peer discovery), error and object are implemented @@ -163,7 +163,7 @@ class UDPSocket(BMProto): return try: retval = self.socket.sendto(data, ('', UDPSocket.port)) - print "broadcasted %ib" % (retval) + #print "broadcasted %ib" % (retval) except socket.error as e: print "socket error on sendato: %s" % (e) self.writeQueue.task_done() diff --git a/src/protocol.py b/src/protocol.py index 5661dff0..d7bd5b8c 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -106,28 +106,35 @@ def checkIPAddress(host, private=False): def checkIPv4Address(host, hostStandardFormat, private=False): if host[0] == '\x7F': # 127/8 - logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat) + if not private: + logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat) return False if host[0] == '\x0A': # 10/8 - logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) + if not private: + logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) return hostStandardFormat if private else False if host[0:2] == '\xC0\xA8': # 192.168/16 - logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) + if not private: + logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) return hostStandardFormat if private else False if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 - logger.debug('Ignoring IP address in private range:' + hostStandardFormat) - return False + if not private: + logger.debug('Ignoring IP address in private range:' + hostStandardFormat) + return hostStandardFormat if private else False return False if private else hostStandardFormat def checkIPv6Address(host, hostStandardFormat, private=False): if host == ('\x00' * 15) + '\x01': - logger.debug('Ignoring loopback address: ' + hostStandardFormat) + if not private: + logger.debug('Ignoring loopback address: ' + hostStandardFormat) return False if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: - logger.debug ('Ignoring local address: ' + hostStandardFormat) + if not private: + logger.debug ('Ignoring local address: ' + hostStandardFormat) return hostStandardFormat if private else False if (ord(host[0]) & 0xfe) == 0xfc: - logger.debug ('Ignoring unique local address: ' + hostStandardFormat) + if not private: + logger.debug ('Ignoring unique local address: ' + hostStandardFormat) return hostStandardFormat if private else False return False if private else hostStandardFormat diff --git a/src/queues.py b/src/queues.py index a11bedeb..7c36d54a 100644 --- a/src/queues.py +++ b/src/queues.py @@ -6,6 +6,7 @@ UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() # receiveDataThreads dump objects they hear on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() +invQueue = Queue.Queue() portCheckerQueue = Queue.Queue() peerDiscoveryQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( diff --git a/src/state.py b/src/state.py index 72852f3e..ff8a143d 100644 --- a/src/state.py +++ b/src/state.py @@ -23,6 +23,8 @@ sqlReady = False # set to true by sqlTread when ready for processing maximumNumberOfHalfOpenConnections = 0 +invThread = None + # If the trustedpeer option is specified in keys.dat then this will # contain a Peer which will be connected to instead of using the # addresses advertised by other peers. The client will only connect to diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py index bb4d0e3f..4efe1132 100644 --- a/src/storage/filesystem.py +++ b/src/storage/filesystem.py @@ -111,8 +111,8 @@ class FilesystemInventory(InventoryStorage): print "error loading %s" % (hexlify(hashId)) pass self._inventory = newInventory - for i, v in self._inventory.items(): - print "loaded stream: %s, %i items" % (i, len(v)) +# for i, v in self._inventory.items(): +# print "loaded stream: %s, %i items" % (i, len(v)) def stream_list(self): return self._inventory.keys()