diff --git a/src/network/bmproto.py b/src/network/bmproto.py index e4d62f9d..b35f3997 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -5,26 +5,25 @@ import struct import time from binascii import hexlify +import addresses +import connectionpool +import knownnodes +import protocol +import state from bmconfigparser import BMConfigParser from debug import logger from inventory import Inventory -import knownnodes from network.advanceddispatcher import AdvancedDispatcher from network.dandelion import Dandelion -from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, \ - BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError -import network.connectionpool +from network.bmobject import ( + BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, + BMObjectExpiredError, BMObjectUnwantedStreamError, + BMObjectInvalidError, BMObjectAlreadyHaveError) from network.node import Node -from network.objectracker import ObjectTracker from network.proxy import ProxyError -from objectracker import missingObjects -from randomtrackingdict import RandomTrackingDict - - -import addresses +from objectracker import missingObjects, ObjectTracker from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue -import state -import protocol +from randomtrackingdict import RandomTrackingDict class BMProtoError(ProxyError): @@ -73,62 +72,70 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.object = None def state_bm_header(self): - self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) + self.magic, self.command, self.payloadLength, self.checksum = \ + protocol.Header.unpack(self.read_buf[:protocol.Header.size]) self.command = self.command.rstrip('\x00') if self.magic != 0xE9BEB4D9: # skip 1 byte in order to sync self.set_state("bm_header", length=1) self.bm_proto_reset() - logger.debug("Bad magic") + logger.debug('Bad magic') if self.socket.type == socket.SOCK_STREAM: self.close_reason = "Bad magic" self.set_state("close") return False if self.payloadLength > BMProto.maxMessageSize: self.invalid = True - self.set_state("bm_command", length=protocol.Header.size, expectBytes=self.payloadLength) + self.set_state( + "bm_command", + length=protocol.Header.size, expectBytes=self.payloadLength) return True - + def state_bm_command(self): self.payload = self.read_buf[:self.payloadLength] if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: - logger.debug("Bad checksum, ignoring") + logger.debug('Bad checksum, ignoring') self.invalid = True retval = True - if not self.fullyEstablished and self.command not in ("error", "version", "verack"): - logger.error("Received command %s before connection was fully established, ignoring", self.command) + if not self.fullyEstablished and self.command not in ( + "error", "version", "verack"): + logger.error( + 'Received command %s before connection was fully' + ' established, ignoring', self.command) self.invalid = True if not self.invalid: try: - retval = getattr(self, "bm_command_" + str(self.command).lower())() + retval = getattr( + self, "bm_command_" + str(self.command).lower())() except AttributeError: # unimplemented command - logger.debug("unimplemented command %s", self.command) + logger.debug('unimplemented command %s', self.command) except BMProtoInsufficientDataError: - logger.debug("packet length too short, skipping") + logger.debug('packet length too short, skipping') except BMProtoExcessiveDataError: - logger.debug("too much data, skipping") + logger.debug('too much data, skipping') except BMObjectInsufficientPOWError: - logger.debug("insufficient PoW, skipping") + logger.debug('insufficient PoW, skipping') except BMObjectInvalidDataError: - logger.debug("object invalid data, skipping") + logger.debug('object invalid data, skipping') except BMObjectExpiredError: - logger.debug("object expired, skipping") + logger.debug('object expired, skipping') except BMObjectUnwantedStreamError: - logger.debug("object not in wanted stream, skipping") + logger.debug('object not in wanted stream, skipping') except BMObjectInvalidError: - logger.debug("object invalid, skipping") + logger.debug('object invalid, skipping') except BMObjectAlreadyHaveError: - logger.debug("%s:%i already got object, skipping", self.destination.host, self.destination.port) + logger.debug( + '%(host)s:%(port)i already got object, skipping', + self.destination._asdict()) except struct.error: - logger.debug("decoding error, skipping") + logger.debug('decoding error, skipping') elif self.socket.type == socket.SOCK_DGRAM: # broken read, ignore pass else: - #print "Skipping command %s due to invalid data" % (self.command) - logger.debug("Closing due to invalid command %s", self.command) - self.close_reason = "Invalid command %s" % (self.command) + logger.debug('Closing due to invalid command %s', self.command) + self.close_reason = "Invalid command %s" % self.command self.set_state("close") return False if retval: @@ -138,7 +145,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def decode_payload_string(self, length): - value = self.payload[self.payloadOffset:self.payloadOffset+length] + value = self.payload[self.payloadOffset:self.payloadOffset + length] self.payloadOffset += length return value @@ -148,6 +155,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return value def decode_payload_node(self): + # protocol.checkIPAddress() services, host, port = self.decode_payload_content("Q16sH") if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': host = socket.inet_ntop(socket.AF_INET, str(host[12:16])) @@ -157,13 +165,13 @@ class BMProto(AdvancedDispatcher, ObjectTracker): else: host = socket.inet_ntop(socket.AF_INET6, str(host)) if host == "": - # This can happen on Windows systems which are not 64-bit compatible - # so let us drop the IPv6 address. + # This can happen on Windows systems which are not 64-bit + # compatible so let us drop the IPv6 address. host = socket.inet_ntop(socket.AF_INET, str(host[12:16])) return Node(services, host, port) - def decode_payload_content(self, pattern = "v"): + def decode_payload_content(self, pattern="v"): # L = varint indicating the length of the next array # l = varint indicating the length of the next item # v = varint (or array) @@ -182,13 +190,16 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return self.decode_payload_node() if char == "H": self.payloadOffset += 2 - return struct.unpack(">H", self.payload[self.payloadOffset-2:self.payloadOffset])[0] + return struct.unpack(">H", self.payload[ + self.payloadOffset - 2:self.payloadOffset])[0] if char == "I": self.payloadOffset += 4 - return struct.unpack(">I", self.payload[self.payloadOffset-4:self.payloadOffset])[0] + return struct.unpack(">I", self.payload[ + self.payloadOffset - 4:self.payloadOffset])[0] if char == "Q": self.payloadOffset += 8 - return struct.unpack(">Q", self.payload[self.payloadOffset-8:self.payloadOffset])[0] + return struct.unpack(">Q", self.payload[ + self.payloadOffset - 8:self.payloadOffset])[0] size = None isArray = False @@ -201,16 +212,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # retval (array) parserStack = [[1, 1, False, pattern, 0, []]] - #try: - # sys._getframe(200) - # logger.error("Stack depth warning, pattern: %s", pattern) - # return - #except ValueError: - # pass - while True: i = parserStack[-1][3][parserStack[-1][4]] - if i in "0123456789" and (size is None or parserStack[-1][3][parserStack[-1][4]-1] not in "lL"): + if i in "0123456789" and ( + size is None or parserStack[-1][3][parserStack[-1][4] - 1] + not in "lL"): try: size = size * 10 + int(i) except TypeError: @@ -218,34 +224,40 @@ class BMProto(AdvancedDispatcher, ObjectTracker): isArray = False elif i in "Ll" and size is None: size = self.decode_payload_varint() - if i == "L": - isArray = True - else: - isArray = False + isArray = i == "L" elif size is not None: if isArray: - parserStack.append([size, size, isArray, parserStack[-1][3][parserStack[-1][4]:], 0, []]) + parserStack.append([ + size, size, isArray, + parserStack[-1][3][parserStack[-1][4]:], 0, [] + ]) parserStack[-2][4] = len(parserStack[-2][3]) else: for j in range(parserStack[-1][4], len(parserStack[-1][3])): if parserStack[-1][3][j] not in "lL0123456789": break - parserStack.append([size, size, isArray, parserStack[-1][3][parserStack[-1][4]:j+1], 0, []]) + parserStack.append([ + size, size, isArray, + parserStack[-1][3][parserStack[-1][4]:j + 1], 0, [] + ]) parserStack[-2][4] += len(parserStack[-1][3]) - 1 size = None continue elif i == "s": - #if parserStack[-2][2]: - # parserStack[-1][5].append(self.payload[self.payloadOffset:self.payloadOffset + parserStack[-1][0]]) - #else: - parserStack[-1][5] = self.payload[self.payloadOffset:self.payloadOffset + parserStack[-1][0]] + # if parserStack[-2][2]: + # parserStack[-1][5].append(self.payload[ + # self.payloadOffset:self.payloadOffset + parserStack[-1][0]]) + # else: + parserStack[-1][5] = self.payload[ + self.payloadOffset:self.payloadOffset + parserStack[-1][0]] self.payloadOffset += parserStack[-1][0] parserStack[-1][1] = 0 parserStack[-1][2] = True - #del parserStack[-1] + # del parserStack[-1] size = None elif i in "viHIQ": - parserStack[-1][5].append(decode_simple(self, parserStack[-1][3][parserStack[-1][4]])) + parserStack[-1][5].append(decode_simple( + self, parserStack[-1][3][parserStack[-1][4]])) size = None else: size = None @@ -256,25 +268,33 @@ class BMProto(AdvancedDispatcher, ObjectTracker): parserStack[depth][4] = 0 if depth > 0: if parserStack[depth][2]: - parserStack[depth - 1][5].append(parserStack[depth][5]) + parserStack[depth - 1][5].append( + parserStack[depth][5]) else: - parserStack[depth - 1][5].extend(parserStack[depth][5]) + parserStack[depth - 1][5].extend( + parserStack[depth][5]) parserStack[depth][5] = [] if parserStack[depth][1] <= 0: if depth == 0: - # we're done, at depth 0 counter is at 0 and pattern is done parsing + # we're done, at depth 0 counter is at 0 + # and pattern is done parsing return parserStack[depth][5] del parserStack[-1] continue break break if self.payloadOffset > self.payloadLength: - logger.debug("Insufficient data %i/%i", self.payloadOffset, self.payloadLength) + logger.debug( + 'Insufficient data %i/%i', + self.payloadOffset, self.payloadLength) raise BMProtoInsufficientDataError() def bm_command_error(self): - fatalStatus, banTime, inventoryVector, errorText = self.decode_payload_content("vvlsls") - logger.error("%s:%i error: %i, %s", self.destination.host, self.destination.port, fatalStatus, errorText) + fatalStatus, banTime, inventoryVector, errorText = \ + self.decode_payload_content("vvlsls") + logger.error( + '%s:%i error: %i, %s', self.destination.host, + self.destination.port, fatalStatus, errorText) return True def bm_command_getdata(self): @@ -291,10 +311,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): items = self.decode_payload_content("l32s") if len(items) > BMProto.maxObjectCount: - logger.error("Too many items in %sinv message!", "d" if dandelion else "") + logger.error( + 'Too many items in %sinv message!', 'd' if dandelion else '') raise BMProtoExcessiveDataError() - else: - pass # ignore dinv if dandelion turned off if dandelion and not state.dandelion: @@ -320,31 +339,41 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_object(self): objectOffset = self.payloadOffset - nonce, expiresTime, objectType, version, streamNumber = self.decode_payload_content("QQIvv") - self.object = BMObject(nonce, expiresTime, objectType, version, streamNumber, self.payload, self.payloadOffset) + nonce, expiresTime, objectType, version, streamNumber = \ + self.decode_payload_content("QQIvv") + self.object = BMObject( + nonce, expiresTime, objectType, version, streamNumber, + self.payload, self.payloadOffset) if len(self.payload) - self.payloadOffset > BMProto.maxObjectPayloadSize: - logger.info('The payload length of this object is too large (%d bytes). Ignoring it.' % (len(self.payload) - self.payloadOffset)) + logger.info( + 'The payload length of this object is too large (%d bytes).' + ' Ignoring it.', len(self.payload) - self.payloadOffset) raise BMProtoExcessiveDataError() try: self.object.checkProofOfWorkSufficient() self.object.checkEOLSanity() self.object.checkAlreadyHave() - except (BMObjectExpiredError, BMObjectAlreadyHaveError, BMObjectInsufficientPOWError) as e: + except (BMObjectExpiredError, BMObjectAlreadyHaveError, + BMObjectInsufficientPOWError): BMProto.stopDownloadingObject(self.object.inventoryHash) - raise e + raise try: self.object.checkStream() - except (BMObjectUnwantedStreamError,) as e: - BMProto.stopDownloadingObject(self.object.inventoryHash, BMConfigParser().get("inventory", "acceptmismatch")) - if not BMConfigParser().get("inventory", "acceptmismatch"): - raise e + except BMObjectUnwantedStreamError: + acceptmismatch = BMConfigParser().get( + "inventory", "acceptmismatch") + BMProto.stopDownloadingObject( + self.object.inventoryHash, acceptmismatch) + if not acceptmismatch: + raise try: self.object.checkObjectByType() - objectProcessorQueue.put((self.object.objectType, buffer(self.object.data))) - except BMObjectInvalidError as e: + objectProcessorQueue.put(( + self.object.objectType, buffer(self.object.data))) + except BMObjectInvalidError: BMProto.stopDownloadingObject(self.object.inventoryHash, True) else: try: @@ -356,9 +385,15 @@ class BMProto(AdvancedDispatcher, ObjectTracker): Dandelion().removeHash(self.object.inventoryHash, "cycle detection") Inventory()[self.object.inventoryHash] = ( - self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag)) - self.handleReceivedObject(self.object.streamNumber, self.object.inventoryHash) - invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination)) + self.object.objectType, self.object.streamNumber, + buffer(self.payload[objectOffset:]), self.object.expiresTime, + buffer(self.object.tag) + ) + self.handleReceivedObject( + self.object.streamNumber, self.object.inventoryHash) + invQueue.put(( + self.object.streamNumber, self.object.inventoryHash, + self.destination)) return True def _decode_addr(self): @@ -409,104 +444,125 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_verack(self): self.verackReceived = True - if self.verackSent: - if self.isSSL: - self.set_state("tls_init", length=self.payloadLength, expectBytes=0) - return False - self.set_state("connection_fully_established", length=self.payloadLength, expectBytes=0) - return False - return True + if not self.verackSent: + return True + self.set_state( + "tls_init" if self.isSSL else "connection_fully_established", + length=self.payloadLength, expectBytes=0) + return False def bm_command_version(self): - self.remoteProtocolVersion, self.services, self.timestamp, self.sockNode, self.peerNode, self.nonce, \ - self.userAgent, self.streams = self.decode_payload_content("IQQiiQlsLv") + (self.remoteProtocolVersion, self.services, self.timestamp, + self.sockNode, self.peerNode, self.nonce, self.userAgent, + self.streams) = self.decode_payload_content("IQQiiQlsLv") self.nonce = struct.pack('>Q', self.nonce) self.timeOffset = self.timestamp - int(time.time()) - logger.debug("remoteProtocolVersion: %i", self.remoteProtocolVersion) - logger.debug("services: 0x%08X", self.services) - logger.debug("time offset: %i", self.timestamp - int(time.time())) - logger.debug("my external IP: %s", self.sockNode.host) - logger.debug("remote node incoming address: %s:%i", self.destination.host, self.peerNode.port) - logger.debug("user agent: %s", self.userAgent) - logger.debug("streams: [%s]", ",".join(map(str,self.streams))) + logger.debug('remoteProtocolVersion: %i', self.remoteProtocolVersion) + logger.debug('services: 0x%08X', self.services) + logger.debug('time offset: %i', self.timestamp - int(time.time())) + logger.debug('my external IP: %s', self.sockNode.host) + logger.debug( + 'remote node incoming address: %s:%i', + self.destination.host, self.peerNode.port) + logger.debug('user agent: %s', self.userAgent) + logger.debug('streams: [%s]', ','.join(map(str, self.streams))) if not self.peerValidityChecks(): # TODO ABORT return True self.append_write_buf(protocol.CreatePacket('verack')) self.verackSent = True if not self.isOutbound: - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \ - network.connectionpool.BMConnectionPool().streams, True, nodeid=self.nodeid)) - #print "%s:%i: Sending version" % (self.destination.host, self.destination.port) + self.append_write_buf(protocol.assembleVersionMessage( + self.destination.host, self.destination.port, + connectionpool.BMConnectionPool().streams, True, + nodeid=self.nodeid)) + logger.debug( + '%(host)s:%(port)i sending version', + self.destination._asdict()) if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.isOutbound)): self.isSSL = True - if self.verackReceived: - if self.isSSL: - self.set_state("tls_init", length=self.payloadLength, expectBytes=0) - return False - self.set_state("connection_fully_established", length=self.payloadLength, expectBytes=0) - return False - return True + if not self.verackReceived: + return True + self.set_state( + "tls_init" if self.isSSL else "connection_fully_established", + length=self.payloadLength, expectBytes=0) + return False def peerValidityChecks(self): if self.remoteProtocolVersion < 3: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="Your is using an old protocol. Closing connection.")) - logger.debug ('Closing connection to old protocol version %s, node: %s', - str(self.remoteProtocolVersion), str(self.destination)) + self.append_write_buf(protocol.assembleErrorMessage( + errorText="Your is using an old protocol. Closing connection.", + fatal=2)) + logger.debug( + 'Closing connection to old protocol version %s, node: %s', + self.remoteProtocolVersion, self.destination) return False if self.timeOffset > BMProto.maxTimeOffset: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="Your time is too far in the future compared to mine. Closing connection.")) - logger.info("%s's time is too far in the future (%s seconds). Closing connection to it.", - self.destination, self.timeOffset) + self.append_write_buf(protocol.assembleErrorMessage( + errorText="Your time is too far in the future compared to mine." + " Closing connection.", fatal=2)) + logger.info( + "%s's time is too far in the future (%s seconds)." + " Closing connection to it.", self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False elif self.timeOffset < -BMProto.maxTimeOffset: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="Your time is too far in the past compared to mine. Closing connection.")) - logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it.", - self.destination, self.timeOffset) + self.append_write_buf(protocol.assembleErrorMessage( + errorText="Your time is too far in the past compared to mine." + " Closing connection.", fatal=2)) + logger.info( + "%s's time is too far in the past (timeOffset %s seconds)." + " Closing connection to it.", self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False else: BMProto.timeOffsetWrongCount = 0 if not self.streams: - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="We don't have shared stream interests. Closing connection.")) - logger.debug ('Closed connection to %s because there is no overlapping interest in streams.', - str(self.destination)) + self.append_write_buf(protocol.assembleErrorMessage( + errorText="We don't have shared stream interests." + " Closing connection.", fatal=2)) + logger.debug( + 'Closed connection to %s because there is no overlapping interest' + ' in streams.', self.destination) return False - if self.destination in network.connectionpool.BMConnectionPool().inboundConnections: + if self.destination in connectionpool.BMConnectionPool().inboundConnections: try: if not protocol.checkSocksIP(self.destination.host): - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="Too many connections from your IP. Closing connection.")) - logger.debug ('Closed connection to %s because we are already connected to that IP.', - str(self.destination)) + self.append_write_buf(protocol.assembleErrorMessage( + errorText="Too many connections from your IP." + " Closing connection.", fatal=2)) + logger.debug( + 'Closed connection to %s because we are already connected' + ' to that IP.', self.destination) return False except: pass if not self.isOutbound: - # incoming from a peer we're connected to as outbound, or server full - # report the same error to counter deanonymisation - if state.Peer(self.destination.host, self.peerNode.port) in \ - network.connectionpool.BMConnectionPool().inboundConnections or \ - len(network.connectionpool.BMConnectionPool().inboundConnections) + \ - len(network.connectionpool.BMConnectionPool().outboundConnections) > \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections"): - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="Server full, please try again later.")) - logger.debug ("Closed connection to %s due to server full or duplicate inbound/outbound.", - str(self.destination)) + # incoming from a peer we're connected to as outbound, + # or server full report the same error to counter deanonymisation + if ( + state.Peer(self.destination.host, self.peerNode.port) in + connectionpool.BMConnectionPool().inboundConnections or + len(connectionpool.BMConnectionPool().inboundConnections) + + len(connectionpool.BMConnectionPool().outboundConnections) > + BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + + BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + ): + self.append_write_buf(protocol.assembleErrorMessage( + errorText="Server full, please try again later.", fatal=2)) + logger.debug( + 'Closed connection to %s due to server full' + ' or duplicate inbound/outbound.', self.destination) return False - if network.connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce): - self.append_write_buf(protocol.assembleErrorMessage(fatal=2, - errorText="I'm connected to myself. Closing connection.")) - logger.debug ("Closed connection to %s because I'm connected to myself.", - str(self.destination)) + if connectionpool.BMConnectionPool().isAlreadyConnected( + self.nonce): + self.append_write_buf(protocol.assembleErrorMessage( + errorText="I'm connected to myself. Closing connection.", + fatal=2)) + logger.debug( + "Closed connection to %s because I'm connected to myself.", + self.destination) return False return True @@ -519,7 +575,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return b'' retval = b'' for i in range(0, len(peerList), BMProto.maxAddrCount): - payload = addresses.encodeVarint(len(peerList[i:i + BMProto.maxAddrCount])) + payload = addresses.encodeVarint( + len(peerList[i:i + BMProto.maxAddrCount])) for address in peerList[i:i + BMProto.maxAddrCount]: stream, peer, timestamp = address payload += struct.pack( @@ -534,8 +591,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): - for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \ - network.connectionpool.BMConnectionPool().outboundConnections.values(): + for connection in ( + connectionpool.BMConnectionPool().inboundConnections.values() + + connectionpool.BMConnectionPool().outboundConnections.values() + ): try: del connection.objectsNewToMe[hashId] except KeyError: @@ -557,12 +616,15 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # already disconnected return try: - logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, self.close_reason) + logger.debug( + '%s:%i: closing, %s', self.destination.host, + self.destination.port, self.close_reason) except AttributeError: try: - logger.debug("%s:%i: closing", self.destination.host, self.destination.port) + logger.debug( + '%(host)s:%(port)i: closing', self.destination._asdict()) except AttributeError: - logger.debug("Disconnected socket closing") + logger.debug('Disconnected socket closing') AdvancedDispatcher.handle_close(self) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index fb662fe0..c769f287 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -1,62 +1,70 @@ -from ConfigParser import NoOptionError, NoSectionError import errno +import re import socket import time -import random -import re +from ConfigParser import NoOptionError, NoSectionError -from bmconfigparser import BMConfigParser -from debug import logger +import asyncore_pollchoose as asyncore import helper_bootstrap -import knownnodes -from network.proxy import Proxy -from network.tcp import TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection -from network.udp import UDPSocket -from network.connectionchooser import chooseConnection -import network.asyncore_pollchoose as asyncore -import protocol -from singleton import Singleton -import state import helper_random +import knownnodes +import protocol +import state +from bmconfigparser import BMConfigParser +from connectionchooser import chooseConnection +from debug import logger +from proxy import Proxy +from singleton import Singleton +from tcp import ( + TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection) +from udp import UDPSocket @Singleton class BMConnectionPool(object): def __init__(self): asyncore.set_rates( - BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate"), - BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate")) + BMConfigParser().safeGetInt( + "bitmessagesettings", "maxdownloadrate"), + BMConfigParser().safeGetInt( + "bitmessagesettings", "maxuploadrate") + ) self.outboundConnections = {} self.inboundConnections = {} self.listeningSockets = {} self.udpSockets = {} self.streams = [] self.lastSpawned = 0 - self.spawnWait = 2 + self.spawnWait = 2 self.bootstrapped = False def connectToStream(self, streamNumber): self.streams.append(streamNumber) def getConnectionByAddr(self, addr): - if addr in self.inboundConnections: + try: return self.inboundConnections[addr] - try: - if addr.host in self.inboundConnections: - return self.inboundConnections[addr.host] - except AttributeError: + except KeyError: pass - if addr in self.outboundConnections: - return self.outboundConnections[addr] try: - if addr.host in self.udpSockets: - return self.udpSockets[addr.host] - except AttributeError: + return self.inboundConnections[addr.host] + except (KeyError, AttributeError): + pass + try: + return self.outboundConnections[addr] + except KeyError: + pass + try: + return self.udpSockets[addr.host] + except (KeyError, AttributeError): pass raise KeyError def isAlreadyConnected(self, nodeid): - for i in self.inboundConnections.values() + self.outboundConnections.values(): + for i in ( + self.inboundConnections.values() + + self.outboundConnections.values() + ): try: if nodeid == i.nodeid: return True @@ -73,13 +81,15 @@ class BMConnectionPool(object): if connection.destination.host in self.inboundConnections: self.inboundConnections[connection.destination] = connection else: - self.inboundConnections[connection.destination.host] = connection + self.inboundConnections[connection.destination.host] = \ + connection def removeConnection(self, connection): if isinstance(connection, UDPSocket): del self.udpSockets[connection.listening.host] elif isinstance(connection, TCPServer): - del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)] + del self.listeningSockets[state.Peer( + connection.destination.host, connection.destination.port)] elif connection.isOutbound: try: del self.outboundConnections[connection.destination] @@ -96,14 +106,18 @@ class BMConnectionPool(object): connection.handle_close() def getListeningIP(self): - if BMConfigParser().safeGet("bitmessagesettings", "onionhostname").endswith(".onion"): - host = BMConfigParser().safeGet("bitmessagesettings", "onionbindip") + if BMConfigParser().safeGet( + "bitmessagesettings", "onionhostname").endswith(".onion"): + host = BMConfigParser().safeGet( + "bitmessagesettings", "onionbindip") else: host = '127.0.0.1' - if BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") or \ - BMConfigParser().get("bitmessagesettings", "socksproxytype") == "none": + if (BMConfigParser().safeGetBoolean( + "bitmessagesettings", "sockslisten") or + BMConfigParser().get( + "bitmessagesettings", "socksproxytype") == "none"): # python doesn't like bind + INADDR_ANY? - #host = socket.INADDR_ANY + # host = socket.INADDR_ANY host = BMConfigParser().get("network", "bind") return host @@ -130,13 +144,18 @@ class BMConnectionPool(object): # defaults to empty loop if outbound connections are maxed spawnConnections = False acceptConnections = True - if BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect'): + if BMConfigParser().safeGetBoolean( + 'bitmessagesettings', 'dontconnect'): acceptConnections = False - elif BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): + elif BMConfigParser().safeGetBoolean( + 'bitmessagesettings', 'sendoutgoingconnections'): spawnConnections = True - if BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and \ - (not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and \ - ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname')): + if (BMConfigParser().get( + 'bitmessagesettings', 'socksproxytype')[:5] == 'SOCKS' and + not BMConfigParser().getboolean( + 'bitmessagesettings', 'sockslisten') and + ".onion" not in BMConfigParser().get( + 'bitmessagesettings', 'onionhostname')): acceptConnections = False if spawnConnections: @@ -144,23 +163,38 @@ class BMConnectionPool(object): helper_bootstrap.dns() if not self.bootstrapped: self.bootstrapped = True - Proxy.proxy = (BMConfigParser().safeGet("bitmessagesettings", "sockshostname"), - BMConfigParser().safeGetInt("bitmessagesettings", "socksport")) + Proxy.proxy = ( + BMConfigParser().safeGet( + "bitmessagesettings", "sockshostname"), + BMConfigParser().safeGetInt( + "bitmessagesettings", "socksport") + ) # TODO AUTH # TODO reset based on GUI settings changes try: - if not BMConfigParser().get("network", "onionsocksproxytype").startswith("SOCKS"): + if not BMConfigParser().get( + "network", "onionsocksproxytype" + ).startswith("SOCKS"): raise NoOptionError - Proxy.onionproxy = (BMConfigParser().get("network", "onionsockshostname"), - BMConfigParser().getint("network", "onionsocksport")) + Proxy.onionproxy = ( + BMConfigParser().get( + "network", "onionsockshostname"), + BMConfigParser().getint( + "network", "onionsocksport") + ) except (NoOptionError, NoSectionError): Proxy.onionproxy = None - established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished)) + established = sum( + 1 for c in self.outboundConnections.values() + if (c.connected and c.fullyEstablished)) pending = len(self.outboundConnections) - established - if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"): - for i in range(state.maximumNumberOfHalfOpenConnections - pending): + if established < BMConfigParser().safeGetInt( + "bitmessagesettings", "maxoutboundconnections"): + for i in range( + state.maximumNumberOfHalfOpenConnections - pending): try: - chosen = chooseConnection(helper_random.randomchoice(self.streams)) + chosen = chooseConnection( + helper_random.randomchoice(self.streams)) except ValueError: continue if chosen in self.outboundConnections: @@ -170,22 +204,25 @@ class BMConnectionPool(object): # don't connect to self if chosen in state.ownAddresses: continue - - #for c in self.outboundConnections: - # if chosen == c.destination: - # continue - #for c in self.inboundConnections: - # if chosen.host == c.destination.host: - # continue + try: - if chosen.host.endswith(".onion") and Proxy.onionproxy is not None: - if BMConfigParser().get("network", "onionsocksproxytype") == "SOCKS5": + if (chosen.host.endswith(".onion") and + Proxy.onionproxy is not None): + if BMConfigParser().get( + "network", "onionsocksproxytype" + ) == "SOCKS5": self.addConnection(Socks5BMConnection(chosen)) - elif BMConfigParser().get("network", "onionsocksproxytype") == "SOCKS4a": + elif BMConfigParser().get( + "network", "onionsocksproxytype" + ) == "SOCKS4a": self.addConnection(Socks4aBMConnection(chosen)) - elif BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5": + elif BMConfigParser().safeGet( + "bitmessagesettings", "socksproxytype" + ) == "SOCKS5": self.addConnection(Socks5BMConnection(chosen)) - elif BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a": + elif BMConfigParser().safeGet( + "bitmessagesettings", "socksproxytype" + ) == "SOCKS4a": self.addConnection(Socks4aBMConnection(chosen)) else: self.addConnection(TCPConnection(chosen)) @@ -199,8 +236,8 @@ class BMConnectionPool(object): self.lastSpawned = time.time() else: for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() + self.inboundConnections.values() + + self.outboundConnections.values() ): # FIXME: rating will be increased after next connection i.handle_close() @@ -210,14 +247,20 @@ class BMConnectionPool(object): if BMConfigParser().safeGet("network", "bind") == '': self.startListening() else: - for bind in re.sub("[^\w.]+", " ", BMConfigParser().safeGet("network", "bind")).split(): + for bind in re.sub( + "[^\w.]+", " ", + BMConfigParser().safeGet("network", "bind") + ).split(): self.startListening(bind) logger.info('Listening for incoming connections.') if not self.udpSockets: if BMConfigParser().safeGet("network", "bind") == '': self.startUDPSocket() else: - for bind in re.sub("[^\w.]+", " ", BMConfigParser().safeGet("network", "bind")).split(): + for bind in re.sub( + "[^\w.]+", " ", + BMConfigParser().safeGet("network", "bind") + ).split(): self.startUDPSocket(bind) self.startUDPSocket(False) logger.info('Starting UDP socket(s).') @@ -239,7 +282,10 @@ class BMConnectionPool(object): asyncore.loop(timeout=loopTime, count=1000) reaper = [] - for i in self.inboundConnections.values() + self.outboundConnections.values(): + for i in ( + self.inboundConnections.values() + + self.outboundConnections.values() + ): minTx = time.time() - 20 if i.fullyEstablished: minTx -= 300 - 20 @@ -247,9 +293,15 @@ class BMConnectionPool(object): if i.fullyEstablished: i.append_write_buf(protocol.CreatePacket('ping')) else: - i.close_reason = "Timeout (%is)" % (time.time() - i.lastTx) + i.close_reason = "Timeout (%is)" % ( + time.time() - i.lastTx) i.set_state("close") - for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values(): + for i in ( + self.inboundConnections.values() + + self.outboundConnections.values() + + self.listeningSockets.values() + + self.udpSockets.values() + ): if not (i.accepting or i.connecting or i.connected): reaper.append(i) else: diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 60d84cdf..b92bd7f8 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -3,12 +3,11 @@ from random import choice, sample, expovariate from threading import RLock from time import time -from bmconfigparser import BMConfigParser -import network.connectionpool +import connectionpool +import state from debug import logging from queues import invQueue from singleton import Singleton -import state # randomise routes after 600 seconds REASSIGN_INTERVAL = 600 @@ -21,6 +20,7 @@ MAX_STEMS = 2 Stem = namedtuple('Stem', ['child', 'stream', 'timeout']) + @Singleton class Dandelion(): def __init__(self): @@ -39,27 +39,29 @@ class Dandelion(): start = time() if average == 0: average = FLUFF_TRIGGER_MEAN_DELAY - return start + expovariate(1.0/average) + FLUFF_TRIGGER_FIXED_DELAY + return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY def addHash(self, hashId, source=None, stream=1): if not state.dandelion: return with self.lock: self.hashMap[hashId] = Stem( - self.getNodeStem(source), - stream, - self.poissonTimeout()) + self.getNodeStem(source), + stream, + self.poissonTimeout()) def setHashStream(self, hashId, stream=1): with self.lock: if hashId in self.hashMap: self.hashMap[hashId] = Stem( - self.hashMap[hashId].child, - stream, - self.poissonTimeout()) + self.hashMap[hashId].child, + stream, + self.poissonTimeout()) def removeHash(self, hashId, reason="no reason specified"): - logging.debug("%s entering fluff mode due to %s.", ''.join('%02x'%ord(i) for i in hashId), reason) + logging.debug( + "%s entering fluff mode due to %s.", + ''.join('%02x' % ord(i) for i in hashId), reason) with self.lock: try: del self.hashMap[hashId] @@ -79,21 +81,30 @@ class Dandelion(): self.stem.append(connection) for k in (k for k, v in self.nodeMap.iteritems() if v is None): self.nodeMap[k] = connection - for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child is None}.iteritems(): - self.hashMap[k] = Stem(connection, v.stream, self.poissonTimeout()) + for k, v in { + k: v for k, v in self.hashMap.iteritems() + if v.child is None + }.iteritems(): + self.hashMap[k] = Stem( + connection, v.stream, self.poissonTimeout()) invQueue.put((v.stream, k, v.child)) - def maybeRemoveStem(self, connection): # is the stem active? with self.lock: if connection in self.stem: self.stem.remove(connection) # active mappings to pointing to the removed node - for k in (k for k, v in self.nodeMap.iteritems() if v == connection): + for k in ( + k for k, v in self.nodeMap.iteritems() if v == connection + ): self.nodeMap[k] = None - for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child == connection}.iteritems(): - self.hashMap[k] = Stem(None, v.stream, self.poissonTimeout()) + for k, v in { + k: v for k, v in self.hashMap.iteritems() + if v.child == connection + }.iteritems(): + self.hashMap[k] = Stem( + None, v.stream, self.poissonTimeout()) def pickStem(self, parent=None): try: @@ -136,10 +147,13 @@ class Dandelion(): with self.lock: try: # random two connections - self.stem = sample(network.connectionpool.BMConnectionPool().outboundConnections.values(), MAX_STEMS) + self.stem = sample( + connectionpool.BMConnectionPool( + ).outboundConnections.values(), MAX_STEMS) # not enough stems available except ValueError: - self.stem = network.connectionpool.BMConnectionPool().outboundConnections.values() + self.stem = connectionpool.BMConnectionPool( + ).outboundConnections.values() self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL diff --git a/src/network/proxy.py b/src/network/proxy.py index 1d7ca357..e2d03446 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -1,27 +1,28 @@ import socket import time -from advanceddispatcher import AdvancedDispatcher import asyncore_pollchoose as asyncore +import state +from advanceddispatcher import AdvancedDispatcher from bmconfigparser import BMConfigParser from debug import logger -import network.connectionpool -import state + class ProxyError(Exception): - errorCodes = ("UnknownError") + errorCodes = ("Unknown error",) def __init__(self, code=-1): self.code = code try: - self.message = self.__class__.errorCodes[self.code] + self.message = self.errorCodes[code] except IndexError: - self.message = self.__class__.errorCodes[-1] + self.message = self.errorCodes[-1] super(ProxyError, self).__init__(self.message) class GeneralProxyError(ProxyError): - errorCodes = ("Success", + errorCodes = ( + "Success", "Invalid data", "Not connected", "Not available", @@ -30,12 +31,13 @@ class GeneralProxyError(ProxyError): "Timed out", "Network unreachable", "Connection refused", - "Host unreachable") + "Host unreachable" + ) class Proxy(AdvancedDispatcher): - # these are global, and if you change config during runtime, all active/new - # instances should change too + # these are global, and if you change config during runtime, + # all active/new instances should change too _proxy = ("127.0.0.1", 9050) _auth = None _onion_proxy = None @@ -48,8 +50,9 @@ class Proxy(AdvancedDispatcher): @proxy.setter def proxy(self, address): - if not isinstance(address, tuple) or (len(address) < 2) or \ - (not isinstance(address[0], str) or not isinstance(address[1], int)): + if (not isinstance(address, tuple) or len(address) < 2 or + not isinstance(address[0], str) or + not isinstance(address[1], int)): raise ValueError self.__class__._proxy = address @@ -67,8 +70,10 @@ class Proxy(AdvancedDispatcher): @onion_proxy.setter def onion_proxy(self, address): - if address is not None and (not isinstance(address, tuple) or (len(address) < 2) or \ - (not isinstance(address[0], str) or not isinstance(address[1], int))): + if address is not None and ( + not isinstance(address, tuple) or len(address) < 2 or + not isinstance(address[0], str) or + not isinstance(address[1], int)): raise ValueError self.__class__._onion_proxy = address @@ -88,15 +93,21 @@ class Proxy(AdvancedDispatcher): self.isOutbound = True self.fullyEstablished = False self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - if BMConfigParser().safeGetBoolean("bitmessagesettings", "socksauthentication"): - self.auth = (BMConfigParser().safeGet("bitmessagesettings", "socksusername"), - BMConfigParser().safeGet("bitmessagesettings", "sockspassword")) + if BMConfigParser().safeGetBoolean( + "bitmessagesettings", "socksauthentication"): + self.auth = ( + BMConfigParser().safeGet( + "bitmessagesettings", "socksusername"), + BMConfigParser().safeGet( + "bitmessagesettings", "sockspassword") + ) else: self.auth = None - if address.host.endswith(".onion") and self.onion_proxy is not None: - self.connect(self.onion_proxy) - else: - self.connect(self.proxy) + self.connect( + self.onion_proxy + if address.host.endswith(".onion") and self.onion_proxy else + self.proxy + ) def handle_connect(self): self.set_state("init") @@ -104,7 +115,9 @@ class Proxy(AdvancedDispatcher): AdvancedDispatcher.handle_connect(self) except socket.error as e: if e.errno in asyncore._DISCONNECTED: - logger.debug("%s:%i: Connection failed: %s", self.destination.host, self.destination.port, str(e)) + logger.debug( + "%s:%i: Connection failed: %s", + self.destination.host, self.destination.port, e) return self.state_init() diff --git a/src/network/socks4a.py b/src/network/socks4a.py index 978ede04..c50c6db5 100644 --- a/src/network/socks4a.py +++ b/src/network/socks4a.py @@ -3,12 +3,17 @@ import struct from proxy import Proxy, ProxyError, GeneralProxyError + class Socks4aError(ProxyError): - errorCodes = ("Request granted", + errorCodes = ( + "Request granted", "Request rejected or failed", - "Request rejected because SOCKS server cannot connect to identd on the client", - "Request rejected because the client program and identd report different user-ids", - "Unknown error") + "Request rejected because SOCKS server cannot connect to identd" + " on the client", + "Request rejected because the client program and identd report" + " different user-ids", + "Unknown error" + ) class Socks4a(Proxy): @@ -40,14 +45,15 @@ class Socks4a(Proxy): self.boundaddr = self.read_buf[4:] self.__proxysockname = (self.boundaddr, self.boundport) if self.ipaddr: - self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) + self.__proxypeername = ( + socket.inet_ntoa(self.ipaddr), self.destination[1]) else: self.__proxypeername = (self.destination[0], self.destport) self.set_state("proxy_handshake_done", length=8) return True def proxy_sock_name(self): - return socket.inet_ntoa(self.__proxysockname[0]) + return socket.inet_ntoa(self.__proxysockname[0]) class Socks4aConnection(Socks4a): @@ -57,7 +63,8 @@ class Socks4aConnection(Socks4a): def state_auth_done(self): # Now we can request the actual connection rmtrslv = False - self.append_write_buf(struct.pack('>BBH', 0x04, 0x01, self.destination[1])) + self.append_write_buf( + struct.pack('>BBH', 0x04, 0x01, self.destination[1])) # If the given destination address is an IP address, we'll # use the IPv4 address request even if remote resolving was specified. try: @@ -69,10 +76,12 @@ class Socks4aConnection(Socks4a): # Resolve remotely rmtrslv = True self.ipaddr = None - self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) + self.append_write_buf( + struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) else: # Resolve locally - self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) + self.ipaddr = socket.inet_aton( + socket.gethostbyname(self.destination[0])) self.append_write_buf(self.ipaddr) if self._auth: self.append_write_buf(self._auth[0]) @@ -98,7 +107,8 @@ class Socks4aResolver(Socks4a): def state_auth_done(self): # Now we can request the actual connection - self.append_write_buf(struct.pack('>BBH', 0x04, 0xF0, self.destination[1])) + self.append_write_buf( + struct.pack('>BBH', 0x04, 0xF0, self.destination[1])) self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) if self._auth: self.append_write_buf(self._auth[0]) diff --git a/src/network/socks5.py b/src/network/socks5.py index 504eea03..2e0821da 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -12,26 +12,30 @@ from proxy import GeneralProxyError, Proxy, ProxyError class Socks5AuthError(ProxyError): - """Thrown when the socks5 protocol encounters an authentication error""" - errorCodes = ("Succeeded", - "Authentication is required", - "All offered authentication methods were rejected", - "Unknown username or invalid password", - "Unknown error") + """Rised when the socks5 protocol encounters an authentication error""" + errorCodes = ( + "Succeeded", + "Authentication is required", + "All offered authentication methods were rejected", + "Unknown username or invalid password", + "Unknown error" + ) class Socks5Error(ProxyError): - """Thrown when socks5 protocol encounters an error""" - errorCodes = ("Succeeded", - "General SOCKS server failure", - "Connection not allowed by ruleset", - "Network unreachable", - "Host unreachable", - "Connection refused", - "TTL expired", - "Command not supported", - "Address type not supported", - "Unknown error") + """Rised when socks5 protocol encounters an error""" + errorCodes = ( + "Succeeded", + "General SOCKS server failure", + "Connection not allowed by ruleset", + "Network unreachable", + "Host unreachable", + "Connection refused", + "TTL expired", + "Command not supported", + "Address type not supported", + "Unknown error" + ) class Socks5(Proxy): @@ -42,7 +46,7 @@ class Socks5(Proxy): self.destport = address[1] def state_init(self): - """Protocol initialisation (before connection is established)""" + """Protocol initialization (before connection is established)""" if self._auth: self.append_write_buf(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) else: @@ -61,9 +65,11 @@ class Socks5(Proxy): self.set_state("auth_done", length=2) elif ret[1] == 2: # username/password - self.append_write_buf(struct.pack('BB', 1, len(self._auth[0])) + - self._auth[0] + struct.pack('B', len(self._auth[1])) + - self._auth[1]) + self.append_write_buf( + struct.pack('BB', 1, len(self._auth[0])) + + self._auth[0] + struct.pack('B', len(self._auth[1])) + + self._auth[1] + ) self.set_state("auth_needed", length=2, expectBytes=2) else: if ret[1] == 0xff: @@ -118,17 +124,19 @@ class Socks5(Proxy): def state_proxy_addr_2_1(self): """ - Handle other addresses than IPv4 returned for peer (e.g. IPv6, onion, ...). This is part 1 which retrieves the + Handle other addresses than IPv4 returned for peer + (e.g. IPv6, onion, ...). This is part 1 which retrieves the length of the data. """ self.address_length = ord(self.read_buf[0:1]) - self.set_state("proxy_addr_2_2", length=1, expectBytes=self.address_length) + self.set_state( + "proxy_addr_2_2", length=1, expectBytes=self.address_length) return True def state_proxy_addr_2_2(self): """ - Handle other addresses than IPv4 returned for peer (e.g. IPv6, onion, ...). This is part 2 which retrieves the - data. + Handle other addresses than IPv4 returned for peer + (e.g. IPv6, onion, ...). This is part 2 which retrieves the data. """ self.boundaddr = self.read_buf[0:self.address_length] self.set_state("proxy_port", length=self.address_length, expectBytes=2) @@ -139,7 +147,8 @@ class Socks5(Proxy): self.boundport = struct.unpack(">H", self.read_buf[0:2])[0] self.__proxysockname = (self.boundaddr, self.boundport) if self.ipaddr is not None: - self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) + self.__proxypeername = ( + socket.inet_ntoa(self.ipaddr), self.destination[1]) else: self.__proxypeername = (self.destination[0], self.destport) self.set_state("proxy_handshake_done", length=2) @@ -169,12 +178,15 @@ class Socks5Connection(Socks5): if Proxy._remote_dns: # pylint: disable=protected-access # Resolve remotely self.ipaddr = None - self.append_write_buf(chr(0x03).encode() + - chr(len(self.destination[0])).encode() + - self.destination[0]) + self.append_write_buf( + chr(0x03).encode() + + chr(len(self.destination[0])).encode() + + self.destination[0] + ) else: # Resolve locally - self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) + self.ipaddr = socket.inet_aton( + socket.gethostbyname(self.destination[0])) self.append_write_buf(chr(0x01).encode() + self.ipaddr) self.append_write_buf(struct.pack(">H", self.destination[1])) self.set_state("pre_connect", length=0, expectBytes=4) @@ -200,14 +212,18 @@ class Socks5Resolver(Socks5): """Perform resolving""" # Now we can request the actual connection self.append_write_buf(struct.pack('BBB', 0x05, 0xF0, 0x00)) - self.append_write_buf(chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host)) + self.append_write_buf( + chr(0x03).encode() + chr(len(self.host)).encode() + + str(self.host) + ) self.append_write_buf(struct.pack(">H", self.port)) self.set_state("pre_connect", length=0, expectBytes=4) return True def resolved(self): """ - Resolving is done, process the return value. To use this within PyBitmessage, a callback needs to be + Resolving is done, process the return value. + To use this within PyBitmessage, a callback needs to be implemented which hasn't been done yet. """ print "Resolved %s as %s" % (self.host, self.proxy_sock_name()) diff --git a/src/network/tcp.py b/src/network/tcp.py index 1790d59b..e06cf89f 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -10,10 +10,10 @@ import socket import time import addresses +import asyncore_pollchoose as asyncore +import connectionpool import helper_random import knownnodes -import network.asyncore_pollchoose as asyncore -import network.connectionpool import protocol import shared import state @@ -31,7 +31,8 @@ from network.tls import TLSDispatcher from queues import UISignalQueue, invQueue, receiveDataQueue -class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instance-attributes +class TCPConnection(BMProto, TLSDispatcher): + # pylint: disable=too-many-instance-attributes """ .. todo:: Look to understand and/or fix the non-parent-init-called @@ -46,27 +47,32 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc self.connectedAt = 0 self.skipUntil = 0 if address is None and sock is not None: - self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1]) + self.destination = state.Peer(*sock.getpeername()) self.isOutbound = False TLSDispatcher.__init__(self, sock, server_side=True) self.connectedAt = time.time() - logger.debug("Received connection from %s:%i", self.destination.host, self.destination.port) + logger.debug( + 'Received connection from %s:%i', + self.destination.host, self.destination.port) self.nodeid = randomBytes(8) elif address is not None and sock is not None: TLSDispatcher.__init__(self, sock, server_side=False) self.isOutbound = True - logger.debug("Outbound proxy connection to %s:%i", self.destination.host, self.destination.port) + logger.debug( + 'Outbound proxy connection to %s:%i', + self.destination.host, self.destination.port) else: self.destination = address self.isOutbound = True - if ":" in address.host: - self.create_socket(socket.AF_INET6, socket.SOCK_STREAM) - else: - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket( + socket.AF_INET6 if ":" in address.host else socket.AF_INET, + socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) TLSDispatcher.__init__(self, sock, server_side=False) self.connect(self.destination) - logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port) + logger.debug( + 'Connecting to %s:%i', + self.destination.host, self.destination.port) encodedAddr = protocol.encodeHost(self.destination.host) self.local = all([ protocol.checkIPAddress(encodedAddr, True), @@ -80,13 +86,18 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc """ This is a defense against the so called intersection attacks. - It is called when you notice peer is requesting non-existing objects, or right after the connection is - established. It will estimate how long an object will take to propagate across the network, and skip processing - "getdata" requests until then. This means an attacker only has one shot per IP to perform the attack. + It is called when you notice peer is requesting non-existing + objects, or right after the connection is established. It will + estimate how long an object will take to propagate across the + network, and skip processing "getdata" requests until then. This + means an attacker only has one shot per IP to perform the attack. """ - # estimated time for a small object to propagate across the whole network - max_known_nodes = max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) - delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (0.2 + invQueue.queueCount / 2.0) + # estimated time for a small object to propagate across the + # whole network + max_known_nodes = max( + len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + delay = math.ceil(math.log(max_known_nodes + 2, 20)) * ( + 0.2 + invQueue.queueCount / 2.0) # take the stream with maximum amount of nodes # +2 is to avoid problems with log(0) and log(1) # 20 is avg connected nodes count @@ -95,15 +106,20 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc if initial: self.skipUntil = self.connectedAt + delay if self.skipUntil > time.time(): - logger.debug("Initial skipping processing getdata for %.2fs", self.skipUntil - time.time()) + logger.debug( + 'Initial skipping processing getdata for %.2fs', + self.skipUntil - time.time()) else: - logger.debug("Skipping processing getdata due to missing object for %.2fs", delay) + logger.debug( + 'Skipping processing getdata due to missing object' + ' for %.2fs', delay) self.skipUntil = time.time() + delay def state_connection_fully_established(self): """ - State after the bitmessage protocol handshake is completed (version/verack exchange, and if both side support - TLS, the TLS handshake as well). + State after the bitmessage protocol handshake is completed + (version/verack exchange, and if both side support TLS, + the TLS handshake as well). """ self.set_connection_fully_established() self.set_state("bm_header") @@ -115,12 +131,14 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc if not self.isOutbound and not self.local: shared.clientHasReceivedIncomingConnections = True UISignalQueue.put(('setStatusIcon', 'green')) - UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, True, self.destination))) + UISignalQueue.put(( + 'updateNetworkStatusTab', + (self.isOutbound, True, self.destination) + )) self.antiIntersectionDelay(True) self.fullyEstablished = True if self.isOutbound: knownnodes.increaseRating(self.destination) - if self.isOutbound: Dandelion().maybeAddStem(self) self.sendAddr() self.sendBigInv() @@ -161,18 +179,25 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc self.append_write_buf(BMProto.assembleAddr(templist)) def sendBigInv(self): - """Send hashes of all inventory objects, chunked as the protocol has a per-command limit.""" + """ + Send hashes of all inventory objects, chunked as the protocol has + a per-command limit. + """ def sendChunk(): """Send one chunk of inv entries in one command""" if objectCount == 0: return - logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) - self.append_write_buf(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload)) + logger.debug( + 'Sending huge inv message with %i objects to just this' + ' one peer', objectCount) + self.append_write_buf(protocol.CreatePacket( + 'inv', addresses.encodeVarint(objectCount) + payload)) # Select all hashes for objects in this stream. bigInvList = {} for stream in self.streams: - # may lock for a long time, but I think it's better than thousands of small locks + # may lock for a long time, but I think it's better than + # thousands of small locks with self.objectsNewToThemLock: for objHash in Inventory().unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv @@ -203,17 +228,18 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc try: AdvancedDispatcher.handle_connect(self) except socket.error as e: - if e.errno in asyncore._DISCONNECTED: # pylint: disable=protected-access - logger.debug("%s:%i: Connection failed: %s", self.destination.host, self.destination.port, str(e)) + # pylint: disable=protected-access + if e.errno in asyncore._DISCONNECTED: + logger.debug( + '%s:%i: Connection failed: %s', + self.destination.host, self.destination.port, e) return self.nodeid = randomBytes(8) self.append_write_buf( protocol.assembleVersionMessage( - self.destination.host, - self.destination.port, - network.connectionpool.BMConnectionPool().streams, - False, - nodeid=self.nodeid)) + self.destination.host, self.destination.port, + connectionpool.BMConnectionPool().streams, + False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -224,7 +250,8 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc for s in self.streams: try: with knownnodes.knownNodesLock: - knownnodes.knownNodes[s][self.destination]["lastseen"] = time.time() + knownnodes.knownNodes[s][self.destination][ + "lastseen"] = time.time() except KeyError: pass receiveDataQueue.put(self.destination) @@ -238,7 +265,10 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc if self.isOutbound and not self.fullyEstablished: knownnodes.decreaseRating(self.destination) if self.fullyEstablished: - UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, False, self.destination))) + UISignalQueue.put(( + 'updateNetworkStatusTab', + (self.isOutbound, False, self.destination) + )) if self.isOutbound: Dandelion().maybeRemoveStem(self) BMProto.handle_close(self) @@ -253,16 +283,17 @@ class Socks5BMConnection(Socks5Connection, TCPConnection): self.set_state("init") def state_proxy_handshake_done(self): - """State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer.""" + """ + State when SOCKS5 connection succeeds, we need to send a + Bitmessage handshake to peer. + """ Socks5Connection.state_proxy_handshake_done(self) self.nodeid = randomBytes(8) self.append_write_buf( protocol.assembleVersionMessage( - self.destination.host, - self.destination.port, - network.connectionpool.BMConnectionPool().streams, - False, - nodeid=self.nodeid)) + self.destination.host, self.destination.port, + connectionpool.BMConnectionPool().streams, + False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -276,16 +307,17 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection): self.set_state("init") def state_proxy_handshake_done(self): - """State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer.""" + """ + State when SOCKS4a connection succeeds, we need to send a + Bitmessage handshake to peer. + """ Socks4aConnection.state_proxy_handshake_done(self) self.nodeid = randomBytes(8) self.append_write_buf( protocol.assembleVersionMessage( - self.destination.host, - self.destination.port, - network.connectionpool.BMConnectionPool().streams, - False, - nodeid=self.nodeid)) + self.destination.host, self.destination.port, + connectionpool.BMConnectionPool().streams, + False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -293,7 +325,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection): class TCPServer(AdvancedDispatcher): """TCP connection server for Bitmessage protocol""" - def __init__(self, host='127.0.0.1', port=8444): # pylint: disable=redefined-outer-name + def __init__(self, host='127.0.0.1', port=8444): if not hasattr(self, '_map'): AdvancedDispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -308,7 +340,8 @@ class TCPServer(AdvancedDispatcher): continue else: if attempt > 0: - BMConfigParser().set("bitmessagesettings", "port", str(port)) + BMConfigParser().set( + 'bitmessagesettings', 'port', str(port)) BMConfigParser().save() break self.destination = state.Peer(host, port) @@ -324,23 +357,30 @@ class TCPServer(AdvancedDispatcher): def handle_accept(self): """Incoming connection callback""" - pair = self.accept() - if pair is not None: - sock, _ = pair - state.ownAddresses[state.Peer(sock.getsockname()[0], sock.getsockname()[1])] = True - if len(network.connectionpool.BMConnectionPool().inboundConnections) + \ - len(network.connectionpool.BMConnectionPool().outboundConnections) > \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10: - # 10 is a sort of buffer, in between it will go through the version handshake - # and return an error to the peer - logger.warning("Server full, dropping connection") - sock.close() - return - try: - network.connectionpool.BMConnectionPool().addConnection(TCPConnection(sock=sock)) - except socket.error: - pass + try: + sock = self.accept()[0] + except (TypeError, IndexError): + return + + state.ownAddresses[state.Peer(*sock.getsockname())] = True + if ( + len(connectionpool.BMConnectionPool().inboundConnections) + + len(connectionpool.BMConnectionPool().outboundConnections) > + BMConfigParser().safeGetInt( + 'bitmessagesettings', 'maxtotalconnections') + + BMConfigParser().safeGetInt( + 'bitmessagesettings', 'maxbootstrapconnections') + 10 + ): + # 10 is a sort of buffer, in between it will go through + # the version handshake and return an error to the peer + logger.warning("Server full, dropping connection") + sock.close() + return + try: + connectionpool.BMConnectionPool().addConnection( + TCPConnection(sock=sock)) + except socket.error: + pass if __name__ == "__main__":