From d635e515b965141e710a7d93137aa1942b2dc204 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 24 May 2017 16:51:49 +0200 Subject: [PATCH] Big Asyncore update - most of the stuff is done so it partially works - disabled pollers other than select (debugging necessary) - can switch in the settings, section network, option asyncore (defaults to False) --- src/bitmessagemain.py | 30 ++- src/network/advanceddispatcher.py | 29 ++- src/network/asyncore_pollchoose.py | 95 ++++++- src/network/bmobject.py | 94 +++++++ src/{ => network}/bmproto.py | 391 ++++++++++++++++++++++------- src/network/bmqueues.py | 95 +++++++ src/network/connectionchooser.py | 11 + src/network/connectionpool.py | 149 +++++++++++ src/network/downloadqueue.py | 12 + src/network/networkthread.py | 40 +++ src/network/node.py | 67 +---- src/network/tls.py | 29 +-- src/network/uploadqueue.py | 70 ++++++ src/protocol.py | 6 + 14 files changed, 928 insertions(+), 190 deletions(-) create mode 100644 src/network/bmobject.py rename src/{ => network}/bmproto.py (51%) create mode 100644 src/network/bmqueues.py create mode 100644 src/network/connectionchooser.py create mode 100644 src/network/connectionpool.py create mode 100644 src/network/downloadqueue.py create mode 100644 src/network/networkthread.py create mode 100644 src/network/uploadqueue.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index c98f7592..e61675cf 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -50,6 +50,9 @@ from class_smtpDeliver import smtpDeliver from class_smtpServer import smtpServer from bmconfigparser import BMConfigParser +from network.connectionpool import BMConnectionPool +from network.networkthread import BMNetworkThread + # Helper Functions import helper_bootstrap import helper_generic @@ -80,10 +83,13 @@ def connectToStream(streamNumber): if streamNumber*2+1 not in knownnodes.knownNodes: knownnodes.knownNodes[streamNumber*2+1] = {} - for i in range(maximumNumberOfHalfOpenConnections): - a = outgoingSynSender() - a.setup(streamNumber, selfInitiatedConnections) - a.start() + if BMConfigParser().safeGetBoolean("network", "asyncore"): + BMConnectionPool().connectToStream(streamNumber) + else: + for i in range(maximumNumberOfHalfOpenConnections): + a = outgoingSynSender() + a.setup(streamNumber, selfInitiatedConnections) + a.start() def _fixWinsock(): if not ('win32' in sys.platform) and not ('win64' in sys.platform): @@ -242,13 +248,19 @@ class Main: singleAPIThread.daemon = True # close the main program even if there are threads left singleAPIThread.start() + if BMConfigParser().safeGetBoolean("network", "asyncore"): + asyncoreThread = BMNetworkThread() + asyncoreThread.daemon = False + asyncoreThread.start() + connectToStream(1) - singleListenerThread = singleListener() - singleListenerThread.setup(selfInitiatedConnections) - singleListenerThread.daemon = True # close the main program even if there are threads left - singleListenerThread.start() - + if not BMConfigParser().safeGetBoolean("network", "asyncore"): + singleListenerThread = singleListener() + singleListenerThread.setup(selfInitiatedConnections) + singleListenerThread.daemon = True # close the main program even if there are threads left + singleListenerThread.start() + if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): import upnp upnpThread = upnp.uPnPThread() diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index dc7eedb0..f4ba120e 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,4 +1,7 @@ +import time + import asyncore_pollchoose as asyncore +from bmconfigparser import BMConfigParser class AdvancedDispatcher(asyncore.dispatcher): _buf_len = 2097152 # 2MB @@ -9,6 +12,7 @@ class AdvancedDispatcher(asyncore.dispatcher): self.read_buf = b"" self.write_buf = b"" self.state = "init" + self.lastTx = time.time() def append_write_buf(self, string = None): self.write_buf += string @@ -32,7 +36,7 @@ class AdvancedDispatcher(asyncore.dispatcher): return while True: try: - print "Trying to handle state \"%s\"" % (self.state) +# print "Trying to handle state \"%s\"" % (self.state) if getattr(self, "state_" + str(self.state))() is False: break except AttributeError: @@ -50,13 +54,30 @@ class AdvancedDispatcher(asyncore.dispatcher): return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len def handle_read(self): - print "handle_read" - self.read_buf += self.recv(AdvancedDispatcher._buf_len) + self.lastTx = time.time() + if asyncore.maxDownloadRate > 0: + newData = self.recv(asyncore.downloadChunk) + asyncore.downloadBucket -= len(newData) + self.read_buf += newData + else: + self.read_buf += self.recv(AdvancedDispatcher._buf_len) self.process() def handle_write(self): - written = self.send(self.write_buf) + self.lastTx = time.time() + if asyncore.maxUploadRate > 0: + written = self.send(self.write_buf[0:asyncore.uploadChunk]) + asyncore.uploadBucket -= written + else: + written = self.send(self.write_buf) self.slice_write_buf(written) def handle_connect(self): + self.lastTx = time.time() self.process() + + def close(self): + self.read_buf = b"" + self.write_buf = b"" + self.state = "shutdown" + asyncore.dispatcher.close(self) diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 4ccce7f9..b26d4cab 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -46,6 +46,8 @@ many of the difficult problems for you, making the task of building sophisticated high-performance network servers and clients a snap. """ +# randomise object order for bandwidth balancing +import random import select import socket import sys @@ -56,6 +58,11 @@ import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ errorcode +try: + from errno import WSAEWOULDBLOCK +except: + pass +from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF)) @@ -81,6 +88,15 @@ class ExitNow(Exception): _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) +maxDownloadRate = 0 +downloadChunk = 0 +downloadTimestamp = 0 +downloadBucket = 0 +maxUploadRate = 0 +uploadChunk = 0 +uploadTimestamp = 0 +uploadBucket = 0 + def read(obj): try: obj.handle_read_event() @@ -97,6 +113,44 @@ def write(obj): except: obj.handle_error() +def set_rates(download, upload): + global maxDownloadRate, maxUploadRate, downloadChunk, uploadChunk, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + maxDownloadRate = float(download) + if maxDownloadRate > 0: + downloadChunk = 1400 + maxUploadRate = float(upload) + if maxUploadRate > 0: + uploadChunk = 1400 + downloadBucket = maxDownloadRate + uploadBucket = maxUploadRate + downloadTimestamp = time.time() + uploadTimestamp = time.time() + +def wait_tx_buckets(): + global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + if maxDownloadRate > 0 and maxUploadRate > 0: + wait_for_this_long = min(maxDownloadRate / downloadChunk, maxUploadRate / uploadChunk) + elif maxDownloadRate > 0: + wait_for_this_long = maxDownloadRate / downloadChunk + elif maxUploadRate > 0: + wait_for_this_long = maxUploadRate / uploadChunk + else: + return + wait_for_this_long /= 2 + if wait_for_this_long > 1: + wait_for_this_long = 1 + elif wait_for_this_long < 0.1: + wait_for_this_long = 0.1 + + while downloadBucket < downloadChunk and uploadBucket < uploadChunk: + time.sleep(wait_for_this_long) + downloadBucket += (time.time() - downloadTimestamp) * maxDownloadRate + downloadTimestamp = time.time() + uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate + uploadTimestamp = time.time() + + + def _exception(obj): try: obj.handle_expt_event() @@ -150,13 +204,13 @@ def select_poller(timeout=0.0, map=None): except KeyboardInterrupt: return - for fd in r: + for fd in random.sample(r, len(r)): obj = map.get(fd) if obj is None: continue read(obj) - for fd in w: + for fd in random.sample(w, len(w)): obj = map.get(fd) if obj is None: continue @@ -204,7 +258,7 @@ def poll_poller(timeout=0.0, map=None): r = poll_poller.pollster.poll(timeout) except KeyboardInterrupt: r = [] - for fd, flags in r: + for fd, flags in random.sample(r, len(r)): obj = map.get(fd) if obj is None: continue @@ -252,7 +306,7 @@ def epoll_poller(timeout=0.0, map=None): if err.args[0] != EINTR: raise r = [] - for fd, flags in r: + for fd, flags in random.sample(r, len(r)): obj = map.get(fd) if obj is None: continue @@ -278,7 +332,7 @@ def kqueue_poller(timeout=0.0, map=None): selectables += 1 events = kqueue.control(None, selectables, timeout) - for event in events: + for event in random.sample(events, len(events)): fd = event.ident obj = map.get(fd) if obj is None: @@ -307,13 +361,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, elif hasattr(select, 'select'): poller = select_poller - print "Poll loop using %s" % (poller.__name__) + poller = select_poller + +# print "Poll loop using %s" % (poller.__name__) if count is None: while map: + wait_tx_buckets() poller(timeout, map) else: + timeout /= count while map and count > 0: + wait_tx_buckets() poller(timeout, map) count = count - 1 @@ -482,10 +541,17 @@ class dispatcher: try: result = self.socket.send(data) return result - except socket.error as why: - if why.args[0] == EWOULDBLOCK: + except SSLError as err: + if err.errno == SSL_ERROR_WANT_WRITE: return 0 - elif why.args[0] in _DISCONNECTED: + else: + raise + except socket.error as why: + if why.errno in (errno.EAGAIN, errno.EWOULDBLOCK) or \ + (sys.platform.startswith('win') and \ + err.errno == errno.WSAEWOULDBLOCK): + return 0 + elif why.errno in _DISCONNECTED: self.handle_close() return 0 else: @@ -501,9 +567,18 @@ class dispatcher: return b'' else: return data + except SSLError as err: + if err.errno == SSL_ERROR_WANT_READ: + return b'' + else: + raise except socket.error as why: # winsock sometimes raises ENOTCONN - if why.args[0] in _DISCONNECTED: + if why.errno in (errno.EAGAIN, errno.EWOULDBLOCK) or \ + (sys.platform.startswith('win') and \ + err.errno == errno.WSAEWOULDBLOCK): + return b'' + if why.errno in _DISCONNECTED: self.handle_close() return b'' else: diff --git a/src/network/bmobject.py b/src/network/bmobject.py new file mode 100644 index 00000000..2c3fb59c --- /dev/null +++ b/src/network/bmobject.py @@ -0,0 +1,94 @@ +from binascii import hexlify +import time + +from addresses import calculateInventoryHash +from debug import logger +import protocol +import state + +class BMObjectInsufficientPOWError(Exception): pass + + +class BMObjectInvalidDataError(Exception): pass + + +class BMObjectExpiredError(Exception): pass + + +class BMObjectUnwantedStreamError(Exception): pass + + +class BMObjectInvalidError(Exception): pass + + +class BMObjectAlreadyHaveError(Exception): + pass + + +class BMObject(object): + # max TTL, 28 days and 3 hours + maxTTL = 28 * 24 * 60 * 60 + 10800 + # min TTL, 3 hour (in the past + minTTL = -3600 + + def __init__(self, nonce, expiresTime, objectType, version, streamNumber, data): + self.nonce = nonce + self.expiresTime = expiresTime + self.objectType = objectType + self.version = version + self.streamNumber = streamNumber + self.inventoryHash = calculateInventoryHash(data) + self.data = data + self.tag = '' + + def checkProofOfWorkSufficient(self): + # Let us check to make sure that the proof of work is sufficient. + if not protocol.isProofOfWorkSufficient(self.data): + logger.info('Proof of work is insufficient.') + raise BMObjectInsufficientPOWError() + + def checkEOLSanity(self): + # EOL sanity check + if self.expiresTime - int(time.time()) > BMObject.maxTTL: + logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % self.expiresTime) + # TODO: remove from download queue + raise BMObjectExpiredError() + + if self.expiresTime - int(time.time()) < BMObject.minTTL: + logger.info('This object\'s End of Life time was too long ago. Ignoring the object. Time is %s' % self.expiresTime) + # TODO: remove from download queue + raise BMObjectExpiredError() + + def checkStream(self): + if self.streamNumber not in state.streamsInWhichIAmParticipating: + logger.debug('The streamNumber %s isn\'t one we are interested in.' % self.streamNumber) + raise BMObjectUnwantedStreamError() + + def checkMessage(self): + return + + def checkGetpubkey(self): + if len(self.data) < 42: + logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') + raise BMObjectInvalidError() + + def checkPubkey(self, tag): + if len(self.data) < 146 or len(self.data) > 440: # sanity check + logger.info('pubkey object too short or too long. Ignoring.') + raise BMObjectInvalidError() + if self.version >= 4: + self.tag = tag + logger.debug('tag in received pubkey is: %s' % hexlify(tag)) + + def checkBroadcast(self, tag): + if len(self.data) < 180: + logger.debug('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.') + raise BMObjectInvalidError() + + # this isn't supported anymore + if self.version < 2: + raise BMObjectInvalidError() + + if self.version >= 3: + self.tag = tag + logger.debug('tag in received broadcast is: %s' % hexlify(tag)) diff --git a/src/bmproto.py b/src/network/bmproto.py similarity index 51% rename from src/bmproto.py rename to src/network/bmproto.py index c9160c3b..d9ed2a95 100644 --- a/src/bmproto.py +++ b/src/network/bmproto.py @@ -1,28 +1,52 @@ +import base64 +from binascii import hexlify import hashlib +import math import time from pprint import pprint import socket -from struct import unpack +import struct +import random +import traceback +from addresses import calculateInventoryHash +from debug import logger +from inventory import Inventory +import knownnodes from network.advanceddispatcher import AdvancedDispatcher +from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError +import network.connectionpool +from network.downloadqueue import DownloadQueue from network.node import Node import network.asyncore_pollchoose as asyncore from network.proxy import Proxy, ProxyError, GeneralProxyError +from network.bmqueues import BMQueues from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError +from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUploadQueue from network.tls import TLSDispatcher import addresses from bmconfigparser import BMConfigParser +from queues import objectProcessorQueue import shared +import state import protocol class BMProtoError(ProxyError): pass -class BMConnection(TLSDispatcher): +class BMProtoInsufficientDataError(BMProtoError): pass + + +class BMProtoExcessiveDataError(BMProtoError): pass + + +class BMConnection(TLSDispatcher, BMQueues): # ~1.6 MB which is the maximum possible size of an inv message. maxMessageSize = 1600100 + # 2**18 = 256kB is the maximum size of an object payload + maxObjectPayloadSize = 2**18 # protocol specification says max 1000 addresses in one addr command maxAddrCount = 1000 # protocol specification says max 50000 objects in one inv command @@ -32,46 +56,74 @@ class BMConnection(TLSDispatcher): AdvancedDispatcher.__init__(self, sock) self.verackReceived = False self.verackSent = False + self.lastTx = time.time() + self.connectionFullyEstablished = False + self.connectedAt = 0 + self.skipUntil = 0 if address is None and sock is not None: - self.destination = self.addr() + self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) self.isOutbound = False TLSDispatcher.__init__(self, sock, server_side=True) - print "received connection in background from %s:%i" % (self.destination[0], self.destination[1]) + self.connectedAt = time.time() + print "received connection in background from %s:%i" % (self.destination.host, self.destination.port) else: self.destination = address self.isOutbound = True - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.destination) + if ":" in address.host: + self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) + else: + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) TLSDispatcher.__init__(self, sock, server_side=False) - print "connecting in background to %s:%i" % (self.destination[0], self.destination[1]) + self.connect(self.destination) + print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) + shared.connectedHostsList[self.destination] = 0 + BMQueues.__init__(self) def bm_proto_reset(self): self.magic = None self.command = None - self.payloadLength = None + self.payloadLength = 0 self.checksum = None self.payload = None self.invalid = False self.payloadOffset = 0 + self.object = None def state_init(self): self.bm_proto_reset() - self.append_write_buf(protocol.assembleVersionMessage(self.destination[0], self.destination[1], (1,), False)) - if True: - print "Sending version (%ib)" % len(self.write_buf) - self.set_state("bm_header") - return False + if self.isOutbound: + self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) + print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf)) + self.set_state("bm_header") + return True - def state_bm_ready(self): - print "doing bm ready" + def antiIntersectionDelay(self, initial = False): + # estimated time for a small object to propagate across the whole network + delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + UploadQueue.queueCount/2) + # take the stream with maximum amount of nodes + # +2 is to avoid problems with log(0) and log(1) + # 20 is avg connected nodes count + # 0.2 is avg message transmission time + if delay > 0: + if initial: + self.skipUntil = self.connectedAt + delay + if self.skipUntil > time.time(): + logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time()) + else: + logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time()) + self.skipUntil = time.time() + now + + def set_connection_fully_established(self): + self.antiIntersectionDelay(True) + self.connectionFullyEstablished = True self.sendAddr() self.sendBigInv() - self.set_state("bm_header") - return False def state_bm_header(self): + #print "%s:%i: header" % (self.destination.host, self.destination.port) if len(self.read_buf) < protocol.Header.size: - print "Length below header size" + #print "Length below header size" return False self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) self.command = self.command.rstrip('\x00') @@ -87,20 +139,41 @@ class BMConnection(TLSDispatcher): def state_bm_command(self): if len(self.read_buf) < self.payloadLength: - print "Length below announced object length" + #print "Length below announced object length" return False - print "received %s (%ib)" % (self.command, self.payloadLength) + print "%s:%i: command %s (%ib)" % (self.destination.host, self.destination.port, self.command, self.payloadLength) self.payload = self.read_buf[:self.payloadLength] if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: print "Bad checksum, ignoring" self.invalid = True retval = True + if not self.connectionFullyEstablished and self.command not in ("version", "verack"): + logger.error("Received command %s before connection was fully established, ignoring", self.command) + self.invalid = True if not self.invalid: try: retval = getattr(self, "bm_command_" + str(self.command).lower())() except AttributeError: # unimplemented command print "unimplemented command %s" % (self.command) + except BMProtoInsufficientDataError: + print "packet length too short, skipping" + except BMProtoExcessiveDataError: + print "too much data, skipping" + except BMObjectInsufficientPOWError: + print "insufficient PoW, skipping" + except BMObjectInvalidDataError: + print "object invalid data, skipping" + except BMObjectExpiredError: + print "object expired, skipping" + except BMObjectUnwantedStreamError: + print "object not in wanted stream, skipping" + except BMObjectInvalidError: + print "object invalid, skipping" + except BMObjectAlreadyHaveError: + print "already got object, skipping" + except struct.error: + print "decoding error, skipping" else: print "Skipping command %s due to invalid data" % (self.command) if retval: @@ -120,11 +193,24 @@ class BMConnection(TLSDispatcher): return value def decode_payload_node(self): - services, address, port = self.decode_payload_content("Q16sH") - return Node(services, address, port) + services, host, port = self.decode_payload_content("Q16sH") + if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': + host = socket.inet_ntop(socket.AF_INET, host[12:]) + elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': + # Onion, based on BMD/bitcoind + host = base64.b32encode(host[6:]).lower() + ".onion" + else: + host = socket.inet_ntop(socket.AF_INET6, host) + if host == "": + # This can happen on Windows systems which are not 64-bit compatible + # so let us drop the IPv6 address. + host = socket.inet_ntop(socket.AF_INET, host[12:]) + + return Node(services, host, port) def decode_payload_content(self, pattern = "v"): - # l = varint indicating the length of the next item + # l = varint indicating the length of the next array + # L = varint indicating the length of the next item # v = varint (or array) # H = uint16 # I = uint32 @@ -135,86 +221,171 @@ class BMConnection(TLSDispatcher): # , = end of array retval = [] - size = 0 + size = None insideDigit = False + i = 0 - for i in range(len(pattern)): - if pattern[i] in "0123456789": + while i < len(pattern): + if pattern[i] in "0123456789" and (i == 0 or pattern[i-1] not in "lL"): + if size is None: + size = 0 size = size * 10 + int(pattern[i]) + i += 1 continue - elif pattern[i] == "l": + elif pattern[i] == "l" and size is None: size = self.decode_payload_varint() + i += 1 continue - if size > 0: - innerval = [] + elif pattern[i] == "L" and size is None: + size = self.decode_payload_varint() + i += 1 + continue + if size is not None: if pattern[i] == "s": retval.append(self.payload[self.payloadOffset:self.payloadOffset + size]) self.payloadOffset += size + i += 1 else: + if "," in pattern[i:]: + subpattern = pattern[i:pattern.index(",")] + else: + subpattern = pattern[i:] + for j in range(size): - if "," in pattern[i:]: - retval.append(self.decode_payload_content(pattern[i:pattern.index(",")])) + if pattern[i-1:i] == "L": + retval.extend(self.decode_payload_content(subpattern)) else: - retval.append(self.decode_payload_content(pattern[i:])) - size = 0 + retval.append(self.decode_payload_content(subpattern)) + i += len(subpattern) + size = None else: if pattern[i] == "v": retval.append(self.decode_payload_varint()) if pattern[i] == "i": retval.append(self.decode_payload_node()) if pattern[i] == "H": - retval.append(unpack(">H", self.payload[self.payloadOffset:self.payloadOffset+2])[0]) + retval.append(struct.unpack(">H", self.payload[self.payloadOffset:self.payloadOffset+2])[0]) self.payloadOffset += 2 if pattern[i] == "I": - retval.append(unpack(">I", self.payload[self.payloadOffset:self.payloadOffset+4])[0]) + retval.append(struct.unpack(">I", self.payload[self.payloadOffset:self.payloadOffset+4])[0]) self.payloadOffset += 4 if pattern[i] == "Q": - retval.append(unpack(">Q", self.payload[self.payloadOffset:self.payloadOffset+8])[0]) + retval.append(struct.unpack(">Q", self.payload[self.payloadOffset:self.payloadOffset+8])[0]) self.payloadOffset += 8 + i += 1 + if self.payloadOffset > self.payloadLength: + print "Insufficient data %i/%i" % (self.payloadOffset, self.payloadLength) + raise BMProtoInsufficientDataError() return retval def bm_command_error(self): fatalStatus, banTime, inventoryVector, errorText = self.decode_payload_content("vvlsls") + print "%s:%i error: %i, %s" % (self.destination.host, self.destination.port, fatalStatus, errorText) + return True def bm_command_getdata(self): - items = self.decode_payload_content("l32s") - #self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough + items = self.decode_payload_content("L32s") +# if time.time() < self.skipUntil: +# print "skipping getdata" +# return True for i in items: - logger.debug('received getdata request for item:' + hexlify(i)) - if self.objectHashHolderInstance.hasHash(i): + print "received getdata request for item %s" % (hexlify(i)) + #logger.debug('received getdata request for item:' + hexlify(i)) + #if i in ObjUploadQueue.streamElems(1): + if False: self.antiIntersectionDelay() else: if i in Inventory(): self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) else: - #self.antiIntersectionDelay() + self.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) + return True + + def bm_command_inv(self): + items = self.decode_payload_content("L32s") + + if len(items) >= BMConnection.maxObjectCount: + logger.error("Too many items in inv message!") + raise BMProtoExcessiveDataError() + else: + print "items in inv: %i" % (len(items)) + + startTime = time.time() + #advertisedSet = set() + for i in items: + #advertisedSet.add(i) + self.handleReceivedObj(i) + #objectsNewToMe = advertisedSet + #for stream in self.streams: + #objectsNewToMe -= Inventory().hashes_by_stream(stream) + logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime) + + payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys()) + self.append_write_buf(protocol.CreatePacket('getdata', payload)) + +# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)): +# DownloadQueue().put(i) + return True def bm_command_object(self): - lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(self.payload) - self.downloadQueue.task_done(calculateInventoryHash(self.payload)) + objectOffset = self.payloadOffset + nonce, expiresTime, objectType, version, streamNumber = self.decode_payload_content("QQIvv") + self.object = BMObject(nonce, expiresTime, objectType, version, streamNumber, self.payload) + + if len(self.payload) - self.payloadOffset > BMConnection.maxObjectPayloadSize: + logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(self.payload) - self.payloadOffset) + raise BMProtoExcessiveDataError() + + self.object.checkProofOfWorkSufficient() + self.object.checkEOLSanity() + self.object.checkStream() + + try: + if self.object.objectType == protocol.OBJECT_GETPUBKEY: + self.object.checkGetpubkey() + elif self.object.objectType == protocol.OBJECT_PUBKEY: + self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32]) + elif self.object.objectType == protocol.OBJECT_MSG: + self.object.checkMessage() + elif self.object.objectType == protocol.OBJECT_BROADCAST: + self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32]) + # other objects don't require other types of tests + except BMObjectAlreadyHaveError: + pass + else: + Inventory()[self.object.inventoryHash] = ( + self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) + objectProcessorQueue.put((self.object.objectType,self.object.data)) + #DownloadQueue().task_done(self.object.inventoryHash) + network.connectionpool.BMConnectionPool().handleReceivedObject(self, self.object.streamNumber, self.object.inventoryHash) + #ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash)) + #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + return True def bm_command_addr(self): - addresses = self.decode_payload_content("lQbQ16sH") + addresses = self.decode_payload_content("lQIQ16sH") + return True def bm_command_ping(self): self.append_write_buf(protocol.CreatePacket('pong')) + return True def bm_command_pong(self): # nothing really - pass + return True def bm_command_verack(self): self.verackReceived = True if self.verackSent: if self.isSSL: self.set_state("tls_init", self.payloadLength) + self.bm_proto_reset() + return False else: - self.set_state("bm_ready", self.payloadLength) - else: - self.set_state("bm_header", self.payloadLength) - self.bm_proto_reset() - return False + self.set_connection_fully_established() + return True + return True def bm_command_version(self): #self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size]) @@ -223,24 +394,30 @@ class BMConnection(TLSDispatcher): print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion) print "services: %08X" % (self.services) print "time offset: %i" % (self.timestamp - int(time.time())) - print "my external IP: %s" % (self.sockNode.address) + print "my external IP: %s" % (self.sockNode.host) print "remote node incoming port: %i" % (self.peerNode.port) print "user agent: %s" % (self.userAgent) if not self.peerValidityChecks(): # TODO ABORT return True + shared.connectedHostsList[self.destination] = self.streams[0] self.append_write_buf(protocol.CreatePacket('verack')) self.verackSent = True + if not self.isOutbound: + self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True)) + print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf)) if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.isOutbound)): self.isSSL = True if self.verackReceived: if self.isSSL: self.set_state("tls_init", self.payloadLength) + self.bm_proto_reset() + return False else: - self.set_state("bm_ready", self.payloadLength) - self.bm_proto_reset() - return False + self.set_connection_fully_established() + return True + return True def peerValidityChecks(self): if self.remoteProtocolVersion < 3: @@ -271,14 +448,24 @@ class BMConnection(TLSDispatcher): logger.debug ('Closed connection to %s because there is no overlapping interest in streams.', str(self.peer)) return False + if self.destination in network.connectionpool.BMConnectionPool().inboundConnections: + try: + if not protocol.checkSocksIP(self.destination.host): + self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + errorText="Too many connections from your IP. Closing connection.")) + logger.debug ('Closed connection to %s because we are already connected to that IP.', + str(self.peer)) + return False + except: + pass return True def sendAddr(self): def sendChunk(): - if numberOfAddressesInAddrMessage == 0: + if addressCount == 0: return self.append_write_buf(protocol.CreatePacket('addr', \ - addresses.encodeVarint(numberOfAddressesInAddrMessage) + payload)) + addresses.encodeVarint(addressCount) + payload)) # We are going to share a maximum number of 1000 addrs (per overlapping # stream) with our peer. 500 from overlapping streams, 250 from the @@ -287,7 +474,7 @@ class BMConnection(TLSDispatcher): # init addressCount = 0 - payload = '' + payload = b'' for stream in self.streams: addrsInMyStream = {} @@ -320,42 +507,42 @@ class BMConnection(TLSDispatcher): addrsInChildStreamRight = random.sample(filtered.items(), elemCount) for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream: addressCount += 1 - payload += pack( + payload += struct.pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += pack('>I', stream) - payload += pack( + payload += struct.pack('>I', stream) + payload += struct.pack( '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) - payload += pack('>H', PORT) # remote port + payload += struct.pack('>H', PORT) # remote port if addressCount >= BMConnection.maxAddrCount: sendChunk() - payload = '' + payload = b'' addressCount = 0 for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft: addressCount += 1 - payload += pack( + payload += struct.pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += pack('>I', stream * 2) - payload += pack( + payload += struct.pack('>I', stream * 2) + payload += struct.pack( '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) - payload += pack('>H', PORT) # remote port + payload += struct.pack('>H', PORT) # remote port if addressCount >= BMConnection.maxAddrCount: sendChunk() - payload = '' + payload = b'' addressCount = 0 for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight: addressCount += 1 - payload += pack( + payload += struct.pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += pack('>I', (stream * 2) + 1) - payload += pack( + payload += struct.pack('>I', (stream * 2) + 1) + payload += struct.pack( '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) - payload += pack('>H', PORT) # remote port + payload += struct.pack('>H', PORT) # remote port if addressCount >= BMConnection.maxAddrCount: sendChunk() - payload = '' + payload = b'' addressCount = 0 # flush @@ -365,19 +552,21 @@ class BMConnection(TLSDispatcher): def sendChunk(): if objectCount == 0: return - payload = encodeVarint(objectCount) + payload - logger.debug('Sending huge inv message with %i objects to just this one peer', - str(numberOfObjects)) - self.append_write_buf(protocol.CreatePacket('inv', payload)) + logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) + self.append_write_buf(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) # Select all hashes for objects in this stream. bigInvList = {} for stream in self.streams: for hash in Inventory().unexpired_hashes_by_stream(stream): - if not self.objectHashHolderInstance.hasHash(hash): - bigInvList[hash] = 0 + bigInvList[hash] = 0 +# for hash in ObjUploadQueue().streamHashes(stream): +# try: +# del bigInvList[hash] +# except KeyError: +# pass objectCount = 0 - payload = '' + payload = b'' # Now let us start appending all of these hashes together. They will be # sent out in a big inv message to our new peer. for hash, storedValue in bigInvList.items(): @@ -385,12 +574,43 @@ class BMConnection(TLSDispatcher): objectCount += 1 if objectCount >= BMConnection.maxObjectCount: self.sendChunk() - payload = '' + payload = b'' objectCount = 0 # flush sendChunk() + def handle_connect_event(self): + try: + asyncore.dispatcher.handle_connect_event(self) + self.connectedAt = time.time() + except socket.error as e: + print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def handle_read_event(self): + try: + asyncore.dispatcher.handle_read_event(self) + except socket.error as e: + print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def handle_write_event(self): + try: + asyncore.dispatcher.handle_write_event(self) + except socket.error as e: + print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def close(self, reason=None): + if reason is None: + print "%s:%i: closing" % (self.destination.host, self.destination.port) + #traceback.print_stack() + else: + print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) + network.connectionpool.BMConnectionPool().removeConnection(self) + asyncore.dispatcher.close(self) + class Socks5BMConnection(Socks5Connection, BMConnection): def __init__(self, address): @@ -411,24 +631,19 @@ class Socks4aBMConnection(Socks4aConnection, BMConnection): class BMServer(AdvancedDispatcher): - port = 8444 - - def __init__(self, port=None): + def __init__(self, host='127.0.0.1', port=8444): if not hasattr(self, '_map'): AdvancedDispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() - if port is None: - port = BMServer.port - self.bind(('127.0.0.1', port)) - self.connections = 0 + self.bind((host, port)) self.listen(5) def handle_accept(self): pair = self.accept() if pair is not None: sock, addr = pair - BMConnection(sock=sock) + network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock)) if __name__ == "__main__": diff --git a/src/network/bmqueues.py b/src/network/bmqueues.py new file mode 100644 index 00000000..96ad52e4 --- /dev/null +++ b/src/network/bmqueues.py @@ -0,0 +1,95 @@ +import time + +from inventory import Inventory +from network.downloadqueue import DownloadQueue +from network.uploadqueue import UploadQueue + +haveBloom = False + +try: + # pybloomfiltermmap + from pybloomfilter import BloomFilter + haveBloom = True +except ImportError: + try: + # pybloom + from pybloom import BloomFilter + haveBloom = True + except ImportError: + pass + +# it isn't actually implemented yet so no point in turning it on +haveBloom = False + +class BMQueues(object): + invCleanPeriod = 300 + invInitialCapacity = 50000 + invErrorRate = 0.03 + + def __init__(self): + self.objectsNewToMe = {} + self.objectsNewToThem = {} + self.initInvBloom() + self.initAddrBloom() + + def initInvBloom(self): + if haveBloom: + # lock? + self.invBloom = BloomFilter(capacity=BMQueues.invInitialCapacity, + error_rate=BMQueues.invErrorRate) + + def initAddrBloom(self): + if haveBloom: + # lock? + self.addrBloom = BloomFilter(capacity=BMQueues.invInitialCapacity, + error_rate=BMQueues.invErrorRate) + + def clean(self): + if self.lastcleaned < time.time() - BMQueues.invCleanPeriod: + if haveBloom: + if PendingDownloadQueue().size() == 0: + self.initInvBloom() + self.initAddrBloom() + else: + # release memory + self.objectsNewToMe = self.objectsNewToMe.copy() + self.objectsNewToThem = self.objectsNewToThem.copy() + + def hasObj(self, hashid): + if haveBloom: + return hashid in self.invBloom + else: + return hashid in self.objectsNewToMe + + def handleReceivedObj(self, hashid): + if haveBloom: + self.invBloom.add(hashid) + elif hashid in Inventory(): + try: + del self.objectsNewToThem[hashid] + except KeyError: + pass + else: + self.objectsNewToMe[hashid] = True + + def hasAddr(self, addr): + if haveBloom: + return addr in self.invBloom + + def addAddr(self, hashid): + if haveBloom: + self.addrBloom.add(hashid) + +# addr sending -> per node upload queue, and flush every minute or so +# inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so + +# no bloom +# - if inv arrives +# - if we don't have it, add tracking and download queue +# - if we do have it, remove from tracking +# tracking downloads +# - per node hash of items the node has but we don't +# tracking inv +# - per node hash of items that neither the remote node nor we have +# + diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py new file mode 100644 index 00000000..a1b1d10b --- /dev/null +++ b/src/network/connectionchooser.py @@ -0,0 +1,11 @@ +import random + +from bmconfigparser import BMConfigParser +import knownnodes +import state + +def chooseConnection(stream): + if state.trustedPeer: + return state.trustedPeer + else: + return random.choice(knownnodes.knownNodes[stream].keys()) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py new file mode 100644 index 00000000..e79f033c --- /dev/null +++ b/src/network/connectionpool.py @@ -0,0 +1,149 @@ +import errno +import socket +import time +import random + +from bmconfigparser import BMConfigParser +from debug import logger +import helper_bootstrap +import network.bmproto +from network.connectionchooser import chooseConnection +import network.asyncore_pollchoose as asyncore +import protocol +from singleton import Singleton +import shared +import state + +@Singleton +class BMConnectionPool(object): + def __init__(self): + asyncore.set_rates( + BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate"), + BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate")) + self.outboundConnections = {} + self.inboundConnections = {} + self.listeningSockets = {} + self.streams = [] + + self.bootstrapped = False + + def handleReceivedObject(self, connection, streamNumber, hashid): + for i in self.inboundConnections.values() + self.outboundConnections.values(): + if not isinstance(i, network.bmproto.BMConnection): + continue + if i == connection: + try: + del i.objectsNewToThem[hashid] + except KeyError: + pass + else: + try: + del i.objectsNewToThem[hashid] + except KeyError: + i.objectsNewToThem[hashid] = True + try: + del i.objectsNewToMe[hashid] + except KeyError: + pass + + def connectToStream(self, streamNumber): + self.streams.append(streamNumber) + + def addConnection(self, connection): + if connection.isOutbound: + self.outboundConnections[connection.destination] = connection + else: + if connection.destination.host in self.inboundConnections: + self.inboundConnections[connection.destination] = connection + else: + self.inboundConnections[connection.destination.host] = connection + + def removeConnection(self, connection): + if connection.isOutbound: + try: + del self.outboundConnections[connection.destination] + except KeyError: + pass + else: + try: + del self.inboundConnections[connection.destination] + except KeyError: + try: + del self.inboundConnections[connection.destination.host] + except KeyError: + pass + + def startListening(self): + port = BMConfigParser().safeGetInt("bitmessagesettings", "port") + if BMConfigParser().safeGet("bitmessagesettings", "onionhostname").endswith(".onion"): + host = BMConfigParser().safeGet("bitmessagesettigns", "onionbindip") + else: + host = '127.0.0.1' + if BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") or \ + BMConfigParser().get("bitmessagesettings", "socksproxytype") == "none": + host = '' + self.listeningSockets[state.Peer(host, port)] = network.bmproto.BMServer(host=host, port=port) + + def loop(self): + # defaults to empty loop if outbound connections are maxed + spawnConnections = False + acceptConnections = True + if BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect'): + acceptConnections = False + else: + spawnConnections = True + if BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): + spawnConnections = True + if BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and \ + (not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and \ + ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname')): + acceptConnections = False + + spawnConnections = False + if spawnConnections: + if not self.bootstrapped: + print "bootstrapping dns" + helper_bootstrap.dns() + self.bootstrapped = True + for i in range(len(self.outboundConnections), BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections")): + chosen = chooseConnection(random.choice(self.streams)) + if chosen in self.outboundConnections: + continue + if chosen.host in self.inboundConnections: + continue + + #for c in self.outboundConnections: + # if chosen == c.destination: + # continue + #for c in self.inboundConnections: + # if chosen.host == c.destination.host: + # continue + try: + if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"): + self.addConnection(network.bmproto.Socks5BMConnection(chosen)) + elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"): + self.addConnection(network.bmproto.Socks4aBMConnection(chosen)) + elif not chosen.host.endswith(".onion"): + self.addConnection(network.bmproto.BMConnection(chosen)) + except socket.error as e: + if e.errno == errno.ENETUNREACH: + continue + + if acceptConnections and len(self.listeningSockets) == 0: + self.startListening() + logger.info('Listening for incoming connections.') + if len(self.listeningSockets) > 0 and not acceptConnections: + for i in self.listeningSockets: + i.close() + logger.info('Stopped listening for incoming connections.') + +# while len(asyncore.socket_map) > 0 and state.shutdown == 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=2.0, count=1) + + for i in self.inboundConnections.values() + self.outboundConnections.values(): + minTx = time.time() - 20 + if i.connectionFullyEstablished: + minTx -= 300 - 20 + if i.lastTx < minTx: + i.close("Timeout (%is)" % (time.time() - i.lastTx)) diff --git a/src/network/downloadqueue.py b/src/network/downloadqueue.py new file mode 100644 index 00000000..3789fb19 --- /dev/null +++ b/src/network/downloadqueue.py @@ -0,0 +1,12 @@ +#import collections +from threading import current_thread, enumerate as threadingEnumerate, RLock +import Queue +import time + +#from helper_sql import * +from singleton import Singleton + +@Singleton +class DownloadQueue(Queue.Queue): + # keep a track of objects that have been advertised to us but we haven't downloaded them yet + maxWait = 300 diff --git a/src/network/networkthread.py b/src/network/networkthread.py new file mode 100644 index 00000000..3a9d8e37 --- /dev/null +++ b/src/network/networkthread.py @@ -0,0 +1,40 @@ +import threading + +from bmconfigparser import BMConfigParser +from debug import logger +from helper_threading import StoppableThread +import network.asyncore_pollchoose as asyncore +from network.connectionpool import BMConnectionPool + +class BMNetworkThread(threading.Thread, StoppableThread): + def __init__(self): + threading.Thread.__init__(self, name="BMNetworkThread") + self.initStop() + self.name = "AsyncoreThread" + BMConnectionPool() + logger.error("init asyncore thread") + + def run(self): + while not self._stopped: + BMConnectionPool().loop() + + def stopThread(self): + super(BMNetworkThread, self).stopThread() + for i in BMConnectionPool().listeningSockets: + try: + i.close() + except: + pass + for i in BMConnectionPool().outboundConnections: + try: + i.close() + except: + pass + for i in BMConnectionPool().inboundConnections: + try: + i.close() + except: + pass + + # just in case + asyncore.close_all() diff --git a/src/network/node.py b/src/network/node.py index 054d07d8..ab9f5fbe 100644 --- a/src/network/node.py +++ b/src/network/node.py @@ -1,66 +1,3 @@ -import time - -from inventory import PendingDownloadQueue - -try: - # pybloomfiltermmap - from pybloomfilter import BloomFilter -except ImportError: - try: - # pybloom - from pybloom import BloomFilter - except ImportError: - # bundled pybloom - from fallback.pybloom import BloomFilter - - -class Node(object): - invCleanPeriod = 300 - invInitialCapacity = 50000 - invErrorRate = 0.03 - - def __init__(self): - self.initInvBloom() - self.initAddrBloom() - - def initInvBloom(self): - # lock? - self.invBloom = BloomFilter(capacity=Node.invInitialCapacity, - error_rate=Node.invErrorRate) - - def initAddrBloom(self): - # lock? - self.addrBloom = BloomFilter(capacity=Node.invInitialCapacity, - error_rate=Node.invErrorRate) - - def cleanBloom(self): - if self.lastcleaned < time.time() - Node.invCleanPeriod: - if PendingDownloadQueue().size() == 0: - self.initInvBloom() - self.initAddrBloom() - - def hasInv(self, hashid): - return hashid in self.invBloom - - def addInv(self, hashid): - self.invBloom.add(hashid) - - def hasAddr(self, hashid): - return hashid in self.invBloom - - def addInv(self, hashid): - self.invBloom.add(hashid) - -# addr sending -> per node upload queue, and flush every minute or so -# inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so - -# no bloom -# - if inv arrives -# - if we don't have it, add tracking and download queue -# - if we do have it, remove from tracking -# tracking downloads -# - per node hash of items the node has but we don't -# tracking inv -# - per node hash of items that neither the remote node nor we have -# +import collections +Node = collections.namedtuple('Node', ['services', 'host', 'port']) diff --git a/src/network/tls.py b/src/network/tls.py index c7554891..669d9aa3 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -60,14 +60,14 @@ class TLSDispatcher(AdvancedDispatcher): def writable(self): if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - print "tls writable, %r" % (self.want_write) + #print "tls writable, %r" % (self.want_write) return self.want_write else: return AdvancedDispatcher.writable(self) def readable(self): if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - print "tls readable, %r" % (self.want_read) + #print "tls readable, %r" % (self.want_read) return self.want_read else: return AdvancedDispatcher.readable(self) @@ -75,19 +75,19 @@ class TLSDispatcher(AdvancedDispatcher): def handle_read(self): # wait for write buffer flush if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - print "handshaking (read)" + #print "handshaking (read)" self.state_tls_handshake() else: - print "not handshaking (read)" + #print "not handshaking (read)" return AdvancedDispatcher.handle_read(self) def handle_write(self): # wait for write buffer flush if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - print "handshaking (write)" + #print "handshaking (write)" self.state_tls_handshake() else: - print "not handshaking (write)" + #print "not handshaking (write)" return AdvancedDispatcher.handle_write(self) def state_tls_handshake(self): @@ -96,24 +96,25 @@ class TLSDispatcher(AdvancedDispatcher): return False # Perform the handshake. try: - print "handshaking (internal)" + #print "handshaking (internal)" self.sslSocket.do_handshake() except ssl.SSLError, err: - print "handshake fail" + #print "%s:%i: handshake fail" % (self.destination.host, self.destination.port) self.want_read = self.want_write = False if err.args[0] == ssl.SSL_ERROR_WANT_READ: - print "want read" + #print "want read" self.want_read = True - elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: - print "want write" + if err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + #print "want write" self.want_write = True - else: + if not (self.want_write or self.want_read): raise else: - print "handshake success" + print "%s:%i: handshake success" % (self.destination.host, self.destination.port) # The handshake has completed, so remove this channel and... self.del_channel() self.set_socket(self.sslSocket) self.tlsDone = True - self.state_bm_ready() + self.set_state("bm_header") + self.set_connection_fully_established() return False diff --git a/src/network/uploadqueue.py b/src/network/uploadqueue.py new file mode 100644 index 00000000..5d699e96 --- /dev/null +++ b/src/network/uploadqueue.py @@ -0,0 +1,70 @@ +from collections import namedtuple +import Queue +import random +from threading import current_thread, enumerate as threadingEnumerate, RLock +import time + +#from helper_sql import * +from singleton import Singleton + +UploadElem = namedtuple("UploadElem", "stream identifier") + +class UploadQueueDeadlineException(Exception): + pass + + +class UploadQueue(object): + queueCount = 10 + + def __init__(self): + self.queue = [] + self.lastGet = 0 + self.getIterator = 0 + for i in range(UploadQueue.queueCount): + self.queue.append([]) + + def put(self, item): + self.queue[random.randrange(0, UploadQueue.queueCount)].append(item) + + def get(self): + i = UploadQueue.queueCount + retval = [] + while self.lastGet < time.time() - 1 and i > 0: + if len(self.queue) > 0: + retval.extend(self.queue[self.getIterator]) + self.queue[self.getIterator] = [] + self.lastGet += 1 + # only process each queue once + i -= 1 + self.getIterator = (self.getIterator + 1) % UploadQueue.queueCount + if self.lastGet < time.time() - 1: + self.lastGet = time.time() + return retval + + def streamElems(self, stream): + retval = {} + for q in self.queue: + for elem in q: + if elem.stream == stream: + retval[elem.identifier] = True + return retval + + def len(self): + retval = 0 + for i in range(UploadQueue.queueCount): + retval += len(self.queue[i]) + return retval + + def stop(self): + for i in range(UploadQueue.queueCount): + self.queue[i] = [] + + +@Singleton +class AddrUploadQueue(UploadQueue): + pass + + +@Singleton +class ObjUploadQueue(UploadQueue): + pass diff --git a/src/protocol.py b/src/protocol.py index 9397cd8b..b7847e8f 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -32,6 +32,12 @@ STATUS_WARNING = 0 STATUS_ERROR = 1 STATUS_FATAL = 2 +#Object types +OBJECT_GETPUBKEY = 0 +OBJECT_PUBKEY = 1 +OBJECT_MSG = 2 +OBJECT_BROADCAST = 3 + eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( '>Q', random.randrange(1, 18446744073709551615))