diff --git a/src/defaults.py b/src/defaults.py index 8401bf2e..d10f9000 100644 --- a/src/defaults.py +++ b/src/defaults.py @@ -1,3 +1,8 @@ +""" +src/defaults.py +=============== +""" + # sanity check, prevent doing ridiculous PoW # 20 million PoWs equals approximately 2 days on dev's dual R9 290 ridiculousDifficulty = 20000000 @@ -7,7 +12,13 @@ ridiculousDifficulty = 20000000 # namecoin integration to "namecoind". namecoinDefaultRpcPort = "8336" -#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! -networkDefaultProofOfWorkNonceTrialsPerByte = 1000 #The amount of work that should be performed (and demanded) per byte of the payload. -networkDefaultPayloadLengthExtraBytes = 1000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target. - +# If changed, these values will cause particularly unexpected behavior: +# You won't be able to either send or receive messages because the proof +# of work you do (or demand) won't match that done or demanded by others. +# Don't change them! +# The amount of work that should be performed (and demanded) per byte of the payload. +networkDefaultProofOfWorkNonceTrialsPerByte = 1000 +# To make sending short messages a little more difficult, this value is +# added to the payload length for use in calculating the proof of work +# target. +networkDefaultPayloadLengthExtraBytes = 1000 diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index e50be61b..3337c0f0 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -1,6 +1,11 @@ # -*- Mode: Python -*- # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing +# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init +# pylint: disable=global-statement +""" +src/network/asyncore_pollchoose.py +================================== # ====================================================================== # Copyright 1996 by Sam Rushing @@ -25,7 +30,7 @@ # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # ====================================================================== -"""Basic infrastructure for asynchronous socket service clients and servers. +Basic infrastructure for asynchronous socket service clients and servers. There are only two ways to have a program on a single processor do "more than one thing at a time". Multi-threaded programming is the simplest and @@ -46,22 +51,20 @@ many of the difficult problems for you, making the task of building sophisticated high-performance network servers and clients a snap. """ -# randomise object order for bandwidth balancing -import random +import os import select import socket import sys import time -from threading import current_thread import warnings +from errno import ( + EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, + EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode +) +from threading import current_thread -import os import helper_random -from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ - ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ - ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ENOTSOCK, EINTR, ETIMEDOUT, \ - EADDRINUSE, \ - errorcode + try: from errno import WSAEWOULDBLOCK except (ImportError, AttributeError): @@ -75,13 +78,15 @@ try: except (ImportError, AttributeError): WSAECONNRESET = ECONNRESET try: - from errno import WSAEADDRINUSE + # Desirable side-effects on Windows; imports winsock error numbers + from errno import WSAEADDRINUSE # pylint: disable=unused-import except (ImportError, AttributeError): WSAEADDRINUSE = EADDRINUSE -_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, - EBADF, ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ETIMEDOUT, - WSAECONNRESET)) + +_DISCONNECTED = frozenset(( + ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF, ECONNREFUSED, + EHOSTUNREACH, ENETUNREACH, ETIMEDOUT, WSAECONNRESET)) OP_READ = 1 OP_WRITE = 2 @@ -91,17 +96,21 @@ try: except NameError: socket_map = {} + def _strerror(err): try: return os.strerror(err) except (ValueError, OverflowError, NameError): if err in errorcode: return errorcode[err] - return "Unknown error %s" %err + return "Unknown error %s" % err + class ExitNow(Exception): + """We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it""" pass + _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) maxDownloadRate = 0 @@ -113,28 +122,38 @@ uploadTimestamp = 0 uploadBucket = 0 sentBytes = 0 + def read(obj): + """Event to read from the object, i.e. its network socket.""" + if not can_receive(): return try: obj.handle_read_event() except _reraised_exceptions: raise - except: + except BaseException: obj.handle_error() + def write(obj): + """Event to write to the object, i.e. its network socket.""" + if not can_send(): return try: obj.handle_write_event() except _reraised_exceptions: raise - except: + except BaseException: obj.handle_error() + def set_rates(download, upload): + """Set throttling rates""" + global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp + maxDownloadRate = float(download) * 1024 maxUploadRate = float(upload) * 1024 downloadBucket = maxDownloadRate @@ -142,14 +161,24 @@ def set_rates(download, upload): downloadTimestamp = time.time() uploadTimestamp = time.time() + def can_receive(): + """Predicate indicating whether the download throttle is in effect""" + return maxDownloadRate == 0 or downloadBucket > 0 + def can_send(): + """Predicate indicating whether the upload throttle is in effect""" + return maxUploadRate == 0 or uploadBucket > 0 + def update_received(download=0): + """Update the receiving throttle""" + global receivedBytes, downloadBucket, downloadTimestamp + currentTimestamp = time.time() receivedBytes += download if maxDownloadRate > 0: @@ -160,8 +189,12 @@ def update_received(download=0): downloadBucket -= download downloadTimestamp = currentTimestamp + def update_sent(upload=0): + """Update the sending throttle""" + global sentBytes, uploadBucket, uploadTimestamp + currentTimestamp = time.time() sentBytes += upload if maxUploadRate > 0: @@ -172,15 +205,21 @@ def update_sent(upload=0): uploadBucket -= upload uploadTimestamp = currentTimestamp + def _exception(obj): + """Handle exceptions as appropriate""" + try: obj.handle_expt_event() except _reraised_exceptions: raise - except: + except BaseException: obj.handle_error() + def readwrite(obj, flags): + """Read and write any pending data to/from the object""" + try: if flags & select.POLLIN and can_receive(): obj.handle_read_event() @@ -197,15 +236,20 @@ def readwrite(obj, flags): obj.handle_close() except _reraised_exceptions: raise - except: + except BaseException: obj.handle_error() + def select_poller(timeout=0.0, map=None): """A poller which uses select(), available on most platforms.""" + # pylint: disable=redefined-builtin + if map is None: map = socket_map if map: - r = []; w = []; e = [] + r = [] + w = [] + e = [] for fd, obj in list(map.items()): is_r = obj.readable() is_w = obj.writable() @@ -251,13 +295,16 @@ def select_poller(timeout=0.0, map=None): else: current_thread().stop.wait(timeout) + def poll_poller(timeout=0.0, map=None): """A poller which uses poll(), available on most UNIXen.""" + # pylint: disable=redefined-builtin + if map is None: map = socket_map if timeout is not None: # timeout is in milliseconds - timeout = int(timeout*1000) + timeout = int(timeout * 1000) try: poll_poller.pollster except AttributeError: @@ -301,12 +348,16 @@ def poll_poller(timeout=0.0, map=None): else: current_thread().stop.wait(timeout) + # Aliases for backward compatibility poll = select_poller poll2 = poll3 = poll_poller + def epoll_poller(timeout=0.0, map=None): """A poller which uses epoll(), supported on Linux 2.5.44 and newer.""" + # pylint: disable=redefined-builtin + if map is None: map = socket_map try: @@ -346,7 +397,7 @@ def epoll_poller(timeout=0.0, map=None): if e.errno != EINTR: raise r = [] - except select.error, err: + except select.error as err: if err.args[0] != EINTR: raise r = [] @@ -354,12 +405,15 @@ def epoll_poller(timeout=0.0, map=None): obj = map.get(fd) if obj is None: continue - readwrite(obj, flags) + readwrite(obj, flags) else: current_thread().stop.wait(timeout) + def kqueue_poller(timeout=0.0, map=None): """A poller which uses kqueue(), BSD specific.""" + # pylint: disable=redefined-builtin,no-member + if map is None: map = socket_map try: @@ -408,7 +462,7 @@ def kqueue_poller(timeout=0.0, map=None): for event in events: fd = event.ident - obj = map.get(fd) + obj = map.get(fd) if obj is None: continue if event.flags & select.KQ_EV_ERROR: @@ -425,13 +479,15 @@ def kqueue_poller(timeout=0.0, map=None): current_thread().stop.wait(timeout) -def loop(timeout=30.0, use_poll=False, map=None, count=None, - poller=None): +def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): + """Poll in a loop, until count or timeout is reached""" + # pylint: disable=redefined-builtin + if map is None: map = socket_map if count is None: - count = True - # code which grants backward compatibility with "use_poll" + count = True + # code which grants backward compatibility with "use_poll" # argument which should no longer be used in favor of # "poller" @@ -460,10 +516,13 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, break # then poll poller(subtimeout, map) - if type(count) is int: + if isinstance(count, int): count = count - 1 + class dispatcher: + """Dispatcher for socket objects""" + # pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class debug = False connected = False @@ -478,6 +537,7 @@ class dispatcher: minTx = 1500 def __init__(self, sock=None, map=None): + # pylint: disable=redefined-builtin if map is None: self._map = socket_map else: @@ -510,7 +570,7 @@ class dispatcher: self.socket = None def __repr__(self): - status = [self.__class__.__module__+"."+self.__class__.__name__] + status = [self.__class__.__module__ + "." + self.__class__.__name__] if self.accepting and self.addr: status.append('listening') elif self.connected: @@ -525,7 +585,9 @@ class dispatcher: __str__ = __repr__ def add_channel(self, map=None): - #self.log_info('adding channel %s' % self) + """Add a channel""" + # pylint: disable=redefined-builtin + if map is None: map = self._map map[self._fileno] = self @@ -533,11 +595,13 @@ class dispatcher: self.poller_filter = 0 def del_channel(self, map=None): + """Delete a channel""" + # pylint: disable=redefined-builtin + fd = self._fileno if map is None: map = self._map if fd in map: - #self.log_info('closing channel %d:%s' % (fd, self)) del map[fd] if self._fileno: try: @@ -564,25 +628,29 @@ class dispatcher: self.poller_registered = False def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): + """Create a socket""" self.family_and_type = family, socket_type sock = socket.socket(family, socket_type) sock.setblocking(0) self.set_socket(sock) def set_socket(self, sock, map=None): + """Set socket""" + # pylint: disable=redefined-builtin + self.socket = sock -## self.__dict__['socket'] = sock self._fileno = sock.fileno() self.add_channel(map) def set_reuse_addr(self): - # try to re-use a server port if possible + """try to re-use a server port if possible""" + try: self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 - ) + ) except socket.error: pass @@ -593,11 +661,13 @@ class dispatcher: # ================================================== def readable(self): + """Predicate to indicate download throttle status""" if maxDownloadRate > 0: return downloadBucket > dispatcher.minTx return True def writable(self): + """Predicate to indicate upload throttle status""" if maxUploadRate > 0: return uploadBucket > dispatcher.minTx return True @@ -607,21 +677,24 @@ class dispatcher: # ================================================== def listen(self, num): + """Listen on a port""" self.accepting = True if os.name == 'nt' and num > 5: num = 5 return self.socket.listen(num) def bind(self, addr): + """Bind to an address""" self.addr = addr return self.socket.bind(addr) def connect(self, address): + """Connect to an address""" self.connected = False self.connecting = True err = self.socket.connect_ex(address) if err in (EINPROGRESS, EALREADY, EWOULDBLOCK, WSAEWOULDBLOCK) \ - or err == EINVAL and os.name in ('nt', 'ce'): + or err == EINVAL and os.name in ('nt', 'ce'): self.addr = address return if err in (0, EISCONN): @@ -631,7 +704,7 @@ class dispatcher: raise socket.error(err, errorcode[err]) def accept(self): - # XXX can return either an address pair or None + """Accept incoming connections. Returns either an address pair or None.""" try: conn, addr = self.socket.accept() except TypeError: @@ -645,6 +718,7 @@ class dispatcher: return conn, addr def send(self, data): + """Send data""" try: result = self.socket.send(data) return result @@ -658,6 +732,7 @@ class dispatcher: raise def recv(self, buffer_size): + """Receive data""" try: data = self.socket.recv(buffer_size) if not data: @@ -665,8 +740,7 @@ class dispatcher: # a read condition, and having recv() return 0. self.handle_close() return b'' - else: - return data + return data except socket.error as why: # winsock sometimes raises ENOTCONN if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK): @@ -678,6 +752,7 @@ class dispatcher: raise def close(self): + """Close connection""" self.connected = False self.accepting = False self.connecting = False @@ -695,10 +770,10 @@ class dispatcher: retattr = getattr(self.socket, attr) except AttributeError: raise AttributeError("%s instance has no attribute '%s'" - %(self.__class__.__name__, attr)) + % (self.__class__.__name__, attr)) else: msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \ - "instead" % {'me' : self.__class__.__name__, 'attr' : attr} + "instead" % {'me': self.__class__.__name__, 'attr': attr} warnings.warn(msg, DeprecationWarning, stacklevel=2) return retattr @@ -707,13 +782,16 @@ class dispatcher: # and 'log_info' is for informational, warning and error logging. def log(self, message): + """Log a message to stderr""" sys.stderr.write('log: %s\n' % str(message)) def log_info(self, message, log_type='info'): + """Conditionally print a message""" if log_type not in self.ignore_log_types: - print('%s: %s' % (log_type, message)) + print '%s: %s' % (log_type, message) def handle_read_event(self): + """Handle a read event""" if self.accepting: # accepting sockets are never connected, they "spawn" new # sockets that are connected @@ -726,6 +804,7 @@ class dispatcher: self.handle_read() def handle_connect_event(self): + """Handle a connection event""" err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err != 0: raise socket.error(err, _strerror(err)) @@ -734,6 +813,7 @@ class dispatcher: self.connecting = False def handle_write_event(self): + """Handle a write event""" if self.accepting: # Accepting sockets shouldn't get a write event. # We will pretend it didn't happen. @@ -745,6 +825,7 @@ class dispatcher: self.handle_write() def handle_expt_event(self): + """Handle expected exceptions""" # handle_expt_event() is called if there might be an error on the # socket, or if there is OOB data # check for the error condition first @@ -763,12 +844,13 @@ class dispatcher: self.handle_expt() def handle_error(self): - nil, t, v, tbinfo = compact_traceback() + """Handle unexpected exceptions""" + _, t, v, tbinfo = compact_traceback() # sometimes a user repr method will crash. try: self_repr = repr(self) - except: + except BaseException: self_repr = '<__repr__(self) failed for object at %0x>' % id(self) self.log_info( @@ -777,89 +859,110 @@ class dispatcher: t, v, tbinfo - ), + ), 'error' - ) + ) self.handle_close() - def handle_expt(self): - self.log_info('unhandled incoming priority event', 'warning') - - def handle_read(self): - self.log_info('unhandled read event', 'warning') - - def handle_write(self): - self.log_info('unhandled write event', 'warning') - - def handle_connect(self): - self.log_info('unhandled connect event', 'warning') - def handle_accept(self): + """Handle an accept event""" pair = self.accept() if pair is not None: self.handle_accepted(*pair) + def handle_expt(self): + """Log that the subclass does not implement handle_expt""" + self.log_info('unhandled incoming priority event', 'warning') + + def handle_read(self): + """Log that the subclass does not implement handle_read""" + self.log_info('unhandled read event', 'warning') + + def handle_write(self): + """Log that the subclass does not implement handle_write""" + self.log_info('unhandled write event', 'warning') + + def handle_connect(self): + """Log that the subclass does not implement handle_connect""" + self.log_info('unhandled connect event', 'warning') + def handle_accepted(self, sock, addr): + """Log that the subclass does not implement handle_accepted""" sock.close() self.log_info('unhandled accepted event on %s' % (addr), 'warning') def handle_close(self): + """Log that the subclass does not implement handle_close""" self.log_info('unhandled close event', 'warning') self.close() -# --------------------------------------------------------------------------- -# adds simple buffered output capability, useful for simple clients. -# [for more sophisticated usage use asynchat.async_chat] -# --------------------------------------------------------------------------- class dispatcher_with_send(dispatcher): + """ + adds simple buffered output capability, useful for simple clients. + [for more sophisticated usage use asynchat.async_chat] + """ + # pylint: disable=redefined-builtin def __init__(self, sock=None, map=None): + # pylint: disable=redefined-builtin + dispatcher.__init__(self, sock, map) self.out_buffer = b'' def initiate_send(self): + """Initiate a send""" num_sent = 0 num_sent = dispatcher.send(self, self.out_buffer[:512]) self.out_buffer = self.out_buffer[num_sent:] def handle_write(self): + """Handle a write event""" self.initiate_send() def writable(self): - return (not self.connected) or len(self.out_buffer) + """Predicate to indicate if the object is writable""" + return not self.connected or len(self.out_buffer) def send(self, data): + """Send data""" if self.debug: self.log_info('sending %s' % repr(data)) self.out_buffer = self.out_buffer + data self.initiate_send() + # --------------------------------------------------------------------------- # used for debugging. # --------------------------------------------------------------------------- + def compact_traceback(): + """Return a compact traceback""" t, v, tb = sys.exc_info() tbinfo = [] - if not tb: # Must have a traceback + if not tb: # Must have a traceback raise AssertionError("traceback does not exist") while tb: tbinfo.append(( tb.tb_frame.f_code.co_filename, tb.tb_frame.f_code.co_name, str(tb.tb_lineno) - )) + )) tb = tb.tb_next # just to be safe del tb - file, function, line = tbinfo[-1] + filename, function, line = tbinfo[-1] info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) - return (file, function, line), t, v, info + return (filename, function, line), t, v, info + def close_all(map=None, ignore_all=False): + """Close all connections""" + # pylint: disable=redefined-builtin + if map is None: map = socket_map for x in list(map.values()): @@ -872,11 +975,12 @@ def close_all(map=None, ignore_all=False): raise except _reraised_exceptions: raise - except: + except BaseException: if not ignore_all: raise map.clear() + # Asynchronous File I/O: # # After a little research (reading man pages on various unixen, and @@ -890,27 +994,34 @@ def close_all(map=None, ignore_all=False): # # Regardless, this is useful for pipes, and stdin/stdout... + if os.name == 'posix': import fcntl class file_wrapper: - # Here we override just enough to make a file - # look like a socket for the purposes of asyncore. - # The passed fd is automatically os.dup()'d + """ + Here we override just enough to make a file look like a socket for the purposes of asyncore. + + The passed fd is automatically os.dup()'d + """ + # pylint: disable=old-style-class def __init__(self, fd): self.fd = os.dup(fd) def recv(self, *args): + """Fake recv()""" return os.read(self.fd, *args) def send(self, *args): + """Fake send()""" return os.write(self.fd, *args) def getsockopt(self, level, optname, buflen=None): + """Fake getsockopt()""" if (level == socket.SOL_SOCKET and - optname == socket.SO_ERROR and - not buflen): + optname == socket.SO_ERROR and + not buflen): return 0 raise NotImplementedError("Only asyncore specific behaviour " "implemented.") @@ -919,14 +1030,19 @@ if os.name == 'posix': write = send def close(self): + """Fake close()""" os.close(self.fd) def fileno(self): + """Fake fileno()""" return self.fd class file_dispatcher(dispatcher): + """A dispatcher for file_wrapper objects""" def __init__(self, fd, map=None): + # pylint: disable=redefined-builtin + dispatcher.__init__(self, None, map) self.connected = True try: @@ -940,6 +1056,7 @@ if os.name == 'posix': fcntl.fcntl(fd, fcntl.F_SETFL, flags) def set_file(self, fd): + """Set file""" self.socket = file_wrapper(fd) self._fileno = self.socket.fileno() self.add_channel() diff --git a/src/network/tcp.py b/src/network/tcp.py index 6cba566f..a4d0aa1f 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -1,41 +1,42 @@ -import base64 -from binascii import hexlify -import hashlib -import math -import time -from pprint import pprint -import socket -import struct -import random -import traceback +# pylint: disable=too-many-ancestors +""" +src/network/tcp.py +================== +""" -from addresses import calculateInventoryHash -from debug import logger -from helper_random import randomBytes -import helper_random -from inventory import Inventory -import knownnodes -from network.advanceddispatcher import AdvancedDispatcher -from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto -from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError -import network.connectionpool -from network.dandelion import Dandelion -from network.node import Node -import network.asyncore_pollchoose as asyncore -from network.proxy import Proxy, ProxyError, GeneralProxyError -from network.objectracker import ObjectTracker -from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error -from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError -from network.tls import TLSDispatcher +import math +import random +import socket +import time import addresses -from bmconfigparser import BMConfigParser -from queues import invQueue, objectProcessorQueue, portCheckerQueue, UISignalQueue, receiveDataQueue +import helper_random +import knownnodes +import network.asyncore_pollchoose as asyncore +import network.connectionpool +import protocol import shared import state -import protocol +from bmconfigparser import BMConfigParser +from debug import logger +from helper_random import randomBytes +from inventory import Inventory +from network.advanceddispatcher import AdvancedDispatcher +from network.bmproto import BMProto +from network.dandelion import Dandelion +from network.objectracker import ObjectTracker +from network.socks4a import Socks4aConnection +from network.socks5 import Socks5Connection +from network.tls import TLSDispatcher +from queues import UISignalQueue, invQueue, receiveDataQueue + + +class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instance-attributes + """ + + .. todo:: Look to understand and/or fix the non-parent-init-called + """ -class TCPConnection(BMProto, TLSDispatcher): def __init__(self, address=None, sock=None): BMProto.__init__(self, address=address, sock=sock) self.verackReceived = False @@ -67,18 +68,25 @@ class TCPConnection(BMProto, TLSDispatcher): self.connect(self.destination) logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port) encodedAddr = protocol.encodeHost(self.destination.host) - if protocol.checkIPAddress(encodedAddr, True) and not protocol.checkSocksIP(self.destination.host): - self.local = True - else: - self.local = False - #shared.connectedHostsList[self.destination] = 0 - ObjectTracker.__init__(self) + self.local = all([ + protocol.checkIPAddress(encodedAddr, True), + not protocol.checkSocksIP(self.destination.host) + ]) + ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called self.bm_proto_reset() self.set_state("bm_header", expectBytes=protocol.Header.size) - def antiIntersectionDelay(self, initial = False): + def antiIntersectionDelay(self, initial=False): + """ + This is a defense against the so called intersection attacks. + + It is called when you notice peer is requesting non-existing objects, or right after the connection is + established. It will estimate how long an object will take to propagate across the network, and skip processing + "getdata" requests until then. This means an attacker only has one shot per IP to perform the attack. + """ # estimated time for a small object to propagate across the whole network - delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + invQueue.queueCount/2.0) + max_known_nodes = max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (0.2 + invQueue.queueCount / 2.0) # take the stream with maximum amount of nodes # +2 is to avoid problems with log(0) and log(1) # 20 is avg connected nodes count @@ -93,12 +101,17 @@ class TCPConnection(BMProto, TLSDispatcher): self.skipUntil = time.time() + delay def state_connection_fully_established(self): + """ + State after the bitmessage protocol handshake is completed (version/verack exchange, and if both side support + TLS, the TLS handshake as well). + """ self.set_connection_fully_established() self.set_state("bm_header") self.bm_proto_reset() return True def set_connection_fully_established(self): + """Initiate inventory synchronisation.""" if not self.isOutbound and not self.local: shared.clientHasReceivedIncomingConnections = True UISignalQueue.put(('setStatusIcon', 'green')) @@ -113,50 +126,50 @@ class TCPConnection(BMProto, TLSDispatcher): self.sendBigInv() def sendAddr(self): + """Send a partial list of known addresses to peer.""" # We are going to share a maximum number of 1000 addrs (per overlapping # stream) with our peer. 500 from overlapping streams, 250 from the # left child stream, and 250 from the right child stream. maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500) # init - addressCount = 0 - payload = b'' - templist = [] addrs = {} for stream in self.streams: with knownnodes.knownNodesLock: - if len(knownnodes.knownNodes[stream]) > 0: + if knownnodes.knownNodes[stream]: filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() - if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount: elemCount = maxAddrCount # only if more recent than 3 hours addrs[stream] = helper_random.randomsample(filtered.items(), elemCount) # sent 250 only if the remote isn't interested in it - if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams: - filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() - if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if knownnodes.knownNodes[stream * 2] and stream not in self.streams: + filtered = {k: v for k, v in knownnodes.knownNodes[stream * 2].items() + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount / 2: elemCount = int(maxAddrCount / 2) addrs[stream * 2] = helper_random.randomsample(filtered.items(), elemCount) - if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams: - filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() - if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if knownnodes.knownNodes[(stream * 2) + 1] and stream not in self.streams: + filtered = {k: v for k, v in knownnodes.knownNodes[stream * 2 + 1].items() + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount / 2: elemCount = int(maxAddrCount / 2) addrs[stream * 2 + 1] = helper_random.randomsample(filtered.items(), elemCount) - for substream in addrs.keys(): + for substream in addrs: for peer, params in addrs[substream]: templist.append((substream, peer, params["lastseen"])) - if len(templist) > 0: + if templist: self.append_write_buf(BMProto.assembleAddr(templist)) def sendBigInv(self): + """Send hashes of all inventory objects, chunked as the protocol has a per-command limit.""" def sendChunk(): + """Send one chunk of inv entries in one command""" if objectCount == 0: return logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount) @@ -172,13 +185,12 @@ class TCPConnection(BMProto, TLSDispatcher): if Dandelion().hasHash(objHash): continue bigInvList[objHash] = 0 - #self.objectsNewToThem[objHash] = time.time() objectCount = 0 payload = b'' # Now let us start appending all of these hashes together. They will be # sent out in a big inv message to our new peer. - for hash, storedValue in bigInvList.items(): - payload += hash + for obj_hash, _ in bigInvList.items(): + payload += obj_hash objectCount += 1 # Remove -1 below when sufficient time has passed for users to @@ -193,20 +205,26 @@ class TCPConnection(BMProto, TLSDispatcher): sendChunk() def handle_connect(self): + """Callback for TCP connection being established.""" try: AdvancedDispatcher.handle_connect(self) except socket.error as e: - if e.errno in asyncore._DISCONNECTED: - logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e))) + if e.errno in asyncore._DISCONNECTED: # pylint: disable=protected-access + logger.debug("%s:%i: Connection failed: %s", self.destination.host, self.destination.port, str(e)) return self.nodeid = randomBytes(8) - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \ - network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid)) - #print "%s:%i: Sending version" % (self.destination.host, self.destination.port) + self.append_write_buf( + protocol.assembleVersionMessage( + self.destination.host, + self.destination.port, + network.connectionpool.BMConnectionPool().streams, + False, + nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) def handle_read(self): + """Callback for reading from a socket""" TLSDispatcher.handle_read(self) if self.isOutbound and self.fullyEstablished: for s in self.streams: @@ -218,9 +236,11 @@ class TCPConnection(BMProto, TLSDispatcher): receiveDataQueue.put(self.destination) def handle_write(self): + """Callback for writing to a socket""" TLSDispatcher.handle_write(self) def handle_close(self): + """Callback for connection being closed.""" if self.isOutbound and not self.fullyEstablished: knownnodes.decreaseRating(self.destination) if self.fullyEstablished: @@ -231,37 +251,55 @@ class TCPConnection(BMProto, TLSDispatcher): class Socks5BMConnection(Socks5Connection, TCPConnection): + """SOCKS5 wrapper for TCP connections""" + def __init__(self, address): Socks5Connection.__init__(self, address=address) TCPConnection.__init__(self, address=address, sock=self.socket) self.set_state("init") def state_proxy_handshake_done(self): + """State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer.""" Socks5Connection.state_proxy_handshake_done(self) self.nodeid = randomBytes(8) - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \ - network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid)) + self.append_write_buf( + protocol.assembleVersionMessage( + self.destination.host, + self.destination.port, + network.connectionpool.BMConnectionPool().streams, + False, + nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True class Socks4aBMConnection(Socks4aConnection, TCPConnection): + """SOCKS4a wrapper for TCP connections""" + def __init__(self, address): Socks4aConnection.__init__(self, address=address) TCPConnection.__init__(self, address=address, sock=self.socket) self.set_state("init") def state_proxy_handshake_done(self): + """State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer.""" Socks4aConnection.state_proxy_handshake_done(self) self.nodeid = randomBytes(8) - self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \ - network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid)) + self.append_write_buf( + protocol.assembleVersionMessage( + self.destination.host, + self.destination.port, + network.connectionpool.BMConnectionPool().streams, + False, + nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True class TCPServer(AdvancedDispatcher): - def __init__(self, host='127.0.0.1', port=8444): + """TCP connection server for Bitmessage protocol""" + + def __init__(self, host='127.0.0.1', port=8444): # pylint: disable=redefined-outer-name if not hasattr(self, '_map'): AdvancedDispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -284,20 +322,22 @@ class TCPServer(AdvancedDispatcher): self.listen(5) def is_bound(self): + """Is the socket bound?""" try: return self.bound except AttributeError: return False def handle_accept(self): + """Incoming connection callback""" pair = self.accept() if pair is not None: - sock, addr = pair + sock, _ = pair state.ownAddresses[state.Peer(sock.getsockname()[0], sock.getsockname()[1])] = True if len(network.connectionpool.BMConnectionPool().inboundConnections) + \ - len(network.connectionpool.BMConnectionPool().outboundConnections) > \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \ - BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10: + len(network.connectionpool.BMConnectionPool().outboundConnections) > \ + BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \ + BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10: # 10 is a sort of buffer, in between it will go through the version handshake # and return an error to the peer logger.warning("Server full, dropping connection") @@ -314,17 +354,7 @@ if __name__ == "__main__": for host in (("127.0.0.1", 8448),): direct = TCPConnection(host) - while len(asyncore.socket_map) > 0: + while asyncore.socket_map: print "loop, state = %s" % (direct.state) asyncore.loop(timeout=10, count=1) continue - - proxy = Socks5BMConnection(host) - while len(asyncore.socket_map) > 0: -# print "loop, state = %s" % (proxy.state) - asyncore.loop(timeout=10, count=1) - - proxy = Socks4aBMConnection(host) - while len(asyncore.socket_map) > 0: -# print "loop, state = %s" % (proxy.state) - asyncore.loop(timeout=10, count=1) diff --git a/src/proofofwork.py b/src/proofofwork.py index df6ed295..bb16951c 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -1,60 +1,71 @@ -#import shared -#import time -#from multiprocessing import Pool, cpu_count +# pylint: disable=too-many-branches,too-many-statements,protected-access +""" +src/proofofwork.py +================== +""" + +import ctypes import hashlib -from struct import unpack, pack -from subprocess import call +import os import sys import time +from struct import pack, unpack +from subprocess import call + +import openclpow +import paths +import queues +import state +import tr from bmconfigparser import BMConfigParser from debug import logger -import paths -import openclpow -import queues -import tr -import os -import ctypes - -import state bitmsglib = 'bitmsghash.so' - bmpow = None + def _set_idle(): if 'linux' in sys.platform: os.nice(20) else: try: + # pylint: disable=no-member,import-error sys.getwindowsversion() - import win32api,win32process,win32con # @UnresolvedImport + import win32api + import win32process + import win32con pid = win32api.GetCurrentProcessId() handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid) win32process.SetPriorityClass(handle, win32process.IDLE_PRIORITY_CLASS) except: - #Windows 64-bit + # Windows 64-bit pass + def _pool_worker(nonce, initialHash, target, pool_size): _set_idle() trialValue = float('inf') while trialValue > target: nonce += pool_size - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) + trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512( + pack('>Q', nonce) + initialHash).digest()).digest()[0:8]) return [trialValue, nonce] + def _doSafePoW(target, initialHash): logger.debug("Safe PoW start") nonce = 0 trialValue = float('inf') while trialValue > target and state.shutdown == 0: nonce += 1 - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) + trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512( + pack('>Q', nonce) + initialHash).digest()).digest()[0:8]) if state.shutdown != 0: - raise StopIteration("Interrupted") + raise StopIteration("Interrupted") # pylint: misplaced-bare-raise logger.debug("Safe PoW done") return [trialValue, nonce] + def _doFastPoW(target, initialHash): logger.debug("Fast PoW start") from multiprocessing import Pool, cpu_count @@ -96,7 +107,8 @@ def _doFastPoW(target, initialHash): logger.debug("Fast PoW done") return result[0], result[1] time.sleep(0.2) - + + def _doCPoW(target, initialHash): h = initialHash m = target @@ -104,33 +116,47 @@ def _doCPoW(target, initialHash): out_m = ctypes.c_ulonglong(m) logger.debug("C PoW start") nonce = bmpow(out_h, out_m) - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) + trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(pack('>Q', nonce) + initialHash).digest()).digest()[0:8]) if state.shutdown != 0: raise StopIteration("Interrupted") logger.debug("C PoW done") return [trialValue, nonce] + def _doGPUPoW(target, initialHash): logger.debug("GPU PoW start") nonce = openclpow.do_opencl_pow(initialHash.encode("hex"), target) - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) - #print "{} - value {} < {}".format(nonce, trialValue, target) + trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(pack('>Q', nonce) + initialHash).digest()).digest()[0:8]) if trialValue > target: deviceNames = ", ".join(gpu.name for gpu in openclpow.enabledGpus) - queues.UISignalQueue.put(('updateStatusBar', (tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'), 1))) - logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames) + queues.UISignalQueue.put(( + 'updateStatusBar', ( + tr._translate( + "MainWindow", + 'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.' + ), + 1))) + logger.error( + "Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", + deviceNames) openclpow.enabledGpus = [] raise Exception("GPU did not calculate correctly.") if state.shutdown != 0: raise StopIteration("Interrupted") logger.debug("GPU PoW done") return [trialValue, nonce] - -def estimate(difficulty, format = False): + + +def estimate(difficulty, format=False): # pylint: disable=redefined-builtin + """ + .. todo: fix unused variable + """ ret = difficulty / 10 if ret < 1: ret = 1 + if format: + # pylint: disable=unused-variable out = str(int(ret)) + " seconds" if ret > 60: ret /= 60 @@ -148,25 +174,46 @@ def estimate(difficulty, format = False): if ret > 366: ret /= 366 out = str(int(ret)) + " years" - else: - return ret + ret = None # Ensure legacy behaviour + + return ret + def getPowType(): + """Get the proof of work implementation""" + if openclpow.openclEnabled(): return "OpenCL" if bmpow: return "C" return "python" + def notifyBuild(tried=False): + """Notify the user of the success or otherwise of building the PoW C module""" + if bmpow: - queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "C PoW module built successfully."), 1))) + queues.UISignalQueue.put(('updateStatusBar', (tr._translate( + "proofofwork", "C PoW module built successfully."), 1))) elif tried: - queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "Failed to build C PoW module. Please build it manually."), 1))) + queues.UISignalQueue.put( + ( + 'updateStatusBar', ( + tr._translate( + "proofofwork", + "Failed to build C PoW module. Please build it manually." + ), + 1 + ) + ) + ) else: - queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "C PoW module unavailable. Please build it."), 1))) + queues.UISignalQueue.put(('updateStatusBar', (tr._translate( + "proofofwork", "C PoW module unavailable. Please build it."), 1))) + def buildCPoW(): + """Attempt to build the PoW C module""" if bmpow is not None: return if paths.frozen is not None: @@ -190,29 +237,27 @@ def buildCPoW(): except: notifyBuild(True) + def run(target, initialHash): + """Run the proof of work thread""" + if state.shutdown != 0: - raise + raise # pylint: disable=misplaced-bare-raise target = int(target) if openclpow.openclEnabled(): -# trialvalue1, nonce1 = _doGPUPoW(target, initialHash) -# trialvalue, nonce = _doFastPoW(target, initialHash) -# print "GPU: %s, %s" % (trialvalue1, nonce1) -# print "Fast: %s, %s" % (trialvalue, nonce) -# return [trialvalue, nonce] try: return _doGPUPoW(target, initialHash) except StopIteration: raise except: - pass # fallback + pass # fallback if bmpow: try: return _doCPoW(target, initialHash) except StopIteration: raise except: - pass # fallback + pass # fallback if paths.frozen == "macosx_app" or not paths.frozen: # on my (Peter Surda) Windows 10, Windows Defender # does not like this and fights with PyBitmessage @@ -225,24 +270,30 @@ def run(target, initialHash): raise except: logger.error("Fast PoW got exception:", exc_info=True) - pass #fallback try: return _doSafePoW(target, initialHash) except StopIteration: raise except: - pass #fallback + pass # fallback + def resetPoW(): + """Initialise the OpenCL PoW""" openclpow.initCL() + # init + + def init(): - global bitmsglib, bso, bmpow + """Initialise PoW""" + # pylint: disable=global-statement + global bitmsglib, bmpow openclpow.initCL() - if "win32" == sys.platform: + if sys.platform == "win32": if ctypes.sizeof(ctypes.c_voidp) == 4: bitmsglib = 'bitmsghash32.dll' else: diff --git a/src/upnp.py b/src/upnp.py index 46d55956..6430ecfe 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -1,21 +1,33 @@ -# A simple upnp module to forward port for BitMessage -# Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port +# pylint: disable=too-many-statements,too-many-branches,protected-access,no-self-use +""" +src/upnp.py +=========== + +A simple upnp module to forward port for BitMessage +Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port +""" + import httplib -from random import randint import socket -from struct import unpack, pack import threading import time -from bmconfigparser import BMConfigParser -from network.connectionpool import BMConnectionPool -from helper_threading import * +import urllib2 +from random import randint +from urlparse import urlparse +from xml.dom.minidom import Document, parseString + import queues import shared import state import tr +from bmconfigparser import BMConfigParser +from debug import logger +from helper_threading import StoppableThread +from network.connectionpool import BMConnectionPool + def createRequestXML(service, action, arguments=None): - from xml.dom.minidom import Document + """Router UPnP requests are XML formatted""" doc = Document() @@ -63,22 +75,24 @@ def createRequestXML(service, action, arguments=None): # our tree is ready, conver it to a string return doc.toxml() -class UPnPError(Exception): - def __init__(self, message): - self.message -class Router: +class UPnPError(Exception): + """Handle a UPnP error""" + + def __init__(self, message): + super(UPnPError, self).__init__() + logger.error(message) + + +class Router: # pylint: disable=old-style-class + """Encapulate routing""" name = "" path = "" address = None routerPath = None extPort = None - + def __init__(self, ssdpResponse, address): - import urllib2 - from xml.dom.minidom import parseString - from urlparse import urlparse - from debug import logger self.address = address @@ -92,9 +106,9 @@ class Router: try: self.routerPath = urlparse(header['location']) if not self.routerPath or not hasattr(self.routerPath, "hostname"): - logger.error ("UPnP: no hostname: %s", header['location']) + logger.error("UPnP: no hostname: %s", header['location']) except KeyError: - logger.error ("UPnP: missing location header") + logger.error("UPnP: missing location header") # get the profile xml file and read it into a variable directory = urllib2.urlopen(header['location']).read() @@ -108,45 +122,58 @@ class Router: for service in service_types: if service.childNodes[0].data.find('WANIPConnection') > 0 or \ - service.childNodes[0].data.find('WANPPPConnection') > 0: + service.childNodes[0].data.find('WANPPPConnection') > 0: self.path = service.parentNode.getElementsByTagName('controlURL')[0].childNodes[0].data self.upnp_schema = service.childNodes[0].data.split(':')[-2] - def AddPortMapping(self, externalPort, internalPort, internalClient, protocol, description, leaseDuration = 0, enabled = 1): - from debug import logger + def AddPortMapping( + self, + externalPort, + internalPort, + internalClient, + protocol, + description, + leaseDuration=0, + enabled=1, + ): # pylint: disable=too-many-arguments + """Add UPnP port mapping""" + resp = self.soapRequest(self.upnp_schema + ':1', 'AddPortMapping', [ - ('NewRemoteHost', ''), - ('NewExternalPort', str(externalPort)), - ('NewProtocol', protocol), - ('NewInternalPort', str(internalPort)), - ('NewInternalClient', internalClient), - ('NewEnabled', str(enabled)), - ('NewPortMappingDescription', str(description)), - ('NewLeaseDuration', str(leaseDuration)) - ]) + ('NewRemoteHost', ''), + ('NewExternalPort', str(externalPort)), + ('NewProtocol', protocol), + ('NewInternalPort', str(internalPort)), + ('NewInternalClient', internalClient), + ('NewEnabled', str(enabled)), + ('NewPortMappingDescription', str(description)), + ('NewLeaseDuration', str(leaseDuration)) + ]) self.extPort = externalPort - logger.info("Successfully established UPnP mapping for %s:%i on external port %i", internalClient, internalPort, externalPort) + logger.info("Successfully established UPnP mapping for %s:%i on external port %i", + internalClient, internalPort, externalPort) return resp def DeletePortMapping(self, externalPort, protocol): - from debug import logger + """Delete UPnP port mapping""" + resp = self.soapRequest(self.upnp_schema + ':1', 'DeletePortMapping', [ - ('NewRemoteHost', ''), - ('NewExternalPort', str(externalPort)), - ('NewProtocol', protocol), - ]) + ('NewRemoteHost', ''), + ('NewExternalPort', str(externalPort)), + ('NewProtocol', protocol), + ]) logger.info("Removed UPnP mapping on external port %i", externalPort) return resp def GetExternalIPAddress(self): - from xml.dom.minidom import parseString + """Get the external address""" + resp = self.soapRequest(self.upnp_schema + ':1', 'GetExternalIPAddress') dom = parseString(resp) return dom.getElementsByTagName('NewExternalIPAddress')[0].childNodes[0].data - + def soapRequest(self, service, action, arguments=None): - from xml.dom.minidom import parseString - from debug import logger + """Make a request to a router""" + conn = httplib.HTTPConnection(self.routerPath.hostname, self.routerPath.port) conn.request( 'POST', @@ -155,8 +182,8 @@ class Router: { 'SOAPAction': '"urn:schemas-upnp-org:service:%s#%s"' % (service, action), 'Content-Type': 'text/xml' - } - ) + } + ) resp = conn.getresponse() conn.close() if resp.status == 500: @@ -164,21 +191,24 @@ class Router: try: dom = parseString(respData) errinfo = dom.getElementsByTagName('errorDescription') - if len(errinfo) > 0: + if errinfo: logger.error("UPnP error: %s", respData) raise UPnPError(errinfo[0].childNodes[0].data) except: - raise UPnPError("Unable to parse SOAP error: %s" %(respData)) + raise UPnPError("Unable to parse SOAP error: %s" % (respData)) return resp + class uPnPThread(threading.Thread, StoppableThread): + """Start a thread to handle UPnP activity""" + SSDP_ADDR = "239.255.255.250" GOOGLE_DNS = "8.8.8.8" SSDP_PORT = 1900 SSDP_MX = 2 SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" - def __init__ (self): + def __init__(self): threading.Thread.__init__(self, name="uPnPThread") try: self.extPort = BMConfigParser().getint('bitmessagesettings', 'extport') @@ -194,8 +224,8 @@ class uPnPThread(threading.Thread, StoppableThread): self.initStop() def run(self): - from debug import logger - + """Start the thread to manage UPnP activity""" + logger.debug("Starting UPnP thread") logger.debug("Local IP: %s", self.localIP) lastSent = 0 @@ -209,9 +239,11 @@ class uPnPThread(threading.Thread, StoppableThread): if not bound: time.sleep(1) + # pylint: disable=attribute-defined-outside-init self.localPort = BMConfigParser().getint('bitmessagesettings', 'port') + while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'): - if time.time() - lastSent > self.sendSleep and len(self.routers) == 0: + if time.time() - lastSent > self.sendSleep and not self.routers: try: self.sendSearchRouter() except: @@ -219,7 +251,7 @@ class uPnPThread(threading.Thread, StoppableThread): lastSent = time.time() try: while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'): - resp,(ip,port) = self.sock.recvfrom(1000) + resp, (ip, _) = self.sock.recvfrom(1000) if resp is None: continue newRouter = Router(resp, ip) @@ -230,14 +262,11 @@ class uPnPThread(threading.Thread, StoppableThread): logger.debug("Found UPnP router at %s", ip) self.routers.append(newRouter) self.createPortMapping(newRouter) - queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'UPnP port mapping established on port %1').arg(str(self.extPort)))) - # retry connections so that the submitted port is refreshed - with shared.alreadyAttemptedConnectionsListLock: - shared.alreadyAttemptedConnectionsList.clear() - shared.alreadyAttemptedConnectionsListResetTime = int( - time.time()) + queues.UISignalQueue.put(('updateStatusBar', tr._translate( + "MainWindow", 'UPnP port mapping established on port %1' + ).arg(str(self.extPort)))) break - except socket.timeout as e: + except socket.timeout: pass except: logger.error("Failure running UPnP router search.", exc_info=True) @@ -259,22 +288,25 @@ class uPnPThread(threading.Thread, StoppableThread): self.deletePortMapping(router) shared.extPort = None if deleted: - queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'UPnP port mapping removed'))) + queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow", 'UPnP port mapping removed'))) logger.debug("UPnP thread done") def getLocalIP(self): + """Get the local IP of the node""" + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) s.connect((uPnPThread.GOOGLE_DNS, 1)) return s.getsockname()[0] def sendSearchRouter(self): - from debug import logger + """Querying for UPnP services""" + ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \ - "HOST: %s:%d\r\n" % (uPnPThread.SSDP_ADDR, uPnPThread.SSDP_PORT) + \ - "MAN: \"ssdp:discover\"\r\n" + \ - "MX: %d\r\n" % (uPnPThread.SSDP_MX, ) + \ - "ST: %s\r\n" % (uPnPThread.SSDP_ST, ) + "\r\n" + "HOST: %s:%d\r\n" % (uPnPThread.SSDP_ADDR, uPnPThread.SSDP_PORT) + \ + "MAN: \"ssdp:discover\"\r\n" + \ + "MX: %d\r\n" % (uPnPThread.SSDP_MX, ) + \ + "ST: %s\r\n" % (uPnPThread.SSDP_ST, ) + "\r\n" try: logger.debug("Sending UPnP query") @@ -283,19 +315,23 @@ class uPnPThread(threading.Thread, StoppableThread): logger.exception("UPnP send query failed") def createPortMapping(self, router): - from debug import logger + """Add a port mapping""" for i in range(50): try: - routerIP, = unpack('>I', socket.inet_aton(router.address)) localIP = self.localIP if i == 0: - extPort = self.localPort # try same port first + extPort = self.localPort # try same port first elif i == 1 and self.extPort: - extPort = self.extPort # try external port from last time next + extPort = self.extPort # try external port from last time next else: extPort = randint(32767, 65535) - logger.debug("Attempt %i, requesting UPnP mapping for %s:%i on external port %i", i, localIP, self.localPort, extPort) + logger.debug( + "Attempt %i, requesting UPnP mapping for %s:%i on external port %i", + i, + localIP, + self.localPort, + extPort) router.AddPortMapping(extPort, self.localPort, localIP, 'TCP', 'BitMessage') shared.extPort = extPort self.extPort = extPort @@ -306,7 +342,5 @@ class uPnPThread(threading.Thread, StoppableThread): logger.debug("UPnP error: ", exc_info=True) def deletePortMapping(self, router): + """Delete a port mapping""" router.DeletePortMapping(router.extPort, 'TCP') - - -