From f0bc74e65844f7a9a33dd3bc1d63a168cb97b2eb Mon Sep 17 00:00:00 2001 From: lakshyacis Date: Mon, 6 Jan 2020 17:14:47 +0530 Subject: [PATCH 1/2] Network fixes --- src/network/__init__.py | 3 + src/network/addrthread.py | 1 + src/network/advanceddispatcher.py | 35 +++--- src/network/announcethread.py | 7 +- src/network/assemble.py | 17 ++- src/network/asyncore_pollchoose.py | 164 ++++++++++------------------- src/network/bmobject.py | 49 +++++---- src/network/bmproto.py | 82 ++++++++------- src/network/connectionchooser.py | 8 +- src/network/constants.py | 18 ++-- src/network/dandelion.py | 10 +- src/network/downloadthread.py | 5 +- src/network/invthread.py | 20 ++-- 13 files changed, 205 insertions(+), 214 deletions(-) diff --git a/src/network/__init__.py b/src/network/__init__.py index 51c4c4da..70613539 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -1,3 +1,6 @@ +""" +Network subsystem packages +""" from addrthread import AddrThread from announcethread import AnnounceThread from connectionpool import BMConnectionPool diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 8a0396f8..3bf448d8 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -12,6 +12,7 @@ from threads import StoppableThread class AddrThread(StoppableThread): + """(Node) address broadcasting thread""" name = "AddrBroadcaster" def run(self): diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index eeb50bdf..982be819 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,9 +1,7 @@ """ -src/network/advanceddispatcher.py -================================= +Improved version of asyncore dispatcher """ # pylint: disable=attribute-defined-outside-init - import socket import threading import time @@ -14,7 +12,8 @@ from threads import BusyError, nonBlocking class ProcessingError(Exception): - """General class for protocol parser exception, use as a base for others.""" + """General class for protocol parser exception, + use as a base for others.""" pass @@ -24,7 +23,8 @@ class UnknownStateError(ProcessingError): class AdvancedDispatcher(asyncore.dispatcher): - """Improved version of asyncore dispatcher, with buffers and protocol state.""" + """Improved version of asyncore dispatcher, + with buffers and protocol state.""" # pylint: disable=too-many-instance-attributes _buf_len = 131072 # 128kB @@ -72,7 +72,8 @@ class AdvancedDispatcher(asyncore.dispatcher): del self.read_buf[0:length] def process(self): - """Process (parse) data that's in the buffer, as long as there is enough data and the connection is open.""" + """Process (parse) data that's in the buffer, + as long as there is enough data and the connection is open.""" while self.connected and not state.shutdown: try: with nonBlocking(self.processingLock): @@ -104,8 +105,9 @@ class AdvancedDispatcher(asyncore.dispatcher): if asyncore.maxUploadRate > 0: self.uploadChunk = int(asyncore.uploadBucket) self.uploadChunk = min(self.uploadChunk, len(self.write_buf)) - return asyncore.dispatcher.writable(self) and \ - (self.connecting or (self.connected and self.uploadChunk > 0)) + return asyncore.dispatcher.writable(self) and ( + self.connecting or ( + self.connected and self.uploadChunk > 0)) def readable(self): """Is the read buffer ready to accept data from the network?""" @@ -114,13 +116,15 @@ class AdvancedDispatcher(asyncore.dispatcher): self.downloadChunk = int(asyncore.downloadBucket) try: if self.expectBytes > 0 and not self.fullyEstablished: - self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf)) + self.downloadChunk = min( + self.downloadChunk, self.expectBytes - len(self.read_buf)) if self.downloadChunk < 0: self.downloadChunk = 0 except AttributeError: pass - return asyncore.dispatcher.readable(self) and \ - (self.connecting or self.accepting or (self.connected and self.downloadChunk > 0)) + return asyncore.dispatcher.readable(self) and ( + self.connecting or self.accepting or ( + self.connected and self.downloadChunk > 0)) def handle_read(self): """Append incoming data to the read buffer.""" @@ -144,20 +148,21 @@ class AdvancedDispatcher(asyncore.dispatcher): try: asyncore.dispatcher.handle_connect_event(self) except socket.error as e: - if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access + # pylint: disable=protected-access + if e.args[0] not in asyncore._DISCONNECTED: raise def handle_connect(self): """Method for handling connection established implementations.""" self.lastTx = time.time() - def state_close(self): + def state_close(self): # pylint: disable=no-self-use """Signal to the processing loop to end.""" - # pylint: disable=no-self-use return False def handle_close(self): - """Callback for connection being closed, but can also be called directly when you want connection to close.""" + """Callback for connection being closed, + but can also be called directly when you want connection to close.""" with self.readLock: self.read_buf = bytearray() with self.writeLock: diff --git a/src/network/announcethread.py b/src/network/announcethread.py index c11a2cc6..17c8bcd3 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -1,8 +1,6 @@ """ -src/network/announcethread.py -================================= +Announce myself (node address) """ - import time import state @@ -40,6 +38,7 @@ class AnnounceThread(StoppableThread): stream, Peer( '127.0.0.1', - BMConfigParser().safeGetInt('bitmessagesettings', 'port')), + BMConfigParser().safeGetInt( + 'bitmessagesettings', 'port')), time.time()) connection.append_write_buf(assemble_addr([addr])) diff --git a/src/network/assemble.py b/src/network/assemble.py index 2d31914c..32fad3e4 100644 --- a/src/network/assemble.py +++ b/src/network/assemble.py @@ -1,7 +1,6 @@ """ Create bitmessage protocol command packets """ - import struct import addresses @@ -13,20 +12,20 @@ from protocol import CreatePacket, encodeHost def assemble_addr(peerList): """Create address command""" if isinstance(peerList, Peer): - peerList = (peerList) + peerList = [peerList] if not peerList: return b'' retval = b'' for i in range(0, len(peerList), MAX_ADDR_COUNT): - payload = addresses.encodeVarint( - len(peerList[i:i + MAX_ADDR_COUNT])) + payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: - payload += struct.pack( - '>Q', timestamp) # 64-bit time + # 64-bit time + payload += struct.pack('>Q', timestamp) payload += struct.pack('>I', stream) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node + # service bit flags offered by this node + payload += struct.pack('>q', 1) payload += encodeHost(peer.host) - payload += struct.pack('>H', peer.port) # remote port + # remote port + payload += struct.pack('>H', peer.port) retval += CreatePacket('addr', payload) return retval diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 3337c0f0..41757f37 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -1,56 +1,11 @@ +""" +Basic infrastructure for asynchronous socket service clients and servers. +""" # -*- Mode: Python -*- # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing -# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init -# pylint: disable=global-statement -""" -src/network/asyncore_pollchoose.py -================================== - -# ====================================================================== -# Copyright 1996 by Sam Rushing -# -# All Rights Reserved -# -# Permission to use, copy, modify, and distribute this software and -# its documentation for any purpose and without fee is hereby -# granted, provided that the above copyright notice appear in all -# copies and that both that copyright notice and this permission -# notice appear in supporting documentation, and that the name of Sam -# Rushing not be used in advertising or publicity pertaining to -# distribution of the software without specific, written prior -# permission. -# -# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, -# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN -# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR -# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS -# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, -# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -# ====================================================================== - -Basic infrastructure for asynchronous socket service clients and servers. - -There are only two ways to have a program on a single processor do "more -than one thing at a time". Multi-threaded programming is the simplest and -most popular way to do it, but there is another very different technique, -that lets you have nearly all the advantages of multi-threading, without -actually using multiple threads. it's really only practical if your program -is largely I/O bound. If your program is CPU bound, then pre-emptive -scheduled threads are probably what you really need. Network servers are -rarely CPU-bound, however. - -If your operating system supports the select() system call in its I/O -library (and nearly all do), then you can use it to juggle multiple -communication channels at once; doing other work while your I/O is taking -place in the "background." Although this strategy can seem strange and -complex, especially at first, it is in many ways easier to understand and -control than multi-threaded programming. The module documented here solves -many of the difficult problems for you, making the task of building -sophisticated high-performance network servers and clients a snap. -""" - +# pylint: disable=too-many-branches,too-many-lines,global-statement +# pylint: disable=redefined-builtin,no-self-use import os import select import socket @@ -58,8 +13,9 @@ import sys import time import warnings from errno import ( - EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, - EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode + EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, + ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, EINVAL, EISCONN, ENETUNREACH, + ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode ) from threading import current_thread @@ -107,7 +63,8 @@ def _strerror(err): class ExitNow(Exception): - """We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it""" + """We don't use directly but may be necessary as we replace + asyncore due to some library raising or expecting it""" pass @@ -152,7 +109,8 @@ def write(obj): def set_rates(download, upload): """Set throttling rates""" - global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + global maxDownloadRate, maxUploadRate, downloadBucket + global uploadBucket, downloadTimestamp, uploadTimestamp maxDownloadRate = float(download) * 1024 maxUploadRate = float(upload) * 1024 @@ -182,7 +140,8 @@ def update_received(download=0): currentTimestamp = time.time() receivedBytes += download if maxDownloadRate > 0: - bucketIncrease = maxDownloadRate * (currentTimestamp - downloadTimestamp) + bucketIncrease = \ + maxDownloadRate * (currentTimestamp - downloadTimestamp) downloadBucket += bucketIncrease if downloadBucket > maxDownloadRate: downloadBucket = int(maxDownloadRate) @@ -242,7 +201,6 @@ def readwrite(obj, flags): def select_poller(timeout=0.0, map=None): """A poller which uses select(), available on most platforms.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -298,7 +256,6 @@ def select_poller(timeout=0.0, map=None): def poll_poller(timeout=0.0, map=None): """A poller which uses poll(), available on most UNIXen.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -356,7 +313,6 @@ poll2 = poll3 = poll_poller def epoll_poller(timeout=0.0, map=None): """A poller which uses epoll(), supported on Linux 2.5.44 and newer.""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -412,7 +368,7 @@ def epoll_poller(timeout=0.0, map=None): def kqueue_poller(timeout=0.0, map=None): """A poller which uses kqueue(), BSD specific.""" - # pylint: disable=redefined-builtin,no-member + # pylint: disable=no-member,too-many-statements if map is None: map = socket_map @@ -440,14 +396,20 @@ def kqueue_poller(timeout=0.0, map=None): poller_flags |= select.KQ_EV_ENABLE else: poller_flags |= select.KQ_EV_DISABLE - updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags)) + updates.append( + select.kevent( + fd, filter=select.KQ_FILTER_READ, + flags=poller_flags)) if kq_filter & 2 != obj.poller_filter & 2: poller_flags = select.KQ_EV_ADD if kq_filter & 2: poller_flags |= select.KQ_EV_ENABLE else: poller_flags |= select.KQ_EV_DISABLE - updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags)) + updates.append( + select.kevent( + fd, filter=select.KQ_FILTER_WRITE, + flags=poller_flags)) obj.poller_filter = kq_filter if not selectables: @@ -481,7 +443,6 @@ def kqueue_poller(timeout=0.0, map=None): def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): """Poll in a loop, until count or timeout is reached""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -520,9 +481,9 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): count = count - 1 -class dispatcher: +class dispatcher(object): """Dispatcher for socket objects""" - # pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class + # pylint: disable=too-many-public-methods,too-many-instance-attributes debug = False connected = False @@ -537,7 +498,6 @@ class dispatcher: minTx = 1500 def __init__(self, sock=None, map=None): - # pylint: disable=redefined-builtin if map is None: self._map = socket_map else: @@ -586,8 +546,7 @@ class dispatcher: def add_channel(self, map=None): """Add a channel""" - # pylint: disable=redefined-builtin - + # pylint: disable=attribute-defined-outside-init if map is None: map = self._map map[self._fileno] = self @@ -596,8 +555,6 @@ class dispatcher: def del_channel(self, map=None): """Delete a channel""" - # pylint: disable=redefined-builtin - fd = self._fileno if map is None: map = self._map @@ -605,12 +562,14 @@ class dispatcher: del map[fd] if self._fileno: try: - kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0) - except (AttributeError, KeyError, TypeError, IOError, OSError): + kqueue_poller.pollster.control([select.kevent( + fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0) + except(AttributeError, KeyError, TypeError, IOError, OSError): pass try: - kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0) - except (AttributeError, KeyError, TypeError, IOError, OSError): + kqueue_poller.pollster.control([select.kevent( + fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0) + except(AttributeError, KeyError, TypeError, IOError, OSError): pass try: epoll_poller.pollster.unregister(fd) @@ -627,8 +586,10 @@ class dispatcher: self.poller_filter = 0 self.poller_registered = False - def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): + def create_socket( + self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): """Create a socket""" + # pylint: disable=attribute-defined-outside-init self.family_and_type = family, socket_type sock = socket.socket(family, socket_type) sock.setblocking(0) @@ -636,20 +597,16 @@ class dispatcher: def set_socket(self, sock, map=None): """Set socket""" - # pylint: disable=redefined-builtin - self.socket = sock self._fileno = sock.fileno() self.add_channel(map) def set_reuse_addr(self): """try to re-use a server port if possible""" - try: self.socket.setsockopt( - socket.SOL_SOCKET, socket.SO_REUSEADDR, - self.socket.getsockopt(socket.SOL_SOCKET, - socket.SO_REUSEADDR) | 1 + socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 ) except socket.error: pass @@ -704,13 +661,16 @@ class dispatcher: raise socket.error(err, errorcode[err]) def accept(self): - """Accept incoming connections. Returns either an address pair or None.""" + """Accept incoming connections. + Returns either an address pair or None.""" try: conn, addr = self.socket.accept() except TypeError: return None except socket.error as why: - if why.args[0] in (EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, EAGAIN, ENOTCONN): + if why.args[0] in ( + EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, + EAGAIN, ENOTCONN): return None else: raise @@ -769,11 +729,12 @@ class dispatcher: try: retattr = getattr(self.socket, attr) except AttributeError: - raise AttributeError("%s instance has no attribute '%s'" - % (self.__class__.__name__, attr)) + raise AttributeError( + "%s instance has no attribute '%s'" + % (self.__class__.__name__, attr)) else: - msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \ - "instead" % {'me': self.__class__.__name__, 'attr': attr} + msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s"\ + " instead" % {'me': self.__class__.__name__, 'attr': attr} warnings.warn(msg, DeprecationWarning, stacklevel=2) return retattr @@ -855,13 +816,8 @@ class dispatcher: self.log_info( 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( - self_repr, - t, - v, - tbinfo - ), - 'error' - ) + self_repr, t, v, tbinfo), + 'error') self.handle_close() def handle_accept(self): @@ -902,11 +858,8 @@ class dispatcher_with_send(dispatcher): adds simple buffered output capability, useful for simple clients. [for more sophisticated usage use asynchat.async_chat] """ - # pylint: disable=redefined-builtin def __init__(self, sock=None, map=None): - # pylint: disable=redefined-builtin - dispatcher.__init__(self, sock, map) self.out_buffer = b'' @@ -941,7 +894,8 @@ def compact_traceback(): """Return a compact traceback""" t, v, tb = sys.exc_info() tbinfo = [] - if not tb: # Must have a traceback + # Must have a traceback + if not tb: raise AssertionError("traceback does not exist") while tb: tbinfo.append(( @@ -961,7 +915,6 @@ def compact_traceback(): def close_all(map=None, ignore_all=False): """Close all connections""" - # pylint: disable=redefined-builtin if map is None: map = socket_map @@ -998,13 +951,13 @@ def close_all(map=None, ignore_all=False): if os.name == 'posix': import fcntl - class file_wrapper: + class file_wrapper: # pylint: disable=old-style-class """ - Here we override just enough to make a file look like a socket for the purposes of asyncore. + Here we override just enough to make a file look + like a socket for the purposes of asyncore. The passed fd is automatically os.dup()'d """ - # pylint: disable=old-style-class def __init__(self, fd): self.fd = os.dup(fd) @@ -1019,12 +972,11 @@ if os.name == 'posix': def getsockopt(self, level, optname, buflen=None): """Fake getsockopt()""" - if (level == socket.SOL_SOCKET and - optname == socket.SO_ERROR and + if (level == socket.SOL_SOCKET and optname == socket.SO_ERROR and not buflen): return 0 - raise NotImplementedError("Only asyncore specific behaviour " - "implemented.") + raise NotImplementedError( + "Only asyncore specific behaviour implemented.") read = recv write = send @@ -1041,8 +993,6 @@ if os.name == 'posix': """A dispatcher for file_wrapper objects""" def __init__(self, fd, map=None): - # pylint: disable=redefined-builtin - dispatcher.__init__(self, None, map) self.connected = True try: diff --git a/src/network/bmobject.py b/src/network/bmobject.py index ac6429e4..12b997d7 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -1,7 +1,6 @@ """ BMObject and it's exceptions. """ - import logging import time @@ -15,12 +14,14 @@ logger = logging.getLogger('default') class BMObjectInsufficientPOWError(Exception): - """Exception indicating the object doesn't have sufficient proof of work.""" + """Exception indicating the object + doesn't have sufficient proof of work.""" errorCodes = ("Insufficient proof of work") class BMObjectInvalidDataError(Exception): - """Exception indicating the data being parsed does not match the specification.""" + """Exception indicating the data being parsed + does not match the specification.""" errorCodes = ("Data invalid") @@ -30,7 +31,8 @@ class BMObjectExpiredError(Exception): class BMObjectUnwantedStreamError(Exception): - """Exception indicating the object is in a stream we didn't advertise as being interested in.""" + """Exception indicating the object is in a stream + we didn't advertise as being interested in.""" errorCodes = ("Object in unwanted stream") @@ -44,9 +46,8 @@ class BMObjectAlreadyHaveError(Exception): errorCodes = ("Already have this object") -class BMObject(object): +class BMObject(object): # pylint: disable=too-many-instance-attributes """Bitmessage Object as a class.""" - # pylint: disable=too-many-instance-attributes # max TTL, 28 days and 3 hours maxTTL = 28 * 24 * 60 * 60 + 10800 @@ -81,31 +82,36 @@ class BMObject(object): raise BMObjectInsufficientPOWError() def checkEOLSanity(self): - """Check if object's lifetime isn't ridiculously far in the past or future.""" + """Check if object's lifetime + isn't ridiculously far in the past or future.""" # EOL sanity check if self.expiresTime - int(time.time()) > BMObject.maxTTL: logger.info( - 'This object\'s End of Life time is too far in the future. Ignoring it. Time is %i', - self.expiresTime) + 'This object\'s End of Life time is too far in the future.' + ' Ignoring it. Time is %i', self.expiresTime) # .. todo:: remove from download queue raise BMObjectExpiredError() if self.expiresTime - int(time.time()) < BMObject.minTTL: logger.info( - 'This object\'s End of Life time was too long ago. Ignoring the object. Time is %i', - self.expiresTime) + 'This object\'s End of Life time was too long ago.' + ' Ignoring the object. Time is %i', self.expiresTime) # .. todo:: remove from download queue raise BMObjectExpiredError() def checkStream(self): """Check if object's stream matches streams we are interested in""" if self.streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %i isn\'t one we are interested in.', self.streamNumber) + logger.debug( + 'The streamNumber %i isn\'t one we are interested in.', + self.streamNumber) raise BMObjectUnwantedStreamError() def checkAlreadyHave(self): """ - Check if we already have the object (so that we don't duplicate it in inventory or advertise it unnecessarily) + Check if we already have the object + (so that we don't duplicate it in inventory + or advertise it unnecessarily) """ # if it's a stem duplicate, pretend we don't have it if Dandelion().hasHash(self.inventoryHash): @@ -114,7 +120,8 @@ class BMObject(object): raise BMObjectAlreadyHaveError() def checkObjectByType(self): - """Call a object type specific check (objects can have additional checks based on their types)""" + """Call a object type specific check + (objects can have additional checks based on their types)""" if self.objectType == protocol.OBJECT_GETPUBKEY: self.checkGetpubkey() elif self.objectType == protocol.OBJECT_PUBKEY: @@ -125,20 +132,21 @@ class BMObject(object): self.checkBroadcast() # other objects don't require other types of tests - def checkMessage(self): + def checkMessage(self): # pylint: disable=no-self-use """"Message" object type checks.""" - # pylint: disable=no-self-use return def checkGetpubkey(self): """"Getpubkey" object type checks.""" if len(self.data) < 42: - logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') + logger.info( + 'getpubkey message doesn\'t contain enough data. Ignoring.') raise BMObjectInvalidError() def checkPubkey(self): """"Pubkey" object type checks.""" - if len(self.data) < 146 or len(self.data) > 440: # sanity check + # sanity check + if len(self.data) < 146 or len(self.data) > 440: logger.info('pubkey object too short or too long. Ignoring.') raise BMObjectInvalidError() @@ -146,8 +154,9 @@ class BMObject(object): """"Broadcast" object type checks.""" if len(self.data) < 180: logger.debug( - 'The payload length of this broadcast packet is unreasonably low.' - ' Someone is probably trying funny business. Ignoring message.') + 'The payload length of this broadcast' + ' packet is unreasonably low. Someone is probably' + ' trying funny business. Ignoring message.') raise BMObjectInvalidError() # this isn't supported anymore diff --git a/src/network/bmproto.py b/src/network/bmproto.py index d620daa3..d5d3dbe3 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -1,8 +1,7 @@ """ -src/network/bmproto.py -================================== +Bitmessage Protocol """ -# pylint: disable=attribute-defined-outside-init +# pylint: disable=attribute-defined-outside-init, too-few-public-methods import base64 import hashlib import logging @@ -19,17 +18,16 @@ import state from bmconfigparser import BMConfigParser from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher -from network.constants import ( - ADDRESS_ALIVE, - MAX_MESSAGE_SIZE, - MAX_OBJECT_COUNT, - MAX_OBJECT_PAYLOAD_SIZE, - MAX_TIME_OFFSET) -from network.dandelion import Dandelion from network.bmobject import ( BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, - BMObjectInvalidError, BMObjectAlreadyHaveError) + BMObjectInvalidError, BMObjectAlreadyHaveError +) +from network.constants import ( + ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT, + MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET +) +from network.dandelion import Dandelion from network.proxy import ProxyError from node import Node, Peer from objectracker import missingObjects, ObjectTracker @@ -59,7 +57,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # pylint: disable=too-many-instance-attributes, too-many-public-methods timeOffsetWrongCount = 0 - def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called + def __init__(self, address=None, sock=None): + # pylint: disable=unused-argument, super-init-not-called AdvancedDispatcher.__init__(self, sock) self.isOutbound = False # packet/connection from a local IP @@ -163,7 +162,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def decode_payload_varint(self): """Decode a varint from the payload""" - value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:]) + value, offset = addresses.decodeVarint( + self.payload[self.payloadOffset:]) self.payloadOffset += offset return value @@ -185,8 +185,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return Node(services, host, port) - def decode_payload_content(self, pattern="v"): # pylint: disable=too-many-branches, too-many-statements - + # pylint: disable=too-many-branches, too-many-statements + def decode_payload_content(self, pattern="v"): """ Decode the payload depending on pattern: @@ -202,7 +202,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): , = end of array """ - def decode_simple(self, char="v"): # pylint: disable=inconsistent-return-statements + # pylint: disable=inconsistent-return-statements + def decode_simple(self, char="v"): """Decode the payload using one char pattern""" if char == "v": return self.decode_payload_varint() @@ -312,8 +313,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_error(self): """Decode an error message and log it""" - fatalStatus, banTime, inventoryVector, errorText = \ - self.decode_payload_content("vvlsls") + err_values = self.decode_payload_content("vvlsls") + fatalStatus = err_values[0] + # banTime = err_values[1] + # inventoryVector = err_values[2] + errorText = err_values[3] logger.error( '%s:%i error: %i, %s', self.destination.host, self.destination.port, fatalStatus, errorText) @@ -408,8 +412,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass - if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash): - Dandelion().removeHash(self.object.inventoryHash, "cycle detection") + if self.object.inventoryHash in Inventory() and Dandelion().hasHash( + self.object.inventoryHash): + Dandelion().removeHash( + self.object.inventoryHash, "cycle detection") Inventory()[self.object.inventoryHash] = ( self.object.objectType, self.object.streamNumber, @@ -428,27 +434,30 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_addr(self): """Incoming addresses, process them""" - addresses = self._decode_addr() # pylint: disable=redefined-outer-name - for i in addresses: - seenTime, stream, services, ip, port = i + # pylint: disable=redefined-outer-name + addresses = self._decode_addr() + for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) if stream not in state.streamsInWhichIAmParticipating: continue if ( - decodedIP and time.time() - seenTime > 0 and - seenTime > time.time() - ADDRESS_ALIVE and - port > 0 + decodedIP and time.time() - seenTime > 0 and + seenTime > time.time() - ADDRESS_ALIVE and + port > 0 ): peer = Peer(decodedIP, port) try: - if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime: + if knownnodes.knownNodes[stream][peer]["lastseen"] > \ + seenTime: continue except KeyError: pass - if len(knownnodes.knownNodes[stream]) < BMConfigParser().safeGetInt("knownnodes", "maxnodes"): + if len(knownnodes.knownNodes[stream]) < \ + BMConfigParser().safeGetInt("knownnodes", "maxnodes"): with knownnodes.knownNodesLock: try: - knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime + knownnodes.knownNodes[stream][peer]["lastseen"] = \ + seenTime except (TypeError, KeyError): knownnodes.knownNodes[stream][peer] = { "lastseen": seenTime, @@ -539,7 +548,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): length=self.payloadLength, expectBytes=0) return False - def peerValidityChecks(self): # pylint: disable=too-many-return-statements + # pylint: disable=too-many-return-statements + def peerValidityChecks(self): """Check the validity of the peer""" if self.remoteProtocolVersion < 3: self.append_write_buf(protocol.assembleErrorMessage( @@ -551,8 +561,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return False if self.timeOffset > MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( - errorText="Your time is too far in the future compared to mine." - " Closing connection.", fatal=2)) + errorText="Your time is too far in the future" + " compared to mine. Closing connection.", fatal=2)) logger.info( "%s's time is too far in the future (%s seconds)." " Closing connection to it.", self.destination, self.timeOffset) @@ -574,8 +584,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): errorText="We don't have shared stream interests." " Closing connection.", fatal=2)) logger.debug( - 'Closed connection to %s because there is no overlapping interest' - ' in streams.', self.destination) + 'Closed connection to %s because there is no overlapping' + ' interest in streams.', self.destination) return False if self.destination in connectionpool.BMConnectionPool().inboundConnections: try: @@ -584,8 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): errorText="Too many connections from your IP." " Closing connection.", fatal=2)) logger.debug( - 'Closed connection to %s because we are already connected' - ' to that IP.', self.destination) + 'Closed connection to %s because we are already' + ' connected to that IP.', self.destination) return False except: pass diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 9d2f85d6..badd98b7 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -1,3 +1,6 @@ +""" +Select which node to connect to +""" # pylint: disable=too-many-branches import logging import random # nosec @@ -12,6 +15,7 @@ logger = logging.getLogger('default') def getDiscoveredPeer(): + """Get a peer from the local peer discovery list""" try: peer = random.choice(state.discoveredPeers.keys()) except (IndexError, KeyError): @@ -24,6 +28,7 @@ def getDiscoveredPeer(): def chooseConnection(stream): + """Returns an appropriate connection""" haveOnion = BMConfigParser().safeGet( "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' onionOnly = BMConfigParser().safeGetBoolean( @@ -49,7 +54,8 @@ def chooseConnection(stream): logger.warning('Error in %s', peer) rating = 0 if haveOnion: - # do not connect to raw IP addresses--keep all traffic within Tor overlay + # do not connect to raw IP addresses + # --keep all traffic within Tor overlay if onionOnly and not peer.host.endswith('.onion'): continue # onion addresses have a higher priority when SOCKS diff --git a/src/network/constants.py b/src/network/constants.py index a3414ef3..f8f4120f 100644 --- a/src/network/constants.py +++ b/src/network/constants.py @@ -3,9 +3,15 @@ Network protocol constants """ -ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago -MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command -MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message. -MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload -MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command -MAX_TIME_OFFSET = 3600 #: maximum time offset +#: address is online if online less than this many seconds ago +ADDRESS_ALIVE = 10800 +#: protocol specification says max 1000 addresses in one addr command +MAX_ADDR_COUNT = 1000 +#: ~1.6 MB which is the maximum possible size of an inv message. +MAX_MESSAGE_SIZE = 1600100 +#: 2**18 = 256kB is the maximum size of an object payload +MAX_OBJECT_PAYLOAD_SIZE = 2**18 +#: protocol specification says max 50000 objects in one inv command +MAX_OBJECT_COUNT = 50000 +#: maximum time offset +MAX_TIME_OFFSET = 3600 diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 0f68cc24..03f45bd7 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -1,10 +1,9 @@ """ -src/network/dandelion.py -======================== +Dandelion class definition, tracks stages """ import logging from collections import namedtuple -from random import choice, sample, expovariate +from random import choice, expovariate, sample from threading import RLock from time import time @@ -28,7 +27,7 @@ logger = logging.getLogger('default') @Singleton -class Dandelion(): # pylint: disable=old-style-class +class Dandelion: # pylint: disable=old-style-class """Dandelion class for tracking stem/fluff stages.""" def __init__(self): # currently assignable child stems @@ -123,7 +122,8 @@ class Dandelion(): # pylint: disable=old-style-class self.stem.remove(connection) # active mappings to pointing to the removed node for k in ( - k for k, v in self.nodeMap.iteritems() if v == connection + k for k, v in self.nodeMap.iteritems() + if v == connection ): self.nodeMap[k] = None for k, v in { diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index e882f6de..0ae83b5b 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -1,7 +1,6 @@ """ `DownloadThread` class definition """ - import time import addresses @@ -30,7 +29,9 @@ class DownloadThread(StoppableThread): """Expire pending downloads eventually""" deadline = time.time() - self.requestExpires try: - toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] + toDelete = [ + k for k, v in missingObjects.iteritems() + if v < deadline] except RuntimeError: pass else: diff --git a/src/network/invthread.py b/src/network/invthread.py index d5690486..e68b7692 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,6 +1,5 @@ """ -src/network/invthread.py -======================== +Thread to send inv annoucements """ import Queue import random @@ -34,7 +33,7 @@ def handleExpiredDandelion(expired): class InvThread(StoppableThread): - """A thread to send inv annoucements.""" + """Main thread that sends inv annoucements""" name = "InvBroadcaster" @@ -43,12 +42,13 @@ class InvThread(StoppableThread): """Locally generated inventory items require special handling""" Dandelion().addHash(hashId, stream=stream) for connection in BMConnectionPool().connections(): - if state.dandelion and connection != Dandelion().objectChildStem(hashId): + if state.dandelion and connection != \ + Dandelion().objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() - def run(self): # pylint: disable=too-many-branches - while not state.shutdown: # pylint: disable=too-many-nested-blocks + def run(self): # pylint: disable=too-many-branches + while not state.shutdown: # pylint: disable=too-many-nested-blocks chunk = [] while True: # Dandelion fluff trigger by expiration @@ -92,15 +92,17 @@ class InvThread(StoppableThread): random.shuffle(fluffs) connection.append_write_buf(protocol.CreatePacket( 'inv', - addresses.encodeVarint(len(fluffs)) + ''.join(fluffs))) + addresses.encodeVarint( + len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) connection.append_write_buf(protocol.CreatePacket( 'dinv', - addresses.encodeVarint(len(stems)) + ''.join(stems))) + addresses.encodeVarint( + len(stems)) + ''.join(stems))) invQueue.iterate() - for i in range(len(chunk)): + for _ in range(len(chunk)): invQueue.task_done() if Dandelion().refresh < time(): From 22e22633c213f5b4b01f5e56ef76c8e7639c17e5 Mon Sep 17 00:00:00 2001 From: lakshyacis Date: Fri, 10 Jan 2020 19:37:51 +0530 Subject: [PATCH 2/2] Added License --- LICENSE | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/LICENSE b/LICENSE index d6df32b5..2c718643 100644 --- a/LICENSE +++ b/LICENSE @@ -47,3 +47,24 @@ Redistribution and use in source and binary forms, with or without modification, 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +===== based on asyncore_pollchoose.py asyncore_pollchoose python implementation. by Sam Rushing + +Copyright 1996 by Sam Rushing. All Rights Reserved + +Permission to use, copy, modify, and distribute this software and +its documentation for any purpose and without fee is hereby +granted, provided that the above copyright notice appear in all +copies and that both that copyright notice and this permission +notice appear in supporting documentation, and that the name of Sam +Rushing not be used in advertising or publicity pertaining to +distribution of the software without specific, written prior +permission. + +SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN +NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR +CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.