From e309a1edb3e01a3e9e9ca30a5f263e8362b63c26 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 25 May 2017 23:04:33 +0200 Subject: [PATCH] Asyncore update - separate queue for processing blocking stuff on reception - rewrote write buffer as a queue - some addr handling - number of half open connections correct --- src/bitmessagemain.py | 12 ++++-- src/network/advanceddispatcher.py | 36 +++++++++------- src/network/bmproto.py | 68 +++++++++++++++++++------------ src/network/connectionpool.py | 53 ++++++++++++------------ src/network/networkthread.py | 2 +- src/network/receivequeuethread.py | 44 ++++++++++++++++++++ src/network/socks4a.py | 24 +++++------ src/network/socks5.py | 22 +++++----- src/protocol.py | 45 +++++++++++++++++++- src/state.py | 2 + 10 files changed, 213 insertions(+), 95 deletions(-) create mode 100644 src/network/receivequeuethread.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 2c0be937..c05d002a 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -52,6 +52,7 @@ from bmconfigparser import BMConfigParser from network.connectionpool import BMConnectionPool from network.networkthread import BMNetworkThread +from network.receivequeuethread import ReceiveQueueThread # Helper Functions import helper_bootstrap @@ -65,13 +66,13 @@ def connectToStream(streamNumber): if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections(): # Some XP and Vista systems can only have 10 outgoing connections at a time. - maximumNumberOfHalfOpenConnections = 9 + state.maximumNumberOfHalfOpenConnections = 9 else: - maximumNumberOfHalfOpenConnections = 64 + state.maximumNumberOfHalfOpenConnections = 64 try: # don't overload Tor if BMConfigParser().get('bitmessagesettings', 'socksproxytype') != 'none': - maximumNumberOfHalfOpenConnections = 4 + state.maximumNumberOfHalfOpenConnections = 4 except: pass @@ -86,7 +87,7 @@ def connectToStream(streamNumber): if BMConfigParser().safeGetBoolean("network", "asyncore"): BMConnectionPool().connectToStream(streamNumber) else: - for i in range(maximumNumberOfHalfOpenConnections): + for i in range(state.maximumNumberOfHalfOpenConnections): a = outgoingSynSender() a.setup(streamNumber, selfInitiatedConnections) a.start() @@ -252,6 +253,9 @@ class Main: asyncoreThread = BMNetworkThread() asyncoreThread.daemon = False asyncoreThread.start() + receiveQueueThread = ReceiveQueueThread() + receiveQueueThread.daemon = False + receiveQueueThread.start() connectToStream(1) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index d1b5f567..fb28f3d4 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,4 +1,4 @@ -from threading import RLock +import Queue import time import asyncore_pollchoose as asyncore @@ -12,20 +12,16 @@ class AdvancedDispatcher(asyncore.dispatcher): asyncore.dispatcher.__init__(self, sock) self.read_buf = b"" self.write_buf = b"" - self.writeLock = RLock() + self.writeQueue = Queue.Queue() + self.receiveQueue = Queue.Queue() self.state = "init" self.lastTx = time.time() self.sentBytes = 0 self.receivedBytes = 0 - def append_write_buf(self, string = None): - with self.writeLock: - self.write_buf += string - def slice_write_buf(self, length=0): if length > 0: - with self.writeLock: - self.write_buf = self.write_buf[length:] + self.write_buf = self.write_buf[length:] def slice_read_buf(self, length=0): if length > 0: @@ -54,7 +50,7 @@ class AdvancedDispatcher(asyncore.dispatcher): self.state = state def writable(self): - return self.connecting or len(self.write_buf) > 0 + return self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty() def readable(self): return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len @@ -74,18 +70,28 @@ class AdvancedDispatcher(asyncore.dispatcher): def handle_write(self): self.lastTx = time.time() if asyncore.maxUploadRate > 0: - written = self.send(self.write_buf[0:asyncore.uploadChunk]) - asyncore.uploadBucket -= written + bufSize = asyncore.uploadChunk else: - written = self.send(self.write_buf) - asyncore.updateSent(written) - self.sentBytes += written - self.slice_write_buf(written) + bufSize = self._buf_len + while len(self.write_buf) < bufSize: + try: + self.write_buf += self.writeQueue.get(False) + except Queue.Empty: + break + if len(self.write_buf) > 0: + written = self.send(self.write_buf[0:bufSize]) + asyncore.uploadBucket -= written + asyncore.updateSent(written) + self.sentBytes += written + self.slice_write_buf(written) def handle_connect(self): self.lastTx = time.time() self.process() + def state_close(self): + pass + def close(self): self.read_buf = b"" self.write_buf = b"" diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 3a5ea1b4..88a8f794 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -58,7 +58,7 @@ class BMConnection(TLSDispatcher, BMQueues): self.verackSent = False self.lastTx = time.time() self.streams = [0] - self.connectionFullyEstablished = False + self.fullyEstablished = False self.connectedAt = 0 self.skipUntil = 0 if address is None and sock is not None: @@ -95,8 +95,8 @@ class BMConnection(TLSDispatcher, BMQueues): def state_init(self): self.bm_proto_reset() if self.isOutbound: - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) - print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf)) + self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) + print "%s:%i: Sending version" % (self.destination.host, self.destination.port) self.set_state("bm_header") return True @@ -114,12 +114,12 @@ class BMConnection(TLSDispatcher, BMQueues): logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time()) else: logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time()) - self.skipUntil = time.time() + now + self.skipUntil = time.time() + delay def set_connection_fully_established(self): UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.antiIntersectionDelay(True) - self.connectionFullyEstablished = True + self.fullyEstablished = True self.sendAddr() self.sendBigInv() @@ -135,6 +135,8 @@ class BMConnection(TLSDispatcher, BMQueues): self.bm_proto_reset() self.set_state("bm_header", 1) print "Bad magic" + self.close() + return False if self.payloadLength > BMConnection.maxMessageSize: self.invalid = True self.set_state("bm_command", protocol.Header.size) @@ -150,7 +152,7 @@ class BMConnection(TLSDispatcher, BMQueues): print "Bad checksum, ignoring" self.invalid = True retval = True - if not self.connectionFullyEstablished and self.command not in ("version", "verack"): + if not self.fullyEstablished and self.command not in ("version", "verack"): logger.error("Received command %s before connection was fully established, ignoring", self.command) self.invalid = True if not self.invalid: @@ -178,7 +180,10 @@ class BMConnection(TLSDispatcher, BMQueues): except struct.error: print "decoding error, skipping" else: - print "Skipping command %s due to invalid data" % (self.command) + #print "Skipping command %s due to invalid data" % (self.command) + print "Closing due to invalid data" % (self.command) + self.close() + return False if retval: self.set_state("bm_header", self.payloadLength) self.bm_proto_reset() @@ -298,12 +303,7 @@ class BMConnection(TLSDispatcher, BMQueues): if False: self.antiIntersectionDelay() else: - try: - self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) - # this is faster than "if i in Inventory()" - except KeyError: - self.antiIntersectionDelay() - logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) + self.receiveQueue.put(("object", i)) return True def bm_command_inv(self): @@ -327,7 +327,7 @@ class BMConnection(TLSDispatcher, BMQueues): logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime) payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys()) - self.append_write_buf(protocol.CreatePacket('getdata', payload)) + self.writeQueue.put(protocol.CreatePacket('getdata', payload)) # for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)): # DownloadQueue().put(i) @@ -370,6 +370,18 @@ class BMConnection(TLSDispatcher, BMQueues): def bm_command_addr(self): addresses = self.decode_payload_content("lQIQ16sH") + import pprint + for i in addresses: + seenTime, stream, services, ip, port = i + decodedIP = protocol.checkIPAddress(ip) + if stream not in state.streamsInWhichIAmParticipating: + continue + #print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream])) + if decodedIP is not False and seenTime > time.time() - 10800: + peer = state.Peer(decodedIP, port) + if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime: + continue + knownnodes.knownNodes[stream][peer] = seenTime return True def bm_command_portcheck(self): @@ -377,7 +389,7 @@ class BMConnection(TLSDispatcher, BMQueues): return True def bm_command_ping(self): - self.append_write_buf(protocol.CreatePacket('pong')) + self.writeQueue.put(protocol.CreatePacket('pong')) return True def bm_command_pong(self): @@ -411,11 +423,11 @@ class BMConnection(TLSDispatcher, BMQueues): # TODO ABORT return True #shared.connectedHostsList[self.destination] = self.streams[0] - self.append_write_buf(protocol.CreatePacket('verack')) + self.writeQueue.put(protocol.CreatePacket('verack')) self.verackSent = True if not self.isOutbound: - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True)) - print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf)) + self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True)) + print "%s:%i: Sending version" % (self.destination.host, self.destination.port) if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.isOutbound)): self.isSSL = True @@ -431,20 +443,20 @@ class BMConnection(TLSDispatcher, BMQueues): def peerValidityChecks(self): if self.remoteProtocolVersion < 3: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your is using an old protocol. Closing connection.")) logger.debug ('Closing connection to old protocol version %s, node: %s', str(self.remoteProtocolVersion), str(self.peer)) return False if self.timeOffset > 3600: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection.")) logger.info("%s's time is too far in the future (%s seconds). Closing connection to it.", self.peer, self.timeOffset) shared.timeOffsetWrongCount += 1 return False elif self.timeOffset < -3600: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection.")) logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it.", self.peer, self.timeOffset) @@ -453,7 +465,7 @@ class BMConnection(TLSDispatcher, BMQueues): else: shared.timeOffsetWrongCount = 0 if len(self.streams) == 0: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="We don't have shared stream interests. Closing connection.")) logger.debug ('Closed connection to %s because there is no overlapping interest in streams.', str(self.peer)) @@ -461,7 +473,7 @@ class BMConnection(TLSDispatcher, BMQueues): if self.destination in network.connectionpool.BMConnectionPool().inboundConnections: try: if not protocol.checkSocksIP(self.destination.host): - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, + self.writeQueue.put(protocol.assembleErrorMessage(fatal=2, errorText="Too many connections from your IP. Closing connection.")) logger.debug ('Closed connection to %s because we are already connected to that IP.', str(self.peer)) @@ -474,7 +486,7 @@ class BMConnection(TLSDispatcher, BMQueues): def sendChunk(): if addressCount == 0: return - self.append_write_buf(protocol.CreatePacket('addr', \ + self.writeQueue.put(protocol.CreatePacket('addr', \ addresses.encodeVarint(addressCount) + payload)) # We are going to share a maximum number of 1000 addrs (per overlapping @@ -563,7 +575,7 @@ class BMConnection(TLSDispatcher, BMQueues): if objectCount == 0: return logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) - self.append_write_buf(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) + self.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) # Select all hashes for objects in this stream. bigInvList = {} @@ -613,6 +625,7 @@ class BMConnection(TLSDispatcher, BMQueues): self.close() def close(self, reason=None): + self.set_state("close") if reason is None: print "%s:%i: closing" % (self.destination.host, self.destination.port) #traceback.print_stack() @@ -653,7 +666,10 @@ class BMServer(AdvancedDispatcher): pair = self.accept() if pair is not None: sock, addr = pair - network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock)) + try: + network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock)) + except socket.errno: + pass if __name__ == "__main__": diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 6acf4bb6..8d4f4539 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -104,29 +104,32 @@ class BMConnectionPool(object): print "bootstrapping dns" helper_bootstrap.dns() self.bootstrapped = True - for i in range(len(self.outboundConnections), BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections")): - chosen = chooseConnection(random.choice(self.streams)) - if chosen in self.outboundConnections: - continue - if chosen.host in self.inboundConnections: - continue - - #for c in self.outboundConnections: - # if chosen == c.destination: - # continue - #for c in self.inboundConnections: - # if chosen.host == c.destination.host: - # continue - try: - if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"): - self.addConnection(network.bmproto.Socks5BMConnection(chosen)) - elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"): - self.addConnection(network.bmproto.Socks4aBMConnection(chosen)) - elif not chosen.host.endswith(".onion"): - self.addConnection(network.bmproto.BMConnection(chosen)) - except socket.error as e: - if e.errno == errno.ENETUNREACH: + established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished)) + pending = len(self.outboundConnections) - established + if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"): + for i in range(state.maximumNumberOfHalfOpenConnections - pending): + chosen = chooseConnection(random.choice(self.streams)) + if chosen in self.outboundConnections: continue + if chosen.host in self.inboundConnections: + continue + + #for c in self.outboundConnections: + # if chosen == c.destination: + # continue + #for c in self.inboundConnections: + # if chosen.host == c.destination.host: + # continue + try: + if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"): + self.addConnection(network.bmproto.Socks5BMConnection(chosen)) + elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"): + self.addConnection(network.bmproto.Socks4aBMConnection(chosen)) + elif not chosen.host.endswith(".onion"): + self.addConnection(network.bmproto.BMConnection(chosen)) + except socket.error as e: + if e.errno == errno.ENETUNREACH: + continue if acceptConnections and len(self.listeningSockets) == 0: self.startListening() @@ -142,10 +145,10 @@ class BMConnectionPool(object): for i in self.inboundConnections.values() + self.outboundConnections.values(): minTx = time.time() - 20 - if i.connectionFullyEstablished: + if i.fullyEstablished: minTx -= 300 - 20 if i.lastTx < minTx: - if i.connectionFullyEstablished: - i.append_write_buf(protocol.CreatePacket('ping')) + if i.fullyEstablished: + i.writeQueue.put(protocol.CreatePacket('ping')) else: i.close("Timeout (%is)" % (time.time() - i.lastTx)) diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 3a9d8e37..498bd340 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -8,7 +8,7 @@ from network.connectionpool import BMConnectionPool class BMNetworkThread(threading.Thread, StoppableThread): def __init__(self): - threading.Thread.__init__(self, name="BMNetworkThread") + threading.Thread.__init__(self, name="AsyncoreThread") self.initStop() self.name = "AsyncoreThread" BMConnectionPool() diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py new file mode 100644 index 00000000..6405238d --- /dev/null +++ b/src/network/receivequeuethread.py @@ -0,0 +1,44 @@ +import Queue +import threading + +from bmconfigparser import BMConfigParser +from debug import logger +from helper_threading import StoppableThread +from inventory import Inventory +from network.connectionpool import BMConnectionPool +import protocol + +class ReceiveQueueThread(threading.Thread, StoppableThread): + def __init__(self): + threading.Thread.__init__(self, name="ReceiveQueueThread") + self.initStop() + self.name = "ReceiveQueueThread" + BMConnectionPool() + logger.error("init asyncore thread") + + def run(self): + while not self._stopped: + processed = 0 + for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + try: + command, args = i.receiveQueue.get(False) + except Queue.Empty: + continue + processed += 1 + try: + getattr(self, "command_" + str(command))(i, args) + except AttributeError: + # missing command + raise + if processed == 0: + self.stop.wait(0.2) + + def command_object(self, connection, objHash): + try: + connection.writeQueue.put(protocol.CreatePacket('object', Inventory()[objHash].payload)) + except KeyError: + connection.antiIntersectionDelay() + logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (connection.destination,)) + + def stopThread(self): + super(ReceiveQueueThread, self).stopThread() diff --git a/src/network/socks4a.py b/src/network/socks4a.py index 02c8d4af..4b6b64fa 100644 --- a/src/network/socks4a.py +++ b/src/network/socks4a.py @@ -59,28 +59,28 @@ class Socks4aConnection(Socks4a): def state_auth_done(self): # Now we can request the actual connection rmtrslv = False - self.append_write_buf(struct.pack('>BBH', 0x04, 0x01, self.destination[1])) + self.writeQueue.put(struct.pack('>BBH', 0x04, 0x01, self.destination[1])) # If the given destination address is an IP address, we'll # use the IPv4 address request even if remote resolving was specified. try: self.ipaddr = socket.inet_aton(self.destination[0]) - self.append_write_buf(self.ipaddr) + self.writeQueue.put(self.ipaddr) except socket.error: # Well it's not an IP number, so it's probably a DNS name. if Proxy._remote_dns: # Resolve remotely rmtrslv = True self.ipaddr = None - self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) + self.writeQueue.put(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) else: # Resolve locally self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) - self.append_write_buf(self.ipaddr) + self.writeQueue.put(self.ipaddr) if self._auth: - self.append_write_buf(self._auth[0]) - self.append_write_buf(chr(0x00).encode()) + self.writeQueue.put(self._auth[0]) + self.writeQueue.put(chr(0x00).encode()) if rmtrslv: - self.append_write_buf(self.destination[0] + chr(0x00).encode()) + self.writeQueue.put(self.destination[0] + chr(0x00).encode()) self.set_state("pre_connect", 0) @@ -92,12 +92,12 @@ class Socks4aResolver(Socks4a): def state_auth_done(self): # Now we can request the actual connection - self.append_write_buf(struct.pack('>BBH', 0x04, 0xF0, self.destination[1])) - self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) + self.writeQueue.put(struct.pack('>BBH', 0x04, 0xF0, self.destination[1])) + self.writeQueue.put(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) if self._auth: - self.append_write_buf(self._auth[0]) - self.append_write_buf(chr(0x00).encode()) - self.append_write_buf(self.host + chr(0x00).encode()) + self.writeQueue.put(self._auth[0]) + self.writeQueue.put(chr(0x00).encode()) + self.writeQueue.put(self.host + chr(0x00).encode()) self.set_state("pre_connect", 0) def resolved(self): diff --git a/src/network/socks5.py b/src/network/socks5.py index 5ba6f3e3..0d1717d4 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -17,9 +17,9 @@ class Socks5(Proxy): def state_init(self): if self._auth: - self.append_write_buf(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) + self.writeQueue.put(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) else: - self.append_write_buf(struct.pack('BBB', 0x05, 0x01, 0x00)) + self.writeQueue.put(struct.pack('BBB', 0x05, 0x01, 0x00)) self.set_state("auth_1", 0) def state_auth_1(self): @@ -35,7 +35,7 @@ class Socks5(Proxy): self.set_state("auth_done", 2) elif ret[1] == 2: # username/password - self.append_write_buf(struct.pack('BB', 1, len(self._auth[0])) + \ + self.writeQueue.put(struct.pack('BB', 1, len(self._auth[0])) + \ self._auth[0] + struct.pack('B', len(self._auth[1])) + \ self._auth[1]) self.set_state("auth_1", 2) @@ -130,23 +130,23 @@ class Socks5Connection(Socks5): def state_auth_done(self): # Now we can request the actual connection - self.append_write_buf(struct.pack('BBB', 0x05, 0x01, 0x00)) + self.writeQueue.put(struct.pack('BBB', 0x05, 0x01, 0x00)) # If the given destination address is an IP address, we'll # use the IPv4 address request even if remote resolving was specified. try: self.ipaddr = socket.inet_aton(self.destination[0]) - self.append_write_buf(chr(0x01).encode() + self.ipaddr) + self.writeQueue.put(chr(0x01).encode() + self.ipaddr) except socket.error: # Well it's not an IP number, so it's probably a DNS name. if Proxy._remote_dns: # Resolve remotely self.ipaddr = None - self.append_write_buf(chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0]) + self.writeQueue.put(chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0]) else: # Resolve locally self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) - self.append_write_buf(chr(0x01).encode() + self.ipaddr) - self.append_write_buf(struct.pack(">H", self.destination[1])) + self.writeQueue.put(chr(0x01).encode() + self.ipaddr) + self.writeQueue.put(struct.pack(">H", self.destination[1])) self.set_state("pre_connect", 0) @@ -158,9 +158,9 @@ class Socks5Resolver(Socks5): def state_auth_done(self): # Now we can request the actual connection - self.append_write_buf(struct.pack('BBB', 0x05, 0xF0, 0x00)) - self.append_write_buf(chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host)) - self.append_write_buf(struct.pack(">H", self.port)) + self.writeQueue.put(struct.pack('BBB', 0x05, 0xF0, 0x00)) + self.writeQueue.put(chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host)) + self.writeQueue.put(struct.pack(">H", self.port)) self.set_state("pre_connect", 0) def resolved(self): diff --git a/src/protocol.py b/src/protocol.py index b7847e8f..83ecb7bd 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -67,7 +67,7 @@ def isBitSetWithinBitfield(fourByteString, n): x, = unpack('>L', fourByteString) return x & 2**n != 0 -# data handling +# ip addresses def encodeHost(host): if host.find('.onion') > -1: @@ -86,6 +86,49 @@ def networkType(host): else: return 'IPv6' +def checkIPAddress(host): + if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': + hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) + return checkIPv4Address(host[12:], hostStandardFormat) + elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': + # Onion, based on BMD/bitcoind + hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" + return hostStandardFormat + else: + hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) + if hostStandardFormat == "": + # This can happen on Windows systems which are not 64-bit compatible + # so let us drop the IPv6 address. + return False + return checkIPv6Address(host, hostStandardFormat) + +def checkIPv4Address(host, hostStandardFormat): + if host[0] == '\x7F': # 127/8 + logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat) + return False + if host[0] == '\x0A': # 10/8 + logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) + return False + if host[0:2] == '\xC0\xA8': # 192.168/16 + logger.debug('Ignoring IP address in private range: ' + hostStandardFormat) + return False + if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 + logger.debug('Ignoring IP address in private range:' + hostStandardFormat) + return False + return hostStandardFormat + +def checkIPv6Address(host, hostStandardFormat): + if host == ('\x00' * 15) + '\x01': + logger.debug('Ignoring loopback address: ' + hostStandardFormat) + return False + if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: + logger.debug ('Ignoring local address: ' + hostStandardFormat) + return False + if (ord(host[0]) & 0xfe) == 0xfc: + logger.debug ('Ignoring unique local address: ' + hostStandardFormat) + return False + return hostStandardFormat + # checks def haveSSL(server = False): diff --git a/src/state.py b/src/state.py index c7b79e07..72852f3e 100644 --- a/src/state.py +++ b/src/state.py @@ -21,6 +21,8 @@ curses = False sqlReady = False # set to true by sqlTread when ready for processing +maximumNumberOfHalfOpenConnections = 0 + # If the trustedpeer option is specified in keys.dat then this will # contain a Peer which will be connected to instead of using the # addresses advertised by other peers. The client will only connect to