From f0bc74e65844f7a9a33dd3bc1d63a168cb97b2eb Mon Sep 17 00:00:00 2001 From: lakshyacis Date: Mon, 6 Jan 2020 17:14:47 +0530 Subject: [PATCH] Network fixes --- src/network/__init__.py | 3 + src/network/addrthread.py | 1 + src/network/advanceddispatcher.py | 35 +++--- src/network/announcethread.py | 7 +- src/network/assemble.py | 17 ++- src/network/asyncore_pollchoose.py | 164 ++++++++++------------------- src/network/bmobject.py | 49 +++++---- src/network/bmproto.py | 82 ++++++++------- src/network/connectionchooser.py | 8 +- src/network/constants.py | 18 ++-- src/network/dandelion.py | 10 +- src/network/downloadthread.py | 5 +- src/network/invthread.py | 20 ++-- 13 files changed, 205 insertions(+), 214 deletions(-) diff --git a/src/network/__init__.py b/src/network/__init__.py index 51c4c4da..70613539 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -1,3 +1,6 @@ +""" +Network subsystem packages +""" from addrthread import AddrThread from announcethread import AnnounceThread from connectionpool import BMConnectionPool diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 8a0396f8..3bf448d8 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -12,6 +12,7 @@ from threads import StoppableThread class AddrThread(StoppableThread): + """(Node) address broadcasting thread""" name = "AddrBroadcaster" def run(self): diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index eeb50bdf..982be819 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,9 +1,7 @@ """ -src/network/advanceddispatcher.py -================================= +Improved version of asyncore dispatcher """ # pylint: disable=attribute-defined-outside-init - import socket import threading import time @@ -14,7 +12,8 @@ from threads import BusyError, nonBlocking class ProcessingError(Exception): - """General class for protocol parser exception, use as a base for others.""" + """General class for protocol parser exception, + use as a base for others.""" pass @@ -24,7 +23,8 @@ class UnknownStateError(ProcessingError): class AdvancedDispatcher(asyncore.dispatcher): - """Improved version of asyncore dispatcher, with buffers and protocol state.""" + """Improved version of asyncore dispatcher, + with buffers and protocol state.""" # pylint: disable=too-many-instance-attributes _buf_len = 131072 # 128kB @@ -72,7 +72,8 @@ class AdvancedDispatcher(asyncore.dispatcher): del self.read_buf[0:length] def process(self): - """Process (parse) data that's in the buffer, as long as there is enough data and the connection is open.""" + """Process (parse) data that's in the buffer, + as long as there is enough data and the connection is open.""" while self.connected and not state.shutdown: try: with nonBlocking(self.processingLock): @@ -104,8 +105,9 @@ class AdvancedDispatcher(asyncore.dispatcher): if asyncore.maxUploadRate > 0: self.uploadChunk = int(asyncore.uploadBucket) self.uploadChunk = min(self.uploadChunk, len(self.write_buf)) - return asyncore.dispatcher.writable(self) and \ - (self.connecting or (self.connected and self.uploadChunk > 0)) + return asyncore.dispatcher.writable(self) and ( + self.connecting or ( + self.connected and self.uploadChunk > 0)) def readable(self): """Is the read buffer ready to accept data from the network?""" @@ -114,13 +116,15 @@ class AdvancedDispatcher(asyncore.dispatcher): self.downloadChunk = int(asyncore.downloadBucket) try: if self.expectBytes > 0 and not self.fullyEstablished: - self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf)) + self.downloadChunk = min( + self.downloadChunk, self.expectBytes - len(self.read_buf)) if self.downloadChunk < 0: self.downloadChunk = 0 except AttributeError: pass - return asyncore.dispatcher.readable(self) and \ - (self.connecting or self.accepting or (self.connected and self.downloadChunk > 0)) + return asyncore.dispatcher.readable(self) and ( + self.connecting or self.accepting or ( + self.connected and self.downloadChunk > 0)) def handle_read(self): """Append incoming data to the read buffer.""" @@ -144,20 +148,21 @@ class AdvancedDispatcher(asyncore.dispatcher): try: asyncore.dispatcher.handle_connect_event(self) except socket.error as e: - if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access + # pylint: disable=protected-access + if e.args[0] not in asyncore._DISCONNECTED: raise def handle_connect(self): """Method for handling connection established implementations.""" self.lastTx = time.time() - def state_close(self): + def state_close(self): # pylint: disable=no-self-use """Signal to the processing loop to end.""" - # pylint: disable=no-self-use return False def handle_close(self): - """Callback for connection being closed, but can also be called directly when you want connection to close.""" + """Callback for connection being closed, + but can also be called directly when you want connection to close.""" with self.readLock: self.read_buf = bytearray() with self.writeLock: diff --git a/src/network/announcethread.py b/src/network/announcethread.py index c11a2cc6..17c8bcd3 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -1,8 +1,6 @@ """ -src/network/announcethread.py -================================= +Announce myself (node address) """ - import time import state @@ -40,6 +38,7 @@ class AnnounceThread(StoppableThread): stream, Peer( '127.0.0.1', - BMConfigParser().safeGetInt('bitmessagesettings', 'port')), + BMConfigParser().safeGetInt( + 'bitmessagesettings', 'port')), time.time()) connection.append_write_buf(assemble_addr([addr])) diff --git a/src/network/assemble.py b/src/network/assemble.py index 2d31914c..32fad3e4 100644 --- a/src/network/assemble.py +++ b/src/network/assemble.py @@ -1,7 +1,6 @@ """ Create bitmessage protocol command packets """ - import struct import addresses @@ -13,20 +12,20 @@ from protocol import CreatePacket, encodeHost def assemble_addr(peerList): """Create address command""" if isinstance(peerList, Peer): - peerList = (peerList) + peerList = [peerList] if not peerList: return b'' retval = b'' for i in range(0, len(peerList), MAX_ADDR_COUNT): - payload = addresses.encodeVarint( - len(peerList[i:i + MAX_ADDR_COUNT])) + payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: - payload += struct.pack( - '>Q', timestamp) # 64-bit time + # 64-bit time + payload += struct.pack('>Q', timestamp) payload += struct.pack('>I', stream) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node + # service bit flags offered by this node + payload += struct.pack('>q', 1) payload += encodeHost(peer.host) - payload += struct.pack('>H', peer.port) # remote port + # remote port + payload += struct.pack('>H', peer.port) retval += CreatePacket('addr', payload) return retval diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 3337c0f0..41757f37 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -1,56 +1,11 @@ +""" +Basic infrastructure for asynchronous socket service clients and servers. +""" # -*- Mode: Python -*- # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing -# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init -# pylint: disable=global-statement -""" -src/network/asyncore_pollchoose.py -================================== - -# ====================================================================== -# Copyright 1996 by Sam Rushing -# -# All Rights Reserved -# -# Permission to use, copy, modify, and distribute this software and -# its documentation for any purpose and without fee is hereby -# granted, provided that the above copyright notice appear in all -# copies and that both that copyright notice and this permission -# notice appear in supporting documentation, and that the name of Sam -# Rushing not be used in advertising or publicity pertaining to -# distribution of the software without specific, written prior -# permission. -# -# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, -# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN -# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR -# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS -# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, -# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -# ====================================================================== - -Basic infrastructure for asynchronous socket service clients and servers. - -There are only two ways to have a program on a single processor do "more -than one thing at a time". Multi-threaded programming is the simplest and -most popular way to do it, but there is another very different technique, -that lets you have nearly all the advantages of multi-threading, without -actually using multiple threads. it's really only practical if your program -is largely I/O bound. If your program is CPU bound, then pre-emptive -scheduled threads are probably what you really need. Network servers are -rarely CPU-bound, however. - -If your operating system supports the select() system call in its I/O -library (and nearly all do), then you can use it to juggle multiple -communication channels at once; doing other work while your I/O is taking -place in the "background." Although this strategy can seem strange and -complex, especially at first, it is in many ways easier to understand and -control than multi-threaded programming. The module documented here solves -many of the difficult problems for you, making the task of building -sophisticated high-performance network servers and clients a snap. -""" - +# pylint: disable=too-many-branches,too-many-lines,global-statement +# pylint: disable=redefined-builtin,no-self-use import os import select import socket @@ -58,8 +13,9 @@ import sys import time import warnings from errno import ( - EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, - EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode + EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, + ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, EINVAL, EISCONN, ENETUNREACH, + ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode ) from threading import current_thread @@ -107,7 +63,8 @@ def _strerror(err): class ExitNow(Exception): - """We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it""" + """We don't use directly but may be necessary as we replace + asyncore due to some library raising or expecting it""" pass @@ -152,7 +109,8 @@ def write(obj): def set_rates(download, upload): """Set throttling rates""" - global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + global maxDownloadRate, maxUploadRate, downloadBucket + global uploadBucket, downloadTimestamp, uploadTimestamp maxDownloadRate = float(download) * 1024 maxUploadRate = float(upload) * 1024 @@ -182,7 +140,8 @@ def update_received(download=0): currentTimestamp = time.time() receivedBytes += download if maxDownloadRate > 0: - bucketIncrease = maxDownloadRate * (currentTimestamp - downloadTimestamp) + bucketIncrease = \ + maxDownloadRate * (currentTimestamp - downloadTimestamp) downloadBucket += bucketIncrease if downloadBucket > maxDownloadRate: downloadBucket = int(maxDownloadRate) @@ -242,7 +201,6 @@ def readwrite(obj, flags): def select_poller(timeout=0.0, map=None): """A poller which uses select(), available on most platforms.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -298,7 +256,6 @@ def select_poller(timeout=0.0, map=None): def poll_poller(timeout=0.0, map=None): """A poller which uses poll(), available on most UNIXen.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -356,7 +313,6 @@ poll2 = poll3 = poll_poller def epoll_poller(timeout=0.0, map=None): """A poller which uses epoll(), supported on Linux 2.5.44 and newer.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -412,7 +368,7 @@ def epoll_poller(timeout=0.0, map=None): def kqueue_poller(timeout=0.0, map=None): """A poller which uses kqueue(), BSD specific.""" - # pylint: disable=redefined-builtin,no-member + # pylint: disable=no-member,too-many-statements if map is None: map = socket_map @@ -440,14 +396,20 @@ def kqueue_poller(timeout=0.0, map=None): poller_flags |= select.KQ_EV_ENABLE else: poller_flags |= select.KQ_EV_DISABLE - updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags)) + updates.append( + select.kevent( + fd, filter=select.KQ_FILTER_READ, + flags=poller_flags)) if kq_filter & 2 != obj.poller_filter & 2: poller_flags = select.KQ_EV_ADD if kq_filter & 2: poller_flags |= select.KQ_EV_ENABLE else: poller_flags |= select.KQ_EV_DISABLE - updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags)) + updates.append( + select.kevent( + fd, filter=select.KQ_FILTER_WRITE, + flags=poller_flags)) obj.poller_filter = kq_filter if not selectables: @@ -481,7 +443,6 @@ def kqueue_poller(timeout=0.0, map=None): def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): """Poll in a loop, until count or timeout is reached""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -520,9 +481,9 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): count = count - 1 -class dispatcher: +class dispatcher(object): """Dispatcher for socket objects""" - # pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class + # pylint: disable=too-many-public-methods,too-many-instance-attributes debug = False connected = False @@ -537,7 +498,6 @@ class dispatcher: minTx = 1500 def __init__(self, sock=None, map=None): - # pylint: disable=redefined-builtin if map is None: self._map = socket_map else: @@ -586,8 +546,7 @@ class dispatcher: def add_channel(self, map=None): """Add a channel""" - # pylint: disable=redefined-builtin - + # pylint: disable=attribute-defined-outside-init if map is None: map = self._map map[self._fileno] = self @@ -596,8 +555,6 @@ class dispatcher: def del_channel(self, map=None): """Delete a channel""" - # pylint: disable=redefined-builtin - fd = self._fileno if map is None: map = self._map @@ -605,12 +562,14 @@ class dispatcher: del map[fd] if self._fileno: try: - kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0) - except (AttributeError, KeyError, TypeError, IOError, OSError): + kqueue_poller.pollster.control([select.kevent( + fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0) + except(AttributeError, KeyError, TypeError, IOError, OSError): pass try: - kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0) - except (AttributeError, KeyError, TypeError, IOError, OSError): + kqueue_poller.pollster.control([select.kevent( + fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0) + except(AttributeError, KeyError, TypeError, IOError, OSError): pass try: epoll_poller.pollster.unregister(fd) @@ -627,8 +586,10 @@ class dispatcher: self.poller_filter = 0 self.poller_registered = False - def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): + def create_socket( + self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): """Create a socket""" + # pylint: disable=attribute-defined-outside-init self.family_and_type = family, socket_type sock = socket.socket(family, socket_type) sock.setblocking(0) @@ -636,20 +597,16 @@ class dispatcher: def set_socket(self, sock, map=None): """Set socket""" - # pylint: disable=redefined-builtin - self.socket = sock self._fileno = sock.fileno() self.add_channel(map) def set_reuse_addr(self): """try to re-use a server port if possible""" - try: self.socket.setsockopt( - socket.SOL_SOCKET, socket.SO_REUSEADDR, - self.socket.getsockopt(socket.SOL_SOCKET, - socket.SO_REUSEADDR) | 1 + socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 ) except socket.error: pass @@ -704,13 +661,16 @@ class dispatcher: raise socket.error(err, errorcode[err]) def accept(self): - """Accept incoming connections. Returns either an address pair or None.""" + """Accept incoming connections. + Returns either an address pair or None.""" try: conn, addr = self.socket.accept() except TypeError: return None except socket.error as why: - if why.args[0] in (EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, EAGAIN, ENOTCONN): + if why.args[0] in ( + EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, + EAGAIN, ENOTCONN): return None else: raise @@ -769,11 +729,12 @@ class dispatcher: try: retattr = getattr(self.socket, attr) except AttributeError: - raise AttributeError("%s instance has no attribute '%s'" - % (self.__class__.__name__, attr)) + raise AttributeError( + "%s instance has no attribute '%s'" + % (self.__class__.__name__, attr)) else: - msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \ - "instead" % {'me': self.__class__.__name__, 'attr': attr} + msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s"\ + " instead" % {'me': self.__class__.__name__, 'attr': attr} warnings.warn(msg, DeprecationWarning, stacklevel=2) return retattr @@ -855,13 +816,8 @@ class dispatcher: self.log_info( 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( - self_repr, - t, - v, - tbinfo - ), - 'error' - ) + self_repr, t, v, tbinfo), + 'error') self.handle_close() def handle_accept(self): @@ -902,11 +858,8 @@ class dispatcher_with_send(dispatcher): adds simple buffered output capability, useful for simple clients. [for more sophisticated usage use asynchat.async_chat] """ - # pylint: disable=redefined-builtin def __init__(self, sock=None, map=None): - # pylint: disable=redefined-builtin - dispatcher.__init__(self, sock, map) self.out_buffer = b'' @@ -941,7 +894,8 @@ def compact_traceback(): """Return a compact traceback""" t, v, tb = sys.exc_info() tbinfo = [] - if not tb: # Must have a traceback + # Must have a traceback + if not tb: raise AssertionError("traceback does not exist") while tb: tbinfo.append(( @@ -961,7 +915,6 @@ def compact_traceback(): def close_all(map=None, ignore_all=False): """Close all connections""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -998,13 +951,13 @@ def close_all(map=None, ignore_all=False): if os.name == 'posix': import fcntl - class file_wrapper: + class file_wrapper: # pylint: disable=old-style-class """ - Here we override just enough to make a file look like a socket for the purposes of asyncore. + Here we override just enough to make a file look + like a socket for the purposes of asyncore. The passed fd is automatically os.dup()'d """ - # pylint: disable=old-style-class def __init__(self, fd): self.fd = os.dup(fd) @@ -1019,12 +972,11 @@ if os.name == 'posix': def getsockopt(self, level, optname, buflen=None): """Fake getsockopt()""" - if (level == socket.SOL_SOCKET and - optname == socket.SO_ERROR and + if (level == socket.SOL_SOCKET and optname == socket.SO_ERROR and not buflen): return 0 - raise NotImplementedError("Only asyncore specific behaviour " - "implemented.") + raise NotImplementedError( + "Only asyncore specific behaviour implemented.") read = recv write = send @@ -1041,8 +993,6 @@ if os.name == 'posix': """A dispatcher for file_wrapper objects""" def __init__(self, fd, map=None): - # pylint: disable=redefined-builtin - dispatcher.__init__(self, None, map) self.connected = True try: diff --git a/src/network/bmobject.py b/src/network/bmobject.py index ac6429e4..12b997d7 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -1,7 +1,6 @@ """ BMObject and it's exceptions. """ - import logging import time @@ -15,12 +14,14 @@ logger = logging.getLogger('default') class BMObjectInsufficientPOWError(Exception): - """Exception indicating the object doesn't have sufficient proof of work.""" + """Exception indicating the object + doesn't have sufficient proof of work.""" errorCodes = ("Insufficient proof of work") class BMObjectInvalidDataError(Exception): - """Exception indicating the data being parsed does not match the specification.""" + """Exception indicating the data being parsed + does not match the specification.""" errorCodes = ("Data invalid") @@ -30,7 +31,8 @@ class BMObjectExpiredError(Exception): class BMObjectUnwantedStreamError(Exception): - """Exception indicating the object is in a stream we didn't advertise as being interested in.""" + """Exception indicating the object is in a stream + we didn't advertise as being interested in.""" errorCodes = ("Object in unwanted stream") @@ -44,9 +46,8 @@ class BMObjectAlreadyHaveError(Exception): errorCodes = ("Already have this object") -class BMObject(object): +class BMObject(object): # pylint: disable=too-many-instance-attributes """Bitmessage Object as a class.""" - # pylint: disable=too-many-instance-attributes # max TTL, 28 days and 3 hours maxTTL = 28 * 24 * 60 * 60 + 10800 @@ -81,31 +82,36 @@ class BMObject(object): raise BMObjectInsufficientPOWError() def checkEOLSanity(self): - """Check if object's lifetime isn't ridiculously far in the past or future.""" + """Check if object's lifetime + isn't ridiculously far in the past or future.""" # EOL sanity check if self.expiresTime - int(time.time()) > BMObject.maxTTL: logger.info( - 'This object\'s End of Life time is too far in the future. Ignoring it. Time is %i', - self.expiresTime) + 'This object\'s End of Life time is too far in the future.' + ' Ignoring it. Time is %i', self.expiresTime) # .. todo:: remove from download queue raise BMObjectExpiredError() if self.expiresTime - int(time.time()) < BMObject.minTTL: logger.info( - 'This object\'s End of Life time was too long ago. Ignoring the object. Time is %i', - self.expiresTime) + 'This object\'s End of Life time was too long ago.' + ' Ignoring the object. Time is %i', self.expiresTime) # .. todo:: remove from download queue raise BMObjectExpiredError() def checkStream(self): """Check if object's stream matches streams we are interested in""" if self.streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %i isn\'t one we are interested in.', self.streamNumber) + logger.debug( + 'The streamNumber %i isn\'t one we are interested in.', + self.streamNumber) raise BMObjectUnwantedStreamError() def checkAlreadyHave(self): """ - Check if we already have the object (so that we don't duplicate it in inventory or advertise it unnecessarily) + Check if we already have the object + (so that we don't duplicate it in inventory + or advertise it unnecessarily) """ # if it's a stem duplicate, pretend we don't have it if Dandelion().hasHash(self.inventoryHash): @@ -114,7 +120,8 @@ class BMObject(object): raise BMObjectAlreadyHaveError() def checkObjectByType(self): - """Call a object type specific check (objects can have additional checks based on their types)""" + """Call a object type specific check + (objects can have additional checks based on their types)""" if self.objectType == protocol.OBJECT_GETPUBKEY: self.checkGetpubkey() elif self.objectType == protocol.OBJECT_PUBKEY: @@ -125,20 +132,21 @@ class BMObject(object): self.checkBroadcast() # other objects don't require other types of tests - def checkMessage(self): + def checkMessage(self): # pylint: disable=no-self-use """"Message" object type checks.""" - # pylint: disable=no-self-use return def checkGetpubkey(self): """"Getpubkey" object type checks.""" if len(self.data) < 42: - logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') + logger.info( + 'getpubkey message doesn\'t contain enough data. Ignoring.') raise BMObjectInvalidError() def checkPubkey(self): """"Pubkey" object type checks.""" - if len(self.data) < 146 or len(self.data) > 440: # sanity check + # sanity check + if len(self.data) < 146 or len(self.data) > 440: logger.info('pubkey object too short or too long. Ignoring.') raise BMObjectInvalidError() @@ -146,8 +154,9 @@ class BMObject(object): """"Broadcast" object type checks.""" if len(self.data) < 180: logger.debug( - 'The payload length of this broadcast packet is unreasonably low.' - ' Someone is probably trying funny business. Ignoring message.') + 'The payload length of this broadcast' + ' packet is unreasonably low. Someone is probably' + ' trying funny business. Ignoring message.') raise BMObjectInvalidError() # this isn't supported anymore diff --git a/src/network/bmproto.py b/src/network/bmproto.py index d620daa3..d5d3dbe3 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -1,8 +1,7 @@ """ -src/network/bmproto.py -================================== +Bitmessage Protocol """ -# pylint: disable=attribute-defined-outside-init +# pylint: disable=attribute-defined-outside-init, too-few-public-methods import base64 import hashlib import logging @@ -19,17 +18,16 @@ import state from bmconfigparser import BMConfigParser from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher -from network.constants import ( - ADDRESS_ALIVE, - MAX_MESSAGE_SIZE, - MAX_OBJECT_COUNT, - MAX_OBJECT_PAYLOAD_SIZE, - MAX_TIME_OFFSET) -from network.dandelion import Dandelion from network.bmobject import ( BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, - BMObjectInvalidError, BMObjectAlreadyHaveError) + BMObjectInvalidError, BMObjectAlreadyHaveError +) +from network.constants import ( + ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT, + MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET +) +from network.dandelion import Dandelion from network.proxy import ProxyError from node import Node, Peer from objectracker import missingObjects, ObjectTracker @@ -59,7 +57,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # pylint: disable=too-many-instance-attributes, too-many-public-methods timeOffsetWrongCount = 0 - def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called + def __init__(self, address=None, sock=None): + # pylint: disable=unused-argument, super-init-not-called AdvancedDispatcher.__init__(self, sock) self.isOutbound = False # packet/connection from a local IP @@ -163,7 +162,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def decode_payload_varint(self): """Decode a varint from the payload""" - value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:]) + value, offset = addresses.decodeVarint( + self.payload[self.payloadOffset:]) self.payloadOffset += offset return value @@ -185,8 +185,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return Node(services, host, port) - def decode_payload_content(self, pattern="v"): # pylint: disable=too-many-branches, too-many-statements - + # pylint: disable=too-many-branches, too-many-statements + def decode_payload_content(self, pattern="v"): """ Decode the payload depending on pattern: @@ -202,7 +202,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): , = end of array """ - def decode_simple(self, char="v"): # pylint: disable=inconsistent-return-statements + # pylint: disable=inconsistent-return-statements + def decode_simple(self, char="v"): """Decode the payload using one char pattern""" if char == "v": return self.decode_payload_varint() @@ -312,8 +313,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_error(self): """Decode an error message and log it""" - fatalStatus, banTime, inventoryVector, errorText = \ - self.decode_payload_content("vvlsls") + err_values = self.decode_payload_content("vvlsls") + fatalStatus = err_values[0] + # banTime = err_values[1] + # inventoryVector = err_values[2] + errorText = err_values[3] logger.error( '%s:%i error: %i, %s', self.destination.host, self.destination.port, fatalStatus, errorText) @@ -408,8 +412,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass - if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash): - Dandelion().removeHash(self.object.inventoryHash, "cycle detection") + if self.object.inventoryHash in Inventory() and Dandelion().hasHash( + self.object.inventoryHash): + Dandelion().removeHash( + self.object.inventoryHash, "cycle detection") Inventory()[self.object.inventoryHash] = ( self.object.objectType, self.object.streamNumber, @@ -428,27 +434,30 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_addr(self): """Incoming addresses, process them""" - addresses = self._decode_addr() # pylint: disable=redefined-outer-name - for i in addresses: - seenTime, stream, services, ip, port = i + # pylint: disable=redefined-outer-name + addresses = self._decode_addr() + for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) if stream not in state.streamsInWhichIAmParticipating: continue if ( - decodedIP and time.time() - seenTime > 0 and - seenTime > time.time() - ADDRESS_ALIVE and - port > 0 + decodedIP and time.time() - seenTime > 0 and + seenTime > time.time() - ADDRESS_ALIVE and + port > 0 ): peer = Peer(decodedIP, port) try: - if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime: + if knownnodes.knownNodes[stream][peer]["lastseen"] > \ + seenTime: continue except KeyError: pass - if len(knownnodes.knownNodes[stream]) < BMConfigParser().safeGetInt("knownnodes", "maxnodes"): + if len(knownnodes.knownNodes[stream]) < \ + BMConfigParser().safeGetInt("knownnodes", "maxnodes"): with knownnodes.knownNodesLock: try: - knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime + knownnodes.knownNodes[stream][peer]["lastseen"] = \ + seenTime except (TypeError, KeyError): knownnodes.knownNodes[stream][peer] = { "lastseen": seenTime, @@ -539,7 +548,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): length=self.payloadLength, expectBytes=0) return False - def peerValidityChecks(self): # pylint: disable=too-many-return-statements + # pylint: disable=too-many-return-statements + def peerValidityChecks(self): """Check the validity of the peer""" if self.remoteProtocolVersion < 3: self.append_write_buf(protocol.assembleErrorMessage( @@ -551,8 +561,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return False if self.timeOffset > MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( - errorText="Your time is too far in the future compared to mine." - " Closing connection.", fatal=2)) + errorText="Your time is too far in the future" + " compared to mine. Closing connection.", fatal=2)) logger.info( "%s's time is too far in the future (%s seconds)." " Closing connection to it.", self.destination, self.timeOffset) @@ -574,8 +584,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): errorText="We don't have shared stream interests." " Closing connection.", fatal=2)) logger.debug( - 'Closed connection to %s because there is no overlapping interest' - ' in streams.', self.destination) + 'Closed connection to %s because there is no overlapping' + ' interest in streams.', self.destination) return False if self.destination in connectionpool.BMConnectionPool().inboundConnections: try: @@ -584,8 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): errorText="Too many connections from your IP." " Closing connection.", fatal=2)) logger.debug( - 'Closed connection to %s because we are already connected' - ' to that IP.', self.destination) + 'Closed connection to %s because we are already' + ' connected to that IP.', self.destination) return False except: pass diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 9d2f85d6..badd98b7 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -1,3 +1,6 @@ +""" +Select which node to connect to +""" # pylint: disable=too-many-branches import logging import random # nosec @@ -12,6 +15,7 @@ logger = logging.getLogger('default') def getDiscoveredPeer(): + """Get a peer from the local peer discovery list""" try: peer = random.choice(state.discoveredPeers.keys()) except (IndexError, KeyError): @@ -24,6 +28,7 @@ def getDiscoveredPeer(): def chooseConnection(stream): + """Returns an appropriate connection""" haveOnion = BMConfigParser().safeGet( "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' onionOnly = BMConfigParser().safeGetBoolean( @@ -49,7 +54,8 @@ def chooseConnection(stream): logger.warning('Error in %s', peer) rating = 0 if haveOnion: - # do not connect to raw IP addresses--keep all traffic within Tor overlay + # do not connect to raw IP addresses + # --keep all traffic within Tor overlay if onionOnly and not peer.host.endswith('.onion'): continue # onion addresses have a higher priority when SOCKS diff --git a/src/network/constants.py b/src/network/constants.py index a3414ef3..f8f4120f 100644 --- a/src/network/constants.py +++ b/src/network/constants.py @@ -3,9 +3,15 @@ Network protocol constants """ -ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago -MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command -MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message. -MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload -MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command -MAX_TIME_OFFSET = 3600 #: maximum time offset +#: address is online if online less than this many seconds ago +ADDRESS_ALIVE = 10800 +#: protocol specification says max 1000 addresses in one addr command +MAX_ADDR_COUNT = 1000 +#: ~1.6 MB which is the maximum possible size of an inv message. +MAX_MESSAGE_SIZE = 1600100 +#: 2**18 = 256kB is the maximum size of an object payload +MAX_OBJECT_PAYLOAD_SIZE = 2**18 +#: protocol specification says max 50000 objects in one inv command +MAX_OBJECT_COUNT = 50000 +#: maximum time offset +MAX_TIME_OFFSET = 3600 diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 0f68cc24..03f45bd7 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -1,10 +1,9 @@ """ -src/network/dandelion.py -======================== +Dandelion class definition, tracks stages """ import logging from collections import namedtuple -from random import choice, sample, expovariate +from random import choice, expovariate, sample from threading import RLock from time import time @@ -28,7 +27,7 @@ logger = logging.getLogger('default') @Singleton -class Dandelion(): # pylint: disable=old-style-class +class Dandelion: # pylint: disable=old-style-class """Dandelion class for tracking stem/fluff stages.""" def __init__(self): # currently assignable child stems @@ -123,7 +122,8 @@ class Dandelion(): # pylint: disable=old-style-class self.stem.remove(connection) # active mappings to pointing to the removed node for k in ( - k for k, v in self.nodeMap.iteritems() if v == connection + k for k, v in self.nodeMap.iteritems() + if v == connection ): self.nodeMap[k] = None for k, v in { diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index e882f6de..0ae83b5b 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -1,7 +1,6 @@ """ `DownloadThread` class definition """ - import time import addresses @@ -30,7 +29,9 @@ class DownloadThread(StoppableThread): """Expire pending downloads eventually""" deadline = time.time() - self.requestExpires try: - toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] + toDelete = [ + k for k, v in missingObjects.iteritems() + if v < deadline] except RuntimeError: pass else: diff --git a/src/network/invthread.py b/src/network/invthread.py index d5690486..e68b7692 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,6 +1,5 @@ """ -src/network/invthread.py -======================== +Thread to send inv annoucements """ import Queue import random @@ -34,7 +33,7 @@ def handleExpiredDandelion(expired): class InvThread(StoppableThread): - """A thread to send inv annoucements.""" + """Main thread that sends inv annoucements""" name = "InvBroadcaster" @@ -43,12 +42,13 @@ class InvThread(StoppableThread): """Locally generated inventory items require special handling""" Dandelion().addHash(hashId, stream=stream) for connection in BMConnectionPool().connections(): - if state.dandelion and connection != Dandelion().objectChildStem(hashId): + if state.dandelion and connection != \ + Dandelion().objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() - def run(self): # pylint: disable=too-many-branches - while not state.shutdown: # pylint: disable=too-many-nested-blocks + def run(self): # pylint: disable=too-many-branches + while not state.shutdown: # pylint: disable=too-many-nested-blocks chunk = [] while True: # Dandelion fluff trigger by expiration @@ -92,15 +92,17 @@ class InvThread(StoppableThread): random.shuffle(fluffs) connection.append_write_buf(protocol.CreatePacket( 'inv', - addresses.encodeVarint(len(fluffs)) + ''.join(fluffs))) + addresses.encodeVarint( + len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) connection.append_write_buf(protocol.CreatePacket( 'dinv', - addresses.encodeVarint(len(stems)) + ''.join(stems))) + addresses.encodeVarint( + len(stems)) + ''.join(stems))) invQueue.iterate() - for i in range(len(chunk)): + for _ in range(len(chunk)): invQueue.task_done() if Dandelion().refresh < time():