From f8b4b427fca4c8a3c96b587e577bef7e40126685 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 27 May 2017 19:09:21 +0200 Subject: [PATCH] Asyncore update - bugfixes - UDP socket for local peer discovery - new function assembleAddr to unify creating address command - open port checker functionality (inactive) - sendBigInv is done in a thread separate from the network IO thread --- src/bitmessagemain.py | 13 +- src/bmconfigparser.py | 6 +- src/network/advanceddispatcher.py | 16 +- src/network/announcethread.py | 39 ++ src/network/bmproto.py | 375 +++++-------------- src/network/connectionchooser.py | 7 +- src/network/connectionpool.py | 63 +++- src/network/{bmqueues.py => objectracker.py} | 25 +- src/network/receivequeuethread.py | 43 ++- src/network/tcp.py | 236 ++++++++++++ src/network/tls.py | 8 +- src/network/udp.py | 198 ++++++++++ src/protocol.py | 24 +- src/queues.py | 1 + 14 files changed, 721 insertions(+), 333 deletions(-) create mode 100644 src/network/announcethread.py rename src/network/{bmqueues.py => objectracker.py} (74%) create mode 100644 src/network/tcp.py create mode 100644 src/network/udp.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 8f39deb9..d566852c 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -51,9 +51,13 @@ from class_smtpDeliver import smtpDeliver from class_smtpServer import smtpServer from bmconfigparser import BMConfigParser +from inventory import Inventory + from network.connectionpool import BMConnectionPool from network.networkthread import BMNetworkThread from network.receivequeuethread import ReceiveQueueThread +from network.announcethread import AnnounceThread +#from network.downloadthread import DownloadThread # Helper Functions import helper_bootstrap @@ -221,6 +225,8 @@ class Main: sqlLookup.daemon = False # DON'T close the main program even if there are threads left. The closeEvent should command this thread to exit gracefully. sqlLookup.start() + Inventory() # init + # SMTP delivery thread if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': smtpDeliveryThread = smtpDeliver() @@ -261,11 +267,14 @@ class Main: if BMConfigParser().safeGetBoolean("network", "asyncore"): asyncoreThread = BMNetworkThread() - asyncoreThread.daemon = False + asyncoreThread.daemon = True asyncoreThread.start() receiveQueueThread = ReceiveQueueThread() - receiveQueueThread.daemon = False + receiveQueueThread.daemon = True receiveQueueThread.start() + announceThread = AnnounceThread() + announceThread.daemon = True + announceThread.start() connectToStream(1) diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index 4e66c703..8c4d8b3c 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -16,7 +16,11 @@ BMConfigDefaults = { "maxuploadrate": 0, }, "network": { - "asyncore": False + "asyncore": False, + "bind": None, + }, + "inventory": { + "storage": "sqlite", }, "zlib": { 'maxsize': 1048576 diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index fb28f3d4..938eb11d 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -36,7 +36,10 @@ class AdvancedDispatcher(asyncore.dispatcher): def process(self): if self.state not in ["init", "tls_handshake"] and len(self.read_buf) == 0: return - while True: + if not self.connected: + return + maxLoop = 20 + while maxLoop > 0: try: # print "Trying to handle state \"%s\"" % (self.state) if getattr(self, "state_" + str(self.state))() is False: @@ -44,6 +47,7 @@ class AdvancedDispatcher(asyncore.dispatcher): except AttributeError: # missing state raise + maxLoop -= 1 def set_state(self, state, length=0): self.slice_read_buf(length) @@ -96,4 +100,14 @@ class AdvancedDispatcher(asyncore.dispatcher): self.read_buf = b"" self.write_buf = b"" self.state = "shutdown" + while True: + try: + self.writeQueue.get(False) + except Queue.Empty: + break + while True: + try: + self.receiveQueue.get(False) + except Queue.Empty: + break asyncore.dispatcher.close(self) diff --git a/src/network/announcethread.py b/src/network/announcethread.py new file mode 100644 index 00000000..0ba93d7a --- /dev/null +++ b/src/network/announcethread.py @@ -0,0 +1,39 @@ +import Queue +import threading +import time + +from bmconfigparser import BMConfigParser +from debug import logger +from helper_threading import StoppableThread +from network.bmproto import BMProto +from network.connectionpool import BMConnectionPool +from network.udp import UDPSocket +import protocol +import state + +class AnnounceThread(threading.Thread, StoppableThread): + def __init__(self): + threading.Thread.__init__(self, name="AnnounceThread") + self.initStop() + self.name = "AnnounceThread" + BMConnectionPool() + logger.error("init announce thread") + + def run(self): + lastSelfAnnounced = 0 + while not self._stopped: + processed = 0 + if lastSelfAnnounced < time.time() - UDPSocket.announceInterval: + self.announceSelf() + lastSelfAnnounced = time.time() + if processed == 0: + self.stop.wait(10) + + def announceSelf(self): + for connection in BMConnectionPool().udpSockets.values(): + for stream in state.streamsInWhichIAmParticipating: + addr = (stream, state.Peer('127.0.0.1', BMConfigParser().safeGetInt("bitmessagesettings", "port")), time.time()) + connection.writeQueue.put(BMProto.assembleAddr([addr])) + + def stopThread(self): + super(AnnounceThread, self).stopThread() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 88a8f794..6e1c3a18 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -19,12 +19,9 @@ import network.connectionpool from network.downloadqueue import DownloadQueue from network.node import Node import network.asyncore_pollchoose as asyncore +from network.objectracker import ObjectTracker from network.proxy import Proxy, ProxyError, GeneralProxyError -from network.bmqueues import BMQueues -from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error -from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUploadQueue -from network.tls import TLSDispatcher import addresses from bmconfigparser import BMConfigParser @@ -42,7 +39,7 @@ class BMProtoInsufficientDataError(BMProtoError): pass class BMProtoExcessiveDataError(BMProtoError): pass -class BMConnection(TLSDispatcher, BMQueues): +class BMProto(AdvancedDispatcher, ObjectTracker): # ~1.6 MB which is the maximum possible size of an inv message. maxMessageSize = 1600100 # 2**18 = 256kB is the maximum size of an object payload @@ -51,36 +48,40 @@ class BMConnection(TLSDispatcher, BMQueues): maxAddrCount = 1000 # protocol specification says max 50000 objects in one inv command maxObjectCount = 50000 + # address is online if online less than this many seconds ago + addressAlive = 10800 + # maximum time offset + maxTimeOffset = 3600 - def __init__(self, address=None, sock=None): - AdvancedDispatcher.__init__(self, sock) - self.verackReceived = False - self.verackSent = False - self.lastTx = time.time() - self.streams = [0] - self.fullyEstablished = False - self.connectedAt = 0 - self.skipUntil = 0 - if address is None and sock is not None: - self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) - self.isOutbound = False - TLSDispatcher.__init__(self, sock, server_side=True) - self.connectedAt = time.time() - print "received connection in background from %s:%i" % (self.destination.host, self.destination.port) - else: - self.destination = address - self.isOutbound = True - if ":" in address.host: - self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) - else: - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - TLSDispatcher.__init__(self, sock, server_side=False) - self.connect(self.destination) - print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) - shared.connectedHostsList[self.destination] = 0 - BMQueues.__init__(self) - UISignalQueue.put(('updateNetworkStatusTab', 'no data')) +# def __init__(self, address=None, sock=None): +# AdvancedDispatcher.__init__(self, sock) +# self.verackReceived = False +# self.verackSent = False +# self.lastTx = time.time() +# self.streams = [0] +# self.fullyEstablished = False +# self.connectedAt = 0 +# self.skipUntil = 0 +# if address is None and sock is not None: +# self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) +# self.isOutbound = False +# TLSDispatcher.__init__(self, sock, server_side=True) +# self.connectedAt = time.time() +# #print "received connection in background from %s:%i" % (self.destination.host, self.destination.port) +# else: +# self.destination = address +# self.isOutbound = True +# if ":" in address.host: +# self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) +# else: +# self.create_socket(socket.AF_INET, socket.SOCK_STREAM) +# self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +# TLSDispatcher.__init__(self, sock, server_side=False) +# self.connect(self.destination) +# #print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) +# shared.connectedHostsList[self.destination] = 0 +# ObjectTracker.__init__(self) +# UISignalQueue.put(('updateNetworkStatusTab', 'no data')) def bm_proto_reset(self): self.magic = None @@ -92,37 +93,6 @@ class BMConnection(TLSDispatcher, BMQueues): self.payloadOffset = 0 self.object = None - def state_init(self): - self.bm_proto_reset() - if self.isOutbound: - self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) - print "%s:%i: Sending version" % (self.destination.host, self.destination.port) - self.set_state("bm_header") - return True - - def antiIntersectionDelay(self, initial = False): - # estimated time for a small object to propagate across the whole network - delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + UploadQueue.queueCount/2) - # take the stream with maximum amount of nodes - # +2 is to avoid problems with log(0) and log(1) - # 20 is avg connected nodes count - # 0.2 is avg message transmission time - if delay > 0: - if initial: - self.skipUntil = self.connectedAt + delay - if self.skipUntil > time.time(): - logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time()) - else: - logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time()) - self.skipUntil = time.time() + delay - - def set_connection_fully_established(self): - UISignalQueue.put(('updateNetworkStatusTab', 'no data')) - self.antiIntersectionDelay(True) - self.fullyEstablished = True - self.sendAddr() - self.sendBigInv() - def state_bm_header(self): #print "%s:%i: header" % (self.destination.host, self.destination.port) if len(self.read_buf) < protocol.Header.size: @@ -137,7 +107,7 @@ class BMConnection(TLSDispatcher, BMQueues): print "Bad magic" self.close() return False - if self.payloadLength > BMConnection.maxMessageSize: + if self.payloadLength > BMProto.maxMessageSize: self.invalid = True self.set_state("bm_command", protocol.Header.size) return True @@ -309,28 +279,18 @@ class BMConnection(TLSDispatcher, BMQueues): def bm_command_inv(self): items = self.decode_payload_content("L32s") - if len(items) >= BMConnection.maxObjectCount: + if len(items) >= BMProto.maxObjectCount: logger.error("Too many items in inv message!") raise BMProtoExcessiveDataError() else: pass - #print "items in inv: %i" % (len(items)) - startTime = time.time() - #advertisedSet = set() for i in items: - #advertisedSet.add(i) - self.handleReceivedObj(i) - #objectsNewToMe = advertisedSet - #for stream in self.streams: - #objectsNewToMe -= Inventory().hashes_by_stream(stream) - logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime) + self.receiveQueue.put(("inv", i)) + self.handleReceivedInventory(i) payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys()) self.writeQueue.put(protocol.CreatePacket('getdata', payload)) - -# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)): -# DownloadQueue().put(i) return True def bm_command_object(self): @@ -338,7 +298,7 @@ class BMConnection(TLSDispatcher, BMQueues): nonce, expiresTime, objectType, version, streamNumber = self.decode_payload_content("QQIvv") self.object = BMObject(nonce, expiresTime, objectType, version, streamNumber, self.payload) - if len(self.payload) - self.payloadOffset > BMConnection.maxObjectPayloadSize: + if len(self.payload) - self.payloadOffset > BMProto.maxObjectPayloadSize: logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(self.payload) - self.payloadOffset) raise BMProtoExcessiveDataError() @@ -368,20 +328,23 @@ class BMConnection(TLSDispatcher, BMQueues): #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) return True + def _decode_addr(self): + return self.decode_payload_content("lQIQ16sH") + def bm_command_addr(self): - addresses = self.decode_payload_content("lQIQ16sH") - import pprint + addresses = self._decode_addr() for i in addresses: seenTime, stream, services, ip, port = i decodedIP = protocol.checkIPAddress(ip) if stream not in state.streamsInWhichIAmParticipating: continue #print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream])) - if decodedIP is not False and seenTime > time.time() - 10800: + if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive: peer = state.Peer(decodedIP, port) if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime: continue knownnodes.knownNodes[stream][peer] = seenTime + AddrUploadQueue().put((stream, peer)) return True def bm_command_portcheck(self): @@ -411,14 +374,15 @@ class BMConnection(TLSDispatcher, BMQueues): def bm_command_version(self): #self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size]) self.remoteProtocolVersion, self.services, self.timestamp, self.sockNode, self.peerNode, self.nonce, self.userAgent, self.streams = self.decode_payload_content("IQQiiQlslv") + self.nonce = struct.pack('>Q', self.nonce) self.timeOffset = self.timestamp - int(time.time()) - print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion) - print "services: %08X" % (self.services) - print "time offset: %i" % (self.timestamp - int(time.time())) - print "my external IP: %s" % (self.sockNode.host) - print "remote node incoming port: %i" % (self.peerNode.port) - print "user agent: %s" % (self.userAgent) - print "streams: [%s]" % (",".join(map(str,self.streams))) + #print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion) + #print "services: %08X" % (self.services) + #print "time offset: %i" % (self.timestamp - int(time.time())) + #print "my external IP: %s" % (self.sockNode.host) + #print "remote node incoming port: %i" % (self.peerNode.port) + #print "user agent: %s" % (self.userAgent) + #print "streams: [%s]" % (",".join(map(str,self.streams))) if not self.peerValidityChecks(): # TODO ABORT return True @@ -446,20 +410,20 @@ class BMConnection(TLSDispatcher, BMQueues): self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your is using an old protocol. Closing connection.")) logger.debug ('Closing connection to old protocol version %s, node: %s', - str(self.remoteProtocolVersion), str(self.peer)) + str(self.remoteProtocolVersion), str(self.destination)) return False - if self.timeOffset > 3600: + if self.timeOffset > BMProto.maxTimeOffset: self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection.")) logger.info("%s's time is too far in the future (%s seconds). Closing connection to it.", - self.peer, self.timeOffset) + self.destination, self.timeOffset) shared.timeOffsetWrongCount += 1 return False - elif self.timeOffset < -3600: + elif self.timeOffset < -BMProto.maxTimeOffset: self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection.")) logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it.", - self.peer, self.timeOffset) + self.destination, self.timeOffset) shared.timeOffsetWrongCount += 1 return False else: @@ -468,7 +432,7 @@ class BMConnection(TLSDispatcher, BMQueues): self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="We don't have shared stream interests. Closing connection.")) logger.debug ('Closed connection to %s because there is no overlapping interest in streams.', - str(self.peer)) + str(self.destination)) return False if self.destination in network.connectionpool.BMConnectionPool().inboundConnections: try: @@ -476,218 +440,63 @@ class BMConnection(TLSDispatcher, BMQueues): self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Too many connections from your IP. Closing connection.")) logger.debug ('Closed connection to %s because we are already connected to that IP.', - str(self.peer)) + str(self.destination)) return False except: pass + if self.nonce == protocol.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, + errorText="I'm connected to myself. Closing connection.")) + logger.debug ("Closed connection to %s because I'm connected to myself.", + str(self.destination)) + return True - def sendAddr(self): - def sendChunk(): - if addressCount == 0: - return - self.writeQueue.put(protocol.CreatePacket('addr', \ - addresses.encodeVarint(addressCount) + payload)) - - # We are going to share a maximum number of 1000 addrs (per overlapping - # stream) with our peer. 500 from overlapping streams, 250 from the - # left child stream, and 250 from the right child stream. - maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500) - - # init - addressCount = 0 - payload = b'' - - for stream in self.streams: - addrsInMyStream = {} - addrsInChildStreamLeft = {} - addrsInChildStreamRight = {} - - with knownnodes.knownNodesLock: - if len(knownnodes.knownNodes[stream]) > 0: - filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} - elemCount = len(filtered) - if elemCount > maxAddrCount: - elemCount = maxAddrCount - # only if more recent than 3 hours - addrsInMyStream = random.sample(filtered.items(), elemCount) - # sent 250 only if the remote isn't interested in it - if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams: - filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} - elemCount = len(filtered) - if elemCount > maxAddrCount / 2: - elemCount = int(maxAddrCount / 2) - addrsInChildStreamLeft = random.sample(filtered.items(), elemCount) - if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams: - filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} - elemCount = len(filtered) - if elemCount > maxAddrCount / 2: - elemCount = int(maxAddrCount / 2) - addrsInChildStreamRight = random.sample(filtered.items(), elemCount) - for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream: - addressCount += 1 - payload += struct.pack( - '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += struct.pack('>I', stream) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node - payload += protocol.encodeHost(HOST) - payload += struct.pack('>H', PORT) # remote port - if addressCount >= BMConnection.maxAddrCount: - sendChunk() - payload = b'' - addressCount = 0 - for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft: - addressCount += 1 - payload += struct.pack( - '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += struct.pack('>I', stream * 2) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node - payload += protocol.encodeHost(HOST) - payload += struct.pack('>H', PORT) # remote port - if addressCount >= BMConnection.maxAddrCount: - sendChunk() - payload = b'' - addressCount = 0 - for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight: - addressCount += 1 - payload += struct.pack( - '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time - payload += struct.pack('>I', (stream * 2) + 1) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node - payload += protocol.encodeHost(HOST) - payload += struct.pack('>H', PORT) # remote port - if addressCount >= BMConnection.maxAddrCount: - sendChunk() - payload = b'' - addressCount = 0 - - # flush - sendChunk() - - def sendBigInv(self): - def sendChunk(): - if objectCount == 0: - return - logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) - self.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) - - # Select all hashes for objects in this stream. - bigInvList = {} - for stream in self.streams: - for hash in Inventory().unexpired_hashes_by_stream(stream): - bigInvList[hash] = 0 -# for hash in ObjUploadQueue().streamHashes(stream): -# try: -# del bigInvList[hash] -# except KeyError: -# pass - objectCount = 0 - payload = b'' - # Now let us start appending all of these hashes together. They will be - # sent out in a big inv message to our new peer. - for hash, storedValue in bigInvList.items(): - payload += hash - objectCount += 1 - if objectCount >= BMConnection.maxObjectCount: - self.sendChunk() - payload = b'' - objectCount = 0 - - # flush - sendChunk() + @staticmethod + def assembleAddr(peerList): + if type(peerList) is state.Peer: + peerList = (peerList) + # TODO handle max length, now it's done by upper layers + payload = addresses.encodeVarint(len(peerList)) + for address in peerList: + stream, peer, timestamp = address + payload += struct.pack( + '>Q', timestamp) # 64-bit time + payload += struct.pack('>I', stream) + payload += struct.pack( + '>q', 1) # service bit flags offered by this node + payload += protocol.encodeHost(peer.host) + payload += struct.pack('>H', peer.port) # remote port + return protocol.CreatePacket('addr', payload) def handle_connect_event(self): try: asyncore.dispatcher.handle_connect_event(self) self.connectedAt = time.time() except socket.error as e: - print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) self.close() def handle_read_event(self): try: asyncore.dispatcher.handle_read_event(self) except socket.error as e: - print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) self.close() def handle_write_event(self): try: asyncore.dispatcher.handle_write_event(self) except socket.error as e: - print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) self.close() def close(self, reason=None): self.set_state("close") - if reason is None: - print "%s:%i: closing" % (self.destination.host, self.destination.port) - #traceback.print_stack() - else: - print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) +# if reason is None: +# print "%s:%i: closing" % (self.destination.host, self.destination.port) +# #traceback.print_stack() +# else: +# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) network.connectionpool.BMConnectionPool().removeConnection(self) - asyncore.dispatcher.close(self) - - -class Socks5BMConnection(Socks5Connection, BMConnection): - def __init__(self, address): - Socks5Connection.__init__(self, address=address) - - def state_socks_handshake_done(self): - BMConnection.state_init(self) - return False - - -class Socks4aBMConnection(Socks4aConnection, BMConnection): - def __init__(self, address): - Socks4aConnection.__init__(self, address=address) - - def state_socks_handshake_done(self): - BMConnection.state_init(self) - return False - - -class BMServer(AdvancedDispatcher): - def __init__(self, host='127.0.0.1', port=8444): - if not hasattr(self, '_map'): - AdvancedDispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((host, port)) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - try: - network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock)) - except socket.errno: - pass - - -if __name__ == "__main__": - # initial fill - - for host in (("127.0.0.1", 8448),): - direct = BMConnection(host) - while len(asyncore.socket_map) > 0: - print "loop, state = %s" % (direct.state) - asyncore.loop(timeout=10, count=1) - continue - - proxy = Socks5BMConnection(host) - while len(asyncore.socket_map) > 0: -# print "loop, state = %s" % (proxy.state) - asyncore.loop(timeout=10, count=1) - - proxy = Socks4aBMConnection(host) - while len(asyncore.socket_map) > 0: -# print "loop, state = %s" % (proxy.state) - asyncore.loop(timeout=10, count=1) + AdvancedDispatcher.close(self) diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 1c8d988d..05ef47bd 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -3,7 +3,7 @@ import random from bmconfigparser import BMConfigParser import knownnodes -from queues import portCheckerQueue +from queues import portCheckerQueue, peerDiscoveryQueue import state def chooseConnection(stream): @@ -13,4 +13,7 @@ def chooseConnection(stream): try: return portCheckerQueue.get(False) except Queue.Empty: - return random.choice(knownnodes.knownNodes[stream].keys()) + try: + return peerDiscoveryQueue.get(False) + except Queue.Empty: + return random.choice(knownnodes.knownNodes[stream].keys()) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 8d4f4539..a75b62aa 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -2,11 +2,14 @@ import errno import socket import time import random +import re from bmconfigparser import BMConfigParser from debug import logger import helper_bootstrap import network.bmproto +import network.tcp +import network.udp from network.connectionchooser import chooseConnection import network.asyncore_pollchoose as asyncore import protocol @@ -23,33 +26,31 @@ class BMConnectionPool(object): self.outboundConnections = {} self.inboundConnections = {} self.listeningSockets = {} + self.udpSockets = {} self.streams = [] self.bootstrapped = False def handleReceivedObject(self, connection, streamNumber, hashid): for i in self.inboundConnections.values() + self.outboundConnections.values(): - if not isinstance(i, network.bmproto.BMConnection): + if not isinstance(i, network.bmproto.BMProto): continue + try: + del i.objectsNewToMe[hashid] + except KeyError: + i.objectsNewToThem[hashid] = True if i == connection: try: del i.objectsNewToThem[hashid] except KeyError: pass - else: - try: - del i.objectsNewToThem[hashid] - except KeyError: - i.objectsNewToThem[hashid] = True - try: - del i.objectsNewToMe[hashid] - except KeyError: - pass def connectToStream(self, streamNumber): self.streams.append(streamNumber) def addConnection(self, connection): + if isinstance(connection, network.udp.UDPSocket): + return if connection.isOutbound: self.outboundConnections[connection.destination] = connection else: @@ -59,7 +60,9 @@ class BMConnectionPool(object): self.inboundConnections[connection.destination.host] = connection def removeConnection(self, connection): - if connection.isOutbound: + if isinstance(connection, network.udp.UDPSocket): + return + elif connection.isOutbound: try: del self.outboundConnections[connection.destination] except KeyError: @@ -73,16 +76,29 @@ class BMConnectionPool(object): except KeyError: pass - def startListening(self): - port = BMConfigParser().safeGetInt("bitmessagesettings", "port") + def getListeningIP(self): if BMConfigParser().safeGet("bitmessagesettings", "onionhostname").endswith(".onion"): host = BMConfigParser().safeGet("bitmessagesettigns", "onionbindip") else: host = '127.0.0.1' if BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") or \ BMConfigParser().get("bitmessagesettings", "socksproxytype") == "none": + # python doesn't like bind + INADDR_ANY? + #host = socket.INADDR_ANY host = '' - self.listeningSockets[state.Peer(host, port)] = network.bmproto.BMServer(host=host, port=port) + return host + + def startListening(self): + host = self.getListeningIP() + port = BMConfigParser().safeGetInt("bitmessagesettings", "port") + self.listeningSockets[state.Peer(host, port)] = network.tcp.TCPServer(host=host, port=port) + + def startUDPSocket(self, bind=None): + if bind is None: + host = self.getListeningIP() + self.udpSockets[host] = network.udp.UDPSocket(host=host) + else: + self.udpSockets[bind] = network.udp.UDPSocket(host=bind) def loop(self): # defaults to empty loop if outbound connections are maxed @@ -122,11 +138,11 @@ class BMConnectionPool(object): # continue try: if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"): - self.addConnection(network.bmproto.Socks5BMConnection(chosen)) + self.addConnection(network.tcp.Socks5BMConnection(chosen)) elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"): - self.addConnection(network.bmproto.Socks4aBMConnection(chosen)) + self.addConnection(network.tcp.Socks4aBMConnection(chosen)) elif not chosen.host.endswith(".onion"): - self.addConnection(network.bmproto.BMConnection(chosen)) + self.addConnection(network.tcp.TCPConnection(chosen)) except socket.error as e: if e.errno == errno.ENETUNREACH: continue @@ -134,10 +150,23 @@ class BMConnectionPool(object): if acceptConnections and len(self.listeningSockets) == 0: self.startListening() logger.info('Listening for incoming connections.') + if acceptConnections and len(self.udpSockets) == 0: + if BMConfigParser().safeGet("network", "bind") is None: + self.startUDPSocket() + else: + for bind in re.sub("[^\w.]+", " ", BMConfigParser().safeGet("network", "bind")).split(): + self.startUDPSocket(bind) + logger.info('Starting UDP socket(s).') if len(self.listeningSockets) > 0 and not acceptConnections: for i in self.listeningSockets: i.close() + self.listeningSockets = {} logger.info('Stopped listening for incoming connections.') + if len(self.udpSockets) > 0 and not acceptConnections: + for i in self.udpSockets: + i.close() + self.udpSockets = {} + logger.info('Stopped udp sockets.') # while len(asyncore.socket_map) > 0 and state.shutdown == 0: # print "loop, state = %s" % (proxy.state) diff --git a/src/network/bmqueues.py b/src/network/objectracker.py similarity index 74% rename from src/network/bmqueues.py rename to src/network/objectracker.py index 96ad52e4..246916b9 100644 --- a/src/network/bmqueues.py +++ b/src/network/objectracker.py @@ -1,3 +1,4 @@ +from Queue import Queue import time from inventory import Inventory @@ -21,7 +22,7 @@ except ImportError: # it isn't actually implemented yet so no point in turning it on haveBloom = False -class BMQueues(object): +class ObjectTracker(object): invCleanPeriod = 300 invInitialCapacity = 50000 invErrorRate = 0.03 @@ -29,20 +30,22 @@ class BMQueues(object): def __init__(self): self.objectsNewToMe = {} self.objectsNewToThem = {} + self.downloadPending = 0 + self.downloadQueue = Queue() self.initInvBloom() self.initAddrBloom() def initInvBloom(self): if haveBloom: # lock? - self.invBloom = BloomFilter(capacity=BMQueues.invInitialCapacity, - error_rate=BMQueues.invErrorRate) + self.invBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, + error_rate=ObjectTracker.invErrorRate) def initAddrBloom(self): if haveBloom: # lock? - self.addrBloom = BloomFilter(capacity=BMQueues.invInitialCapacity, - error_rate=BMQueues.invErrorRate) + self.addrBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, + error_rate=ObjectTracker.invErrorRate) def clean(self): if self.lastcleaned < time.time() - BMQueues.invCleanPeriod: @@ -61,16 +64,17 @@ class BMQueues(object): else: return hashid in self.objectsNewToMe - def handleReceivedObj(self, hashid): + def handleReceivedInventory(self, hashId): if haveBloom: - self.invBloom.add(hashid) - elif hashid in Inventory(): + self.invBloom.add(hashId) + elif hashId in Inventory(): try: - del self.objectsNewToThem[hashid] + del self.objectsNewToThem[hashId] except KeyError: pass else: - self.objectsNewToMe[hashid] = True + self.objectsNewToMe[hashId] = True +# self.DownloadQueue.put(hashId) def hasAddr(self, addr): if haveBloom: @@ -82,6 +86,7 @@ class BMQueues(object): # addr sending -> per node upload queue, and flush every minute or so # inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so +# data sending -> a simple queue # no bloom # - if inv arrives diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 6405238d..27a01902 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -1,11 +1,14 @@ import Queue import threading +import time +import addresses from bmconfigparser import BMConfigParser from debug import logger from helper_threading import StoppableThread from inventory import Inventory from network.connectionpool import BMConnectionPool +from network.bmproto import BMProto import protocol class ReceiveQueueThread(threading.Thread, StoppableThread): @@ -14,12 +17,17 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): self.initStop() self.name = "ReceiveQueueThread" BMConnectionPool() - logger.error("init asyncore thread") + logger.error("init receive queue thread") def run(self): + lastprinted = int(time.time()) while not self._stopped: + if lastprinted < int(time.time()): + lastprinted = int(time.time()) processed = 0 for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + if self._stopped: + break try: command, args = i.receiveQueue.get(False) except Queue.Empty: @@ -31,7 +39,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): # missing command raise if processed == 0: - self.stop.wait(0.2) + self.stop.wait(2) def command_object(self, connection, objHash): try: @@ -40,5 +48,36 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): connection.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (connection.destination,)) + def command_biginv(self, connection, dummy): + def sendChunk(): + if objectCount == 0: + return + logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) + connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) + + # Select all hashes for objects in this stream. + bigInvList = {} + for stream in connection.streams: + for objHash in Inventory().unexpired_hashes_by_stream(stream): + bigInvList[objHash] = 0 + connection.objectsNewToThem[objHash] = True + objectCount = 0 + payload = b'' + # Now let us start appending all of these hashes together. They will be + # sent out in a big inv message to our new peer. + for hash, storedValue in bigInvList.items(): + payload += hash + objectCount += 1 + if objectCount >= BMProto.maxObjectCount: + self.sendChunk() + payload = b'' + objectCount = 0 + + # flush + sendChunk() + + def command_inv(self, connection, hashId): + connection.handleReceivedInventory(hashId) + def stopThread(self): super(ReceiveQueueThread, self).stopThread() diff --git a/src/network/tcp.py b/src/network/tcp.py new file mode 100644 index 00000000..ef54fc18 --- /dev/null +++ b/src/network/tcp.py @@ -0,0 +1,236 @@ +import base64 +from binascii import hexlify +import hashlib +import math +import time +from pprint import pprint +import socket +import struct +import random +import traceback + +from addresses import calculateInventoryHash +from debug import logger +from inventory import Inventory +import knownnodes +from network.advanceddispatcher import AdvancedDispatcher +from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto +from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError +import network.connectionpool +from network.downloadqueue import DownloadQueue +from network.node import Node +import network.asyncore_pollchoose as asyncore +from network.proxy import Proxy, ProxyError, GeneralProxyError +from network.objectracker import ObjectTracker +from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error +from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError +from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUploadQueue +from network.tls import TLSDispatcher + +import addresses +from bmconfigparser import BMConfigParser +from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue +import shared +import state +import protocol + +class TCPConnection(BMProto, TLSDispatcher): + def __init__(self, address=None, sock=None): + AdvancedDispatcher.__init__(self, sock) + self.verackReceived = False + self.verackSent = False + self.streams = [0] + self.fullyEstablished = False + self.connectedAt = 0 + self.skipUntil = 0 + if address is None and sock is not None: + self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) + self.isOutbound = False + TLSDispatcher.__init__(self, sock, server_side=True) + self.connectedAt = time.time() + #print "received connection in background from %s:%i" % (self.destination.host, self.destination.port) + else: + self.destination = address + self.isOutbound = True + if ":" in address.host: + self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) + else: + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + TLSDispatcher.__init__(self, sock, server_side=False) + self.connect(self.destination) + #print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) + shared.connectedHostsList[self.destination] = 0 + ObjectTracker.__init__(self) + UISignalQueue.put(('updateNetworkStatusTab', 'no data')) + + def state_init(self): + self.bm_proto_reset() + if self.isOutbound: + self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) + print "%s:%i: Sending version" % (self.destination.host, self.destination.port) + self.set_state("bm_header") + return True + + def antiIntersectionDelay(self, initial = False): + # estimated time for a small object to propagate across the whole network + delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + UploadQueue.queueCount/2) + # take the stream with maximum amount of nodes + # +2 is to avoid problems with log(0) and log(1) + # 20 is avg connected nodes count + # 0.2 is avg message transmission time + if delay > 0: + if initial: + self.skipUntil = self.connectedAt + delay + if self.skipUntil > time.time(): + logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time()) + else: + logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time()) + self.skipUntil = time.time() + delay + + def set_connection_fully_established(self): + UISignalQueue.put(('updateNetworkStatusTab', 'no data')) + self.antiIntersectionDelay(True) + self.fullyEstablished = True + self.sendAddr() + self.sendBigInv() + + def sendAddr(self): + # We are going to share a maximum number of 1000 addrs (per overlapping + # stream) with our peer. 500 from overlapping streams, 250 from the + # left child stream, and 250 from the right child stream. + maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500) + + # init + addressCount = 0 + payload = b'' + + templist = [] + addrs = {} + for stream in self.streams: + with knownnodes.knownNodesLock: + if len(knownnodes.knownNodes[stream]) > 0: + filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() + if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + elemCount = len(filtered) + if elemCount > maxAddrCount: + elemCount = maxAddrCount + # only if more recent than 3 hours + addrs[stream] = random.sample(filtered.items(), elemCount) + # sent 250 only if the remote isn't interested in it + if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams: + filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() + if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + elemCount = len(filtered) + if elemCount > maxAddrCount / 2: + elemCount = int(maxAddrCount / 2) + addrs[stream * 2] = random.sample(filtered.items(), elemCount) + if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams: + filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() + if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + elemCount = len(filtered) + if elemCount > maxAddrCount / 2: + elemCount = int(maxAddrCount / 2) + addrs[stream * 2 + 1] = random.sample(filtered.items(), elemCount) + for substream in addrs.keys(): + for peer, timestamp in addrs[substream]: + templist.append((substream, peer, timestamp)) + if len(templist) >= BMProto.maxAddrCount: + self.writeQueue.put(BMProto.assembleAddr(templist)) + templist = [] + # flush + if len(templist) > 0: + self.writeQueue.put(BMProto.assembleAddr(templist)) + + def sendBigInv(self): + self.receiveQueue.put(("biginv", None)) + + def handle_connect_event(self): + try: + asyncore.dispatcher.handle_connect_event(self) + self.connectedAt = time.time() + except socket.error as e: + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def handle_read_event(self): + try: + asyncore.dispatcher.handle_read_event(self) + except socket.error as e: + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def handle_write_event(self): + try: + asyncore.dispatcher.handle_write_event(self) + except socket.error as e: + #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) + self.close() + + def close(self, reason=None): + self.set_state("close") +# if reason is None: +# print "%s:%i: closing" % (self.destination.host, self.destination.port) +# #traceback.print_stack() +# else: +# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) + network.connectionpool.BMConnectionPool().removeConnection(self) + asyncore.dispatcher.close(self) + + +class Socks5BMConnection(Socks5Connection, TCPConnection): + def __init__(self, address): + Socks5Connection.__init__(self, address=address) + + def state_socks_handshake_done(self): + TCPConnection.state_init(self) + return False + + +class Socks4aBMConnection(Socks4aConnection, TCPConnection): + def __init__(self, address): + Socks4aConnection.__init__(self, address=address) + + def state_socks_handshake_done(self): + TCPConnection.state_init(self) + return False + + +class TCPServer(AdvancedDispatcher): + def __init__(self, host='127.0.0.1', port=8444): + if not hasattr(self, '_map'): + AdvancedDispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.listen(5) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + try: + network.connectionpool.BMConnectionPool().addConnection(TCPConnection(sock=sock)) + except socket.error: + pass + + +if __name__ == "__main__": + # initial fill + + for host in (("127.0.0.1", 8448),): + direct = TCPConnection(host) + while len(asyncore.socket_map) > 0: + print "loop, state = %s" % (direct.state) + asyncore.loop(timeout=10, count=1) + continue + + proxy = Socks5BMConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=10, count=1) + + proxy = Socks4aBMConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=10, count=1) diff --git a/src/network/tls.py b/src/network/tls.py index d2abb6b9..f79f0650 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -60,7 +60,7 @@ class TLSDispatcher(AdvancedDispatcher): def writable(self): try: - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0 and self.writeQueue.empty(): #print "tls writable, %r" % (self.want_write) return self.want_write else: @@ -70,7 +70,7 @@ class TLSDispatcher(AdvancedDispatcher): def readable(self): try: - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0 and self.writeQueue.empty(): #print "tls readable, %r" % (self.want_read) return self.want_read else: @@ -81,7 +81,7 @@ class TLSDispatcher(AdvancedDispatcher): def handle_read(self): try: # wait for write buffer flush - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0 and self.writeQueue.empty(): #print "handshaking (read)" self.state_tls_handshake() else: @@ -93,7 +93,7 @@ class TLSDispatcher(AdvancedDispatcher): def handle_write(self): try: # wait for write buffer flush - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0 and self.writeQueue.empty(): #print "handshaking (write)" self.state_tls_handshake() else: diff --git a/src/network/udp.py b/src/network/udp.py new file mode 100644 index 00000000..81bcc06a --- /dev/null +++ b/src/network/udp.py @@ -0,0 +1,198 @@ +import base64 +from binascii import hexlify +import hashlib +import math +import time +from pprint import pprint +import socket +import struct +import random +import traceback + +from addresses import calculateInventoryHash +from debug import logger +from inventory import Inventory +import knownnodes +from network.advanceddispatcher import AdvancedDispatcher +from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto +from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError +import network.connectionpool +from network.downloadqueue import DownloadQueue +from network.node import Node +import network.asyncore_pollchoose as asyncore +from network.objectracker import ObjectTracker +from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUploadQueue + +import addresses +from bmconfigparser import BMConfigParser +from queues import objectProcessorQueue, peerDiscoveryQueue, portCheckerQueue, UISignalQueue +import shared +import state +import protocol + +class UDPSocket(BMProto): + port = 8444 + announceInterval = 60 + + def __init__(self, host=None, sock=None): + AdvancedDispatcher.__init__(self, sock) + self.verackReceived = True + self.verackSent = True + # TODO sort out streams + self.streams = [1] + self.fullyEstablished = True + self.connectedAt = 0 + self.skipUntil = 0 + self.isOutbound = False + if sock is None: + if host is None: + host = '' + if ":" in host: + self.create_socket(socket.AF_INET6, socket.SOCK_DGRAM) + else: + self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) + print "binding to %s" % (host) + self.socket.bind((host, UDPSocket.port)) + #BINDTODEVICE is only available on linux and requires root + #try: + #print "binding to %s" % (host) + #self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, host) + #except AttributeError: + else: + self.socket = sock + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + self.destination = state.Peer(self.socket.getsockname()[0], self.socket.getsockname()[1]) + ObjectTracker.__init__(self) + self.connecting = False + self.connected = True + # packet was received from a local IP + self.local = False + self.set_state("bm_header") + + # disable most commands before doing research / testing + # only addr (peer discovery), error and object are implemented + + def bm_command_error(self): + return BMProto.bm_command_error(self) + + def bm_command_getdata(self): + return True +# return BMProto.bm_command_getdata(self) + + def bm_command_inv(self): + return True +# return BMProto.bm_command_inv(self) + + def bm_command_object(self): + return BMProto.bm_command_object(self) + + def bm_command_addr(self): +# BMProto.bm_command_object(self) + addresses = self._decode_addr() + # only allow peer discovery from private IPs in order to avoid attacks from random IPs on the internet + if not self.local: + return + remoteport = False + for i in addresses: + seenTime, stream, services, ip, port = i + decodedIP = protocol.checkIPAddress(ip) + if stream not in state.streamsInWhichIAmParticipating: + continue + if seenTime < time.time() - BMProto.maxtimeOffset or seenTime > time.time() + BMProto.maxTimeOffset: + continue + if decodedIP is False: + # if the address isn't local, interpret it as the hosts' own announcement + remoteport = port + if remoteport is False: + return + print "received peer discovery from %s:%i (port %i):" % (self.destination.host, self.destination.port, remoteport) + if self.local: + peerDiscoveryQueue.put(state.peer(self.destination.host, remoteport)) + return True + + def bm_command_portcheck(self): + return True + + def bm_command_ping(self): + return True + + def bm_command_pong(self): + return True + + def bm_command_verack(self): + return True + + def bm_command_version(self): + return True + + def handle_connect_event(self): + return + + def writable(self): + return not self.writeQueue.empty() + + def readable(self): + return len(self.read_buf) < AdvancedDispatcher._buf_len + + def handle_read(self): + print "read!" + try: + (addr, recdata) = self.socket.recvfrom(AdvancedDispatcher._buf_len) + except socket.error as e: + print "socket error: %s" % (str(e)) + return + + self.destination = state.Peer(addr[0], addr[1]) + encodedAddr = socket.inet_pton(self.socket.family, addr[0]) + if protocol.checkIPAddress(encodedAddr, True): + self.local = True + else: + self.local = False + # overwrite the old buffer to avoid mixing data and so that self.local works correctly + self.read_buf = data + self.process() + + def handle_write(self): +# print "handling write" + try: + data = self.writeQueue.get(False) + except Queue.Empty: + return + try: + retval = self.socket.sendto(data, ('', UDPSocket.port)) +# print "broadcasted %ib" % (retval) + except socket.error as e: + print "socket error on sendato: %s" % (e) + + def close(self, reason=None): + self.set_state("close") +# if reason is None: +# print "%s:%i: closing" % (self.destination.host, self.destination.port) +# #traceback.print_stack() +# else: +# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) + network.connectionpool.BMConnectionPool().removeConnection(self) + asyncore.dispatcher.close(self) + + +if __name__ == "__main__": + # initial fill + + for host in (("127.0.0.1", 8448),): + direct = BMConnection(host) + while len(asyncore.socket_map) > 0: + print "loop, state = %s" % (direct.state) + asyncore.loop(timeout=10, count=1) + continue + + proxy = Socks5BMConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=10, count=1) + + proxy = Socks4aBMConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=10, count=1) diff --git a/src/protocol.py b/src/protocol.py index 83ecb7bd..5661dff0 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -86,13 +86,15 @@ def networkType(host): else: return 'IPv6' -def checkIPAddress(host): +def checkIPAddress(host, private=False): if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) - return checkIPv4Address(host[12:], hostStandardFormat) + return checkIPv4Address(host[12:], hostStandardFormat, private) elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': # Onion, based on BMD/bitcoind hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" + if private: + return False return hostStandardFormat else: hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) @@ -100,34 +102,34 @@ def checkIPAddress(host): # This can happen on Windows systems which are not 64-bit compatible # so let us drop the IPv6 address. return False - return checkIPv6Address(host, hostStandardFormat) + return checkIPv6Address(host, hostStandardFormat, private) -def checkIPv4Address(host, hostStandardFormat): +def checkIPv4Address(host, hostStandardFormat, private=False): if host[0] == '\x7F': # 127/8 logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat) return False if host[0] == '\x0A': # 10/8 logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) - return False + return hostStandardFormat if private else False if host[0:2] == '\xC0\xA8': # 192.168/16 logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) - return False + return hostStandardFormat if private else False if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 logger.debug('Ignoring IP address in private range:' + hostStandardFormat) return False - return hostStandardFormat + return False if private else hostStandardFormat -def checkIPv6Address(host, hostStandardFormat): +def checkIPv6Address(host, hostStandardFormat, private=False): if host == ('\x00' * 15) + '\x01': logger.debug('Ignoring loopback address: ' + hostStandardFormat) return False if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: logger.debug ('Ignoring local address: ' + hostStandardFormat) - return False + return hostStandardFormat if private else False if (ord(host[0]) & 0xfe) == 0xfc: logger.debug ('Ignoring unique local address: ' + hostStandardFormat) - return False - return hostStandardFormat + return hostStandardFormat if private else False + return False if private else hostStandardFormat # checks diff --git a/src/queues.py b/src/queues.py index c6b09307..a11bedeb 100644 --- a/src/queues.py +++ b/src/queues.py @@ -7,5 +7,6 @@ addressGeneratorQueue = Queue.Queue() # receiveDataThreads dump objects they hear on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() portCheckerQueue = Queue.Queue() +peerDiscoveryQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread.