diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 8df557ad..49f0d19d 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,7 +1,6 @@ """ Improved version of asyncore dispatcher """ -# pylint: disable=attribute-defined-outside-init import socket import threading import time @@ -31,6 +30,7 @@ class AdvancedDispatcher(asyncore.dispatcher): def __init__(self, sock=None): if not hasattr(self, '_map'): asyncore.dispatcher.__init__(self, sock) + self.connectedAt = 0 self.close_reason = None self.read_buf = bytearray() self.write_buf = bytearray() @@ -42,6 +42,7 @@ class AdvancedDispatcher(asyncore.dispatcher): self.readLock = threading.RLock() self.writeLock = threading.RLock() self.processingLock = threading.RLock() + self.uploadChunk = self.downloadChunk = 0 def append_write_buf(self, data): """Append binary data to the end of stream write buffer.""" diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 20cbb8e6..bf2de760 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -1,7 +1,7 @@ """ -Bitmessage Protocol +Class BMProto defines bitmessage's network protocol workflow. """ -# pylint: disable=attribute-defined-outside-init, too-few-public-methods + import base64 import hashlib import logging @@ -66,6 +66,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.pendingUpload = RandomTrackingDict() # canonical identifier of network group self.network_group = None + # userAgent initialization + self.userAgent = '' def bm_proto_reset(self): """Reset the bitmessage object parser""" @@ -100,7 +102,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): length=protocol.Header.size, expectBytes=self.payloadLength) return True - def state_bm_command(self): # pylint: disable=too-many-branches + def state_bm_command(self): # pylint: disable=too-many-branches """Process incoming command""" self.payload = self.read_buf[:self.payloadLength] if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: @@ -185,7 +187,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return Node(services, host, port) - # pylint: disable=too-many-branches, too-many-statements + # pylint: disable=too-many-branches,too-many-statements def decode_payload_content(self, pattern="v"): """ Decode the payload depending on pattern: @@ -202,7 +204,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker): , = end of array """ - # pylint: disable=inconsistent-return-statements def decode_simple(self, char="v"): """Decode the payload using one char pattern""" if char == "v": @@ -221,6 +222,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.payloadOffset += 8 return struct.unpack(">Q", self.payload[ self.payloadOffset - 8:self.payloadOffset])[0] + return None size = None isArray = False @@ -254,10 +256,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): ]) parserStack[-2][4] = len(parserStack[-2][3]) else: - for j in range(parserStack[-1][4], len(parserStack[-1][3])): + j = 0 + for j in range( + parserStack[-1][4], len(parserStack[-1][3])): if parserStack[-1][3][j] not in "lL0123456789": break - # pylint: disable=undefined-loop-variable parserStack.append([ size, size, isArray, parserStack[-1][3][parserStack[-1][4]:j + 1], 0, [] @@ -268,7 +271,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): elif i == "s": # if parserStack[-2][2]: # parserStack[-1][5].append(self.payload[ - # self.payloadOffset:self.payloadOffset + parserStack[-1][0]]) + # self.payloadOffset:self.payloadOffset + # + parserStack[-1][0]]) # else: parserStack[-1][5] = self.payload[ self.payloadOffset:self.payloadOffset + parserStack[-1][0]] @@ -339,6 +343,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def _command_inv(self, dandelion=False): + """ + Common inv announce implementation: + both inv and dinv depending on *dandelion* kwarg + """ items = self.decode_payload_content("l32s") if len(items) > MAX_OBJECT_COUNT: @@ -376,10 +384,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): nonce, expiresTime, objectType, version, streamNumber, self.payload, self.payloadOffset) - if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE: + payload_len = len(self.payload) - self.payloadOffset + if payload_len > MAX_OBJECT_PAYLOAD_SIZE: logger.info( - 'The payload length of this object is too large (%d bytes).' - ' Ignoring it.', len(self.payload) - self.payloadOffset) + 'The payload length of this object is too large' + ' (%d bytes). Ignoring it.', payload_len) raise BMProtoExcessiveDataError() try: @@ -434,9 +443,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_addr(self): """Incoming addresses, process them""" - # pylint: disable=redefined-outer-name - addresses = self._decode_addr() - for seenTime, stream, _, ip, port in addresses: + # not using services + for seenTime, stream, _, ip, port in self._decode_addr(): ip = str(ip) if ( stream not in state.streamsInWhichIAmParticipating @@ -446,8 +454,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): continue decodedIP = protocol.checkIPAddress(ip) if ( - decodedIP - and time.time() - seenTime > 0 + decodedIP and time.time() - seenTime > 0 and seenTime > time.time() - ADDRESS_ALIVE and port > 0 ): @@ -475,7 +482,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.append_write_buf(protocol.CreatePacket('pong')) return True - def bm_command_pong(self): # pylint: disable=no-self-use + @staticmethod + def bm_command_pong(): """ Incoming pong. Ignore it. PyBitmessage pings connections after about 5 minutes @@ -562,7 +570,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): " compared to mine. Closing connection.", fatal=2)) logger.info( "%s's time is too far in the future (%s seconds)." - " Closing connection to it.", self.destination, self.timeOffset) + " Closing connection to it.", + self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False elif self.timeOffset < -MAX_TIME_OFFSET: @@ -570,8 +579,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): errorText="Your time is too far in the past compared to mine." " Closing connection.", fatal=2)) logger.info( - "%s's time is too far in the past (timeOffset %s seconds)." - " Closing connection to it.", self.destination, self.timeOffset) + "%s's time is too far in the past" + " (timeOffset %s seconds). Closing connection to it.", + self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False else: @@ -584,7 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s because there is no overlapping' ' interest in streams.', self.destination) return False - if self.destination in connectionpool.BMConnectionPool().inboundConnections: + if connectionpool.BMConnectionPool().inboundConnections.get( + self.destination): try: if not protocol.checkSocksIP(self.destination.host): self.append_write_buf(protocol.assembleErrorMessage( @@ -594,7 +605,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s because we are already' ' connected to that IP.', self.destination) return False - except Exception: + except Exception: # TODO: exception types pass if not self.isOutbound: # incoming from a peer we're connected to as outbound, @@ -614,8 +625,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s due to server full' ' or duplicate inbound/outbound.', self.destination) return False - if connectionpool.BMConnectionPool().isAlreadyConnected( - self.nonce): + if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce): self.append_write_buf(protocol.assembleErrorMessage( errorText="I'm connected to myself. Closing connection.", fatal=2)) @@ -628,7 +638,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): - """Stop downloading an object""" + """Stop downloading object *hashId*""" for connection in connectionpool.BMConnectionPool().connections(): try: del connection.objectsNewToMe[hashId] @@ -658,7 +668,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except AttributeError: try: logger.debug( - '%(host)s:%(port)i: closing', self.destination._asdict()) + '%s:%i: closing', + self.destination.host, self.destination.port) except AttributeError: logger.debug('Disconnected socket closing') AdvancedDispatcher.handle_close(self) diff --git a/src/network/proxy.py b/src/network/proxy.py index 38676d66..3bd3cc66 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -61,9 +61,9 @@ class Proxy(AdvancedDispatcher): @proxy.setter def proxy(self, address): """Set proxy IP and port""" - if (not isinstance(address, tuple) or len(address) < 2 or - not isinstance(address[0], str) or - not isinstance(address[1], int)): + if (not isinstance(address, tuple) or len(address) < 2 + or not isinstance(address[0], str) + or not isinstance(address[1], int)): raise ValueError self.__class__._proxy = address @@ -113,7 +113,6 @@ class Proxy(AdvancedDispatcher): self.destination = address self.isOutbound = True self.fullyEstablished = False - self.connectedAt = 0 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) if BMConfigParser().safeGetBoolean( "bitmessagesettings", "socksauthentication"): @@ -145,6 +144,5 @@ class Proxy(AdvancedDispatcher): def state_proxy_handshake_done(self): """Handshake is complete at this point""" - # pylint: disable=attribute-defined-outside-init self.connectedAt = time.time() return False diff --git a/src/network/tcp.py b/src/network/tcp.py index 1b30a0a2..ff778378 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -51,7 +51,6 @@ class TCPConnection(BMProto, TLSDispatcher): self.verackSent = False self.streams = [0] self.fullyEstablished = False - self.connectedAt = 0 self.skipUntil = 0 if address is None and sock is not None: self.destination = Peer(*sock.getpeername()) diff --git a/src/network/tls.py b/src/network/tls.py index e326fa32..a3774b44 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -16,7 +16,6 @@ logger = logging.getLogger('default') _DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,)) -# sslProtocolVersion if sys.version_info >= (2, 7, 13): # this means TLSv1 or higher # in the future change to @@ -27,14 +26,16 @@ elif sys.version_info >= (2, 7, 9): # SSLv2 and 3 are excluded with an option after context is created sslProtocolVersion = ssl.PROTOCOL_SSLv23 else: - # this means TLSv1, there is no way to set "TLSv1 or higher" or - # "TLSv1.2" in < 2.7.9 + # this means TLSv1, there is no way to set "TLSv1 or higher" + # or "TLSv1.2" in < 2.7.9 sslProtocolVersion = ssl.PROTOCOL_TLSv1 # ciphers -if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not \ - ssl.OPENSSL_VERSION.startswith("LibreSSL"): +if ( + ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 + and not ssl.OPENSSL_VERSION.startswith(b"LibreSSL") +): sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0" else: sslProtocolCiphers = "AECDH-AES256-SHA" @@ -47,16 +48,10 @@ class TLSDispatcher(AdvancedDispatcher): def __init__(self, _=None, sock=None, certfile=None, keyfile=None, server_side=False, ciphers=sslProtocolCiphers): self.want_read = self.want_write = True - if certfile is None: - self.certfile = os.path.join( - paths.codePath(), 'sslkeys', 'cert.pem') - else: - self.certfile = certfile - if keyfile is None: - self.keyfile = os.path.join( - paths.codePath(), 'sslkeys', 'key.pem') - else: - self.keyfile = keyfile + self.certfile = certfile or os.path.join( + paths.codePath(), 'sslkeys', 'cert.pem') + self.keyfile = keyfile or os.path.join( + paths.codePath(), 'sslkeys', 'key.pem') self.server_side = server_side self.ciphers = ciphers self.tlsStarted = False @@ -66,7 +61,6 @@ class TLSDispatcher(AdvancedDispatcher): def state_tls_init(self): """Prepare sockets for TLS handshake""" - # pylint: disable=attribute-defined-outside-init self.isSSL = True self.tlsStarted = True # Once the connection has been established, @@ -96,8 +90,6 @@ class TLSDispatcher(AdvancedDispatcher): self.want_read = self.want_write = True self.set_state("tls_handshake") return False -# if hasattr(self.socket, "context"): -# self.socket.context.set_ecdh_curve("secp256k1") @staticmethod def state_tls_handshake(): @@ -112,9 +104,9 @@ class TLSDispatcher(AdvancedDispatcher): try: if self.tlsStarted and not self.tlsDone and not self.write_buf: return self.want_write - return AdvancedDispatcher.writable(self) except AttributeError: - return AdvancedDispatcher.writable(self) + pass + return AdvancedDispatcher.writable(self) def readable(self): """Handle readable check for TLS-enabled sockets""" @@ -126,14 +118,14 @@ class TLSDispatcher(AdvancedDispatcher): return self.want_read # prior to TLS handshake, # receiveDataThread should emulate synchronous behaviour - elif not self.fullyEstablished and ( + if not self.fullyEstablished and ( self.expectBytes == 0 or not self.write_buf_empty()): return False - return AdvancedDispatcher.readable(self) except AttributeError: - return AdvancedDispatcher.readable(self) + pass + return AdvancedDispatcher.readable(self) - def handle_read(self): # pylint: disable=inconsistent-return-statements + def handle_read(self): """ Handle reads for sockets during TLS handshake. Requires special treatment as during the handshake, buffers must remain empty @@ -142,29 +134,20 @@ class TLSDispatcher(AdvancedDispatcher): try: # wait for write buffer flush if self.tlsStarted and not self.tlsDone and not self.write_buf: - # logger.debug( - # "%s:%i TLS handshaking (read)", self.destination.host, - # self.destination.port) self.tls_handshake() else: - # logger.debug( - # "%s:%i Not TLS handshaking (read)", self.destination.host, - # self.destination.port) - return AdvancedDispatcher.handle_read(self) + AdvancedDispatcher.handle_read(self) except AttributeError: - return AdvancedDispatcher.handle_read(self) + AdvancedDispatcher.handle_read(self) except ssl.SSLError as err: - self.close_reason = "SSL Error in handle_read" if err.errno == ssl.SSL_ERROR_WANT_READ: return - elif err.errno in _DISCONNECTED_SSL: - self.handle_close() - return - logger.info("SSL Error: %s", err) + if err.errno not in _DISCONNECTED_SSL: + logger.info("SSL Error: %s", err) + self.close_reason = "SSL Error in handle_read" self.handle_close() - return - def handle_write(self): # pylint: disable=inconsistent-return-statements + def handle_write(self): """ Handle writes for sockets during TLS handshake. Requires special treatment as during the handshake, buffers must remain empty @@ -173,27 +156,18 @@ class TLSDispatcher(AdvancedDispatcher): try: # wait for write buffer flush if self.tlsStarted and not self.tlsDone and not self.write_buf: - # logger.debug( - # "%s:%i TLS handshaking (write)", self.destination.host, - # self.destination.port) self.tls_handshake() else: - # logger.debug( - # "%s:%i Not TLS handshaking (write)", self.destination.host, - # self.destination.port) - return AdvancedDispatcher.handle_write(self) + AdvancedDispatcher.handle_write(self) except AttributeError: - return AdvancedDispatcher.handle_write(self) + AdvancedDispatcher.handle_write(self) except ssl.SSLError as err: - self.close_reason = "SSL Error in handle_write" if err.errno == ssl.SSL_ERROR_WANT_WRITE: - return 0 - elif err.errno in _DISCONNECTED_SSL: - self.handle_close() - return 0 - logger.info("SSL Error: %s", err) + return + if err.errno not in _DISCONNECTED_SSL: + logger.info("SSL Error: %s", err) + self.close_reason = "SSL Error in handle_write" self.handle_close() - return def tls_handshake(self): """Perform TLS handshake and handle its stages""" diff --git a/src/network/udp.py b/src/network/udp.py index 7852aeea..3f999332 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -28,7 +28,6 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes # .. todo:: sort out streams self.streams = [1] self.fullyEstablished = True - self.connectedAt = 0 self.skipUntil = 0 if sock is None: if host is None: