Changes based on style and lint checks. (final_code_quality_5) #1363

Merged
coffeedogs merged 1 commits from final_code_quality_5 into v0.6 2018-10-31 15:11:22 +01:00
5 changed files with 516 additions and 273 deletions

View File

@ -1,3 +1,8 @@
"""
src/defaults.py
===============
"""
# sanity check, prevent doing ridiculous PoW
# 20 million PoWs equals approximately 2 days on dev's dual R9 290
ridiculousDifficulty = 20000000
@ -7,7 +12,13 @@ ridiculousDifficulty = 20000000
# namecoin integration to "namecoind".
namecoinDefaultRpcPort = "8336"
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 1000 #The amount of work that should be performed (and demanded) per byte of the payload.
networkDefaultPayloadLengthExtraBytes = 1000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target.
# If changed, these values will cause particularly unexpected behavior:
# You won't be able to either send or receive messages because the proof
# of work you do (or demand) won't match that done or demanded by others.
# Don't change them!
# The amount of work that should be performed (and demanded) per byte of the payload.
networkDefaultProofOfWorkNonceTrialsPerByte = 1000
# To make sending short messages a little more difficult, this value is
# added to the payload length for use in calculating the proof of work
# target.
networkDefaultPayloadLengthExtraBytes = 1000

View File

@ -1,6 +1,11 @@
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init
# pylint: disable=global-statement
"""
src/network/asyncore_pollchoose.py
==================================
# ======================================================================
# Copyright 1996 by Sam Rushing
@ -25,7 +30,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""Basic infrastructure for asynchronous socket service clients and servers.
Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
@ -46,22 +51,20 @@ many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
# randomise object order for bandwidth balancing
import random
import os
import select
import socket
import sys
import time
from threading import current_thread
import warnings
from errno import (
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR,
EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
)
from threading import current_thread
import os
import helper_random
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ENOTSOCK, EINTR, ETIMEDOUT, \
EADDRINUSE, \
errorcode
try:
from errno import WSAEWOULDBLOCK
except (ImportError, AttributeError):
@ -75,13 +78,15 @@ try:
except (ImportError, AttributeError):
WSAECONNRESET = ECONNRESET
try:
from errno import WSAEADDRINUSE
# Desirable side-effects on Windows; imports winsock error numbers
from errno import WSAEADDRINUSE # pylint: disable=unused-import
except (ImportError, AttributeError):
WSAEADDRINUSE = EADDRINUSE
_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
EBADF, ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ETIMEDOUT,
WSAECONNRESET))
_DISCONNECTED = frozenset((
ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF, ECONNREFUSED,
EHOSTUNREACH, ENETUNREACH, ETIMEDOUT, WSAECONNRESET))
OP_READ = 1
OP_WRITE = 2
@ -91,17 +96,21 @@ try:
except NameError:
socket_map = {}
def _strerror(err):
try:
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
return errorcode[err]
return "Unknown error %s" %err
return "Unknown error %s" % err
class ExitNow(Exception):
"""We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it"""
pass
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
maxDownloadRate = 0
@ -113,28 +122,38 @@ uploadTimestamp = 0
uploadBucket = 0
sentBytes = 0
def read(obj):
"""Event to read from the object, i.e. its network socket."""
if not can_receive():
return
try:
obj.handle_read_event()
except _reraised_exceptions:
raise
except:
except BaseException:
obj.handle_error()
def write(obj):
"""Event to write to the object, i.e. its network socket."""
if not can_send():
return
try:
obj.handle_write_event()
except _reraised_exceptions:
raise
except:
except BaseException:
obj.handle_error()
def set_rates(download, upload):
"""Set throttling rates"""
global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
maxDownloadRate = float(download) * 1024
maxUploadRate = float(upload) * 1024
downloadBucket = maxDownloadRate
@ -142,14 +161,24 @@ def set_rates(download, upload):
downloadTimestamp = time.time()
uploadTimestamp = time.time()
def can_receive():
"""Predicate indicating whether the download throttle is in effect"""
return maxDownloadRate == 0 or downloadBucket > 0
def can_send():
"""Predicate indicating whether the upload throttle is in effect"""
return maxUploadRate == 0 or uploadBucket > 0
def update_received(download=0):
"""Update the receiving throttle"""
global receivedBytes, downloadBucket, downloadTimestamp
currentTimestamp = time.time()
receivedBytes += download
if maxDownloadRate > 0:
@ -160,8 +189,12 @@ def update_received(download=0):
downloadBucket -= download
downloadTimestamp = currentTimestamp
def update_sent(upload=0):
"""Update the sending throttle"""
global sentBytes, uploadBucket, uploadTimestamp
currentTimestamp = time.time()
sentBytes += upload
if maxUploadRate > 0:
@ -172,15 +205,21 @@ def update_sent(upload=0):
uploadBucket -= upload
uploadTimestamp = currentTimestamp
def _exception(obj):
"""Handle exceptions as appropriate"""
try:
obj.handle_expt_event()
except _reraised_exceptions:
raise
except:
except BaseException:
obj.handle_error()
def readwrite(obj, flags):
"""Read and write any pending data to/from the object"""
try:
if flags & select.POLLIN and can_receive():
obj.handle_read_event()
@ -197,15 +236,20 @@ def readwrite(obj, flags):
obj.handle_close()
except _reraised_exceptions:
raise
except:
except BaseException:
obj.handle_error()
def select_poller(timeout=0.0, map=None):
"""A poller which uses select(), available on most platforms."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
if map:
r = []; w = []; e = []
r = []
w = []
e = []
for fd, obj in list(map.items()):
is_r = obj.readable()
is_w = obj.writable()
@ -251,13 +295,16 @@ def select_poller(timeout=0.0, map=None):
else:
current_thread().stop.wait(timeout)
def poll_poller(timeout=0.0, map=None):
"""A poller which uses poll(), available on most UNIXen."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
if timeout is not None:
# timeout is in milliseconds
timeout = int(timeout*1000)
timeout = int(timeout * 1000)
try:
poll_poller.pollster
except AttributeError:
@ -301,12 +348,16 @@ def poll_poller(timeout=0.0, map=None):
else:
current_thread().stop.wait(timeout)
# Aliases for backward compatibility
poll = select_poller
poll2 = poll3 = poll_poller
def epoll_poller(timeout=0.0, map=None):
"""A poller which uses epoll(), supported on Linux 2.5.44 and newer."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
try:
@ -346,7 +397,7 @@ def epoll_poller(timeout=0.0, map=None):
if e.errno != EINTR:
raise
r = []
except select.error, err:
except select.error as err:
if err.args[0] != EINTR:
raise
r = []
@ -354,12 +405,15 @@ def epoll_poller(timeout=0.0, map=None):
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
readwrite(obj, flags)
else:
current_thread().stop.wait(timeout)
def kqueue_poller(timeout=0.0, map=None):
"""A poller which uses kqueue(), BSD specific."""
# pylint: disable=redefined-builtin,no-member
if map is None:
map = socket_map
try:
@ -408,7 +462,7 @@ def kqueue_poller(timeout=0.0, map=None):
for event in events:
fd = event.ident
obj = map.get(fd)
obj = map.get(fd)
if obj is None:
continue
if event.flags & select.KQ_EV_ERROR:
@ -425,13 +479,15 @@ def kqueue_poller(timeout=0.0, map=None):
current_thread().stop.wait(timeout)
def loop(timeout=30.0, use_poll=False, map=None, count=None,
poller=None):
def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
"""Poll in a loop, until count or timeout is reached"""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
if count is None:
count = True
# code which grants backward compatibility with "use_poll"
count = True
# code which grants backward compatibility with "use_poll"
# argument which should no longer be used in favor of
# "poller"
@ -460,10 +516,13 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
break
# then poll
poller(subtimeout, map)
if type(count) is int:
if isinstance(count, int):
count = count - 1
class dispatcher:
"""Dispatcher for socket objects"""
# pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class
debug = False
connected = False
@ -478,6 +537,7 @@ class dispatcher:
minTx = 1500
def __init__(self, sock=None, map=None):
# pylint: disable=redefined-builtin
if map is None:
self._map = socket_map
else:
@ -510,7 +570,7 @@ class dispatcher:
self.socket = None
def __repr__(self):
status = [self.__class__.__module__+"."+self.__class__.__name__]
status = [self.__class__.__module__ + "." + self.__class__.__name__]
if self.accepting and self.addr:
status.append('listening')
elif self.connected:
@ -525,7 +585,9 @@ class dispatcher:
__str__ = __repr__
def add_channel(self, map=None):
#self.log_info('adding channel %s' % self)
"""Add a channel"""
# pylint: disable=redefined-builtin
if map is None:
map = self._map
map[self._fileno] = self
@ -533,11 +595,13 @@ class dispatcher:
self.poller_filter = 0
def del_channel(self, map=None):
"""Delete a channel"""
# pylint: disable=redefined-builtin
fd = self._fileno
if map is None:
map = self._map
if fd in map:
#self.log_info('closing channel %d:%s' % (fd, self))
del map[fd]
if self._fileno:
try:
@ -564,25 +628,29 @@ class dispatcher:
self.poller_registered = False
def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
"""Create a socket"""
self.family_and_type = family, socket_type
sock = socket.socket(family, socket_type)
sock.setblocking(0)
self.set_socket(sock)
def set_socket(self, sock, map=None):
"""Set socket"""
# pylint: disable=redefined-builtin
self.socket = sock
## self.__dict__['socket'] = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
# try to re-use a server port if possible
"""try to re-use a server port if possible"""
try:
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR,
self.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR) | 1
)
)
except socket.error:
pass
@ -593,11 +661,13 @@ class dispatcher:
# ==================================================
def readable(self):
"""Predicate to indicate download throttle status"""
if maxDownloadRate > 0:
return downloadBucket > dispatcher.minTx
return True
def writable(self):
"""Predicate to indicate upload throttle status"""
if maxUploadRate > 0:
return uploadBucket > dispatcher.minTx
return True
@ -607,21 +677,24 @@ class dispatcher:
# ==================================================
def listen(self, num):
"""Listen on a port"""
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)
def bind(self, addr):
"""Bind to an address"""
self.addr = addr
return self.socket.bind(addr)
def connect(self, address):
"""Connect to an address"""
self.connected = False
self.connecting = True
err = self.socket.connect_ex(address)
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK, WSAEWOULDBLOCK) \
or err == EINVAL and os.name in ('nt', 'ce'):
or err == EINVAL and os.name in ('nt', 'ce'):
self.addr = address
return
if err in (0, EISCONN):
@ -631,7 +704,7 @@ class dispatcher:
raise socket.error(err, errorcode[err])
def accept(self):
# XXX can return either an address pair or None
"""Accept incoming connections. Returns either an address pair or None."""
try:
conn, addr = self.socket.accept()
except TypeError:
@ -645,6 +718,7 @@ class dispatcher:
return conn, addr
def send(self, data):
"""Send data"""
try:
result = self.socket.send(data)
return result
@ -658,6 +732,7 @@ class dispatcher:
raise
def recv(self, buffer_size):
"""Receive data"""
try:
data = self.socket.recv(buffer_size)
if not data:
@ -665,8 +740,7 @@ class dispatcher:
# a read condition, and having recv() return 0.
self.handle_close()
return b''
else:
return data
return data
except socket.error as why:
# winsock sometimes raises ENOTCONN
if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
@ -678,6 +752,7 @@ class dispatcher:
raise
def close(self):
"""Close connection"""
self.connected = False
self.accepting = False
self.connecting = False
@ -695,10 +770,10 @@ class dispatcher:
retattr = getattr(self.socket, attr)
except AttributeError:
raise AttributeError("%s instance has no attribute '%s'"
%(self.__class__.__name__, attr))
% (self.__class__.__name__, attr))
else:
msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \
"instead" % {'me' : self.__class__.__name__, 'attr' : attr}
"instead" % {'me': self.__class__.__name__, 'attr': attr}
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return retattr
@ -707,13 +782,16 @@ class dispatcher:
# and 'log_info' is for informational, warning and error logging.
def log(self, message):
"""Log a message to stderr"""
sys.stderr.write('log: %s\n' % str(message))
def log_info(self, message, log_type='info'):
"""Conditionally print a message"""
if log_type not in self.ignore_log_types:
print('%s: %s' % (log_type, message))
print '%s: %s' % (log_type, message)
def handle_read_event(self):
"""Handle a read event"""
if self.accepting:
# accepting sockets are never connected, they "spawn" new
# sockets that are connected
@ -726,6 +804,7 @@ class dispatcher:
self.handle_read()
def handle_connect_event(self):
"""Handle a connection event"""
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
raise socket.error(err, _strerror(err))
@ -734,6 +813,7 @@ class dispatcher:
self.connecting = False
def handle_write_event(self):
"""Handle a write event"""
if self.accepting:
# Accepting sockets shouldn't get a write event.
# We will pretend it didn't happen.
@ -745,6 +825,7 @@ class dispatcher:
self.handle_write()
def handle_expt_event(self):
"""Handle expected exceptions"""
# handle_expt_event() is called if there might be an error on the
# socket, or if there is OOB data
# check for the error condition first
@ -763,12 +844,13 @@ class dispatcher:
self.handle_expt()
def handle_error(self):
nil, t, v, tbinfo = compact_traceback()
"""Handle unexpected exceptions"""
_, t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
self_repr = repr(self)
except:
except BaseException:
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
self.log_info(
@ -777,89 +859,110 @@ class dispatcher:
t,
v,
tbinfo
),
),
'error'
)
)
self.handle_close()
def handle_expt(self):
self.log_info('unhandled incoming priority event', 'warning')
def handle_read(self):
self.log_info('unhandled read event', 'warning')
def handle_write(self):
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
self.log_info('unhandled connect event', 'warning')
def handle_accept(self):
"""Handle an accept event"""
pair = self.accept()
if pair is not None:
self.handle_accepted(*pair)
def handle_expt(self):
"""Log that the subclass does not implement handle_expt"""
self.log_info('unhandled incoming priority event', 'warning')
def handle_read(self):
"""Log that the subclass does not implement handle_read"""
self.log_info('unhandled read event', 'warning')
def handle_write(self):
"""Log that the subclass does not implement handle_write"""
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
"""Log that the subclass does not implement handle_connect"""
self.log_info('unhandled connect event', 'warning')
def handle_accepted(self, sock, addr):
"""Log that the subclass does not implement handle_accepted"""
sock.close()
self.log_info('unhandled accepted event on %s' % (addr), 'warning')
def handle_close(self):
"""Log that the subclass does not implement handle_close"""
self.log_info('unhandled close event', 'warning')
self.close()
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
"""
adds simple buffered output capability, useful for simple clients.
[for more sophisticated usage use asynchat.async_chat]
"""
# pylint: disable=redefined-builtin
def __init__(self, sock=None, map=None):
# pylint: disable=redefined-builtin
dispatcher.__init__(self, sock, map)
self.out_buffer = b''
def initiate_send(self):
"""Initiate a send"""
num_sent = 0
num_sent = dispatcher.send(self, self.out_buffer[:512])
self.out_buffer = self.out_buffer[num_sent:]
def handle_write(self):
"""Handle a write event"""
self.initiate_send()
def writable(self):
return (not self.connected) or len(self.out_buffer)
"""Predicate to indicate if the object is writable"""
return not self.connected or len(self.out_buffer)
def send(self, data):
"""Send data"""
if self.debug:
self.log_info('sending %s' % repr(data))
self.out_buffer = self.out_buffer + data
self.initiate_send()
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def compact_traceback():
"""Return a compact traceback"""
t, v, tb = sys.exc_info()
tbinfo = []
if not tb: # Must have a traceback
if not tb: # Must have a traceback
raise AssertionError("traceback does not exist")
while tb:
tbinfo.append((
tb.tb_frame.f_code.co_filename,
tb.tb_frame.f_code.co_name,
str(tb.tb_lineno)
))
))
tb = tb.tb_next
# just to be safe
del tb
file, function, line = tbinfo[-1]
filename, function, line = tbinfo[-1]
info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
return (file, function, line), t, v, info
return (filename, function, line), t, v, info
def close_all(map=None, ignore_all=False):
"""Close all connections"""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
for x in list(map.values()):
@ -872,11 +975,12 @@ def close_all(map=None, ignore_all=False):
raise
except _reraised_exceptions:
raise
except:
except BaseException:
if not ignore_all:
raise
map.clear()
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
@ -890,27 +994,34 @@ def close_all(map=None, ignore_all=False):
#
# Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
import fcntl
class file_wrapper:
# Here we override just enough to make a file
# look like a socket for the purposes of asyncore.
# The passed fd is automatically os.dup()'d
"""
Here we override just enough to make a file look like a socket for the purposes of asyncore.
The passed fd is automatically os.dup()'d
"""
# pylint: disable=old-style-class
def __init__(self, fd):
self.fd = os.dup(fd)
def recv(self, *args):
"""Fake recv()"""
return os.read(self.fd, *args)
def send(self, *args):
"""Fake send()"""
return os.write(self.fd, *args)
def getsockopt(self, level, optname, buflen=None):
"""Fake getsockopt()"""
if (level == socket.SOL_SOCKET and
optname == socket.SO_ERROR and
not buflen):
optname == socket.SO_ERROR and
not buflen):
return 0
raise NotImplementedError("Only asyncore specific behaviour "
"implemented.")
@ -919,14 +1030,19 @@ if os.name == 'posix':
write = send
def close(self):
"""Fake close()"""
os.close(self.fd)
def fileno(self):
"""Fake fileno()"""
return self.fd
class file_dispatcher(dispatcher):
"""A dispatcher for file_wrapper objects"""
def __init__(self, fd, map=None):
# pylint: disable=redefined-builtin
dispatcher.__init__(self, None, map)
self.connected = True
try:
@ -940,6 +1056,7 @@ if os.name == 'posix':
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def set_file(self, fd):
"""Set file"""
self.socket = file_wrapper(fd)
self._fileno = self.socket.fileno()
self.add_channel()

View File

@ -1,41 +1,42 @@
import base64
from binascii import hexlify
import hashlib
import math
import time
from pprint import pprint
import socket
import struct
import random
import traceback
# pylint: disable=too-many-ancestors
"""
src/network/tcp.py
==================
"""
from addresses import calculateInventoryHash
from debug import logger
from helper_random import randomBytes
import helper_random
from inventory import Inventory
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto
from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError
import network.connectionpool
from network.dandelion import Dandelion
from network.node import Node
import network.asyncore_pollchoose as asyncore
from network.proxy import Proxy, ProxyError, GeneralProxyError
from network.objectracker import ObjectTracker
from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error
from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError
from network.tls import TLSDispatcher
import math
import random
import socket
import time
import addresses
from bmconfigparser import BMConfigParser
from queues import invQueue, objectProcessorQueue, portCheckerQueue, UISignalQueue, receiveDataQueue
import helper_random
import knownnodes
import network.asyncore_pollchoose as asyncore
import network.connectionpool
import protocol
import shared
import state
import protocol
from bmconfigparser import BMConfigParser
from debug import logger
from helper_random import randomBytes
from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProto
from network.dandelion import Dandelion
from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection
from network.tls import TLSDispatcher
from queues import UISignalQueue, invQueue, receiveDataQueue
class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instance-attributes
"""
.. todo:: Look to understand and/or fix the non-parent-init-called
"""
class TCPConnection(BMProto, TLSDispatcher):
def __init__(self, address=None, sock=None):
BMProto.__init__(self, address=address, sock=sock)
self.verackReceived = False
@ -67,18 +68,25 @@ class TCPConnection(BMProto, TLSDispatcher):
self.connect(self.destination)
logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port)
encodedAddr = protocol.encodeHost(self.destination.host)
if protocol.checkIPAddress(encodedAddr, True) and not protocol.checkSocksIP(self.destination.host):
self.local = True
else:
self.local = False
#shared.connectedHostsList[self.destination] = 0
ObjectTracker.__init__(self)
self.local = all([
protocol.checkIPAddress(encodedAddr, True),
not protocol.checkSocksIP(self.destination.host)
])
ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called
self.bm_proto_reset()
self.set_state("bm_header", expectBytes=protocol.Header.size)
def antiIntersectionDelay(self, initial = False):
def antiIntersectionDelay(self, initial=False):
"""
This is a defense against the so called intersection attacks.
It is called when you notice peer is requesting non-existing objects, or right after the connection is
established. It will estimate how long an object will take to propagate across the network, and skip processing
"getdata" requests until then. This means an attacker only has one shot per IP to perform the attack.
"""
# estimated time for a small object to propagate across the whole network
delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + invQueue.queueCount/2.0)
max_known_nodes = max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (0.2 + invQueue.queueCount / 2.0)
# take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
@ -93,12 +101,17 @@ class TCPConnection(BMProto, TLSDispatcher):
self.skipUntil = time.time() + delay
def state_connection_fully_established(self):
"""
State after the bitmessage protocol handshake is completed (version/verack exchange, and if both side support
TLS, the TLS handshake as well).
"""
self.set_connection_fully_established()
self.set_state("bm_header")
self.bm_proto_reset()
return True
def set_connection_fully_established(self):
"""Initiate inventory synchronisation."""
if not self.isOutbound and not self.local:
shared.clientHasReceivedIncomingConnections = True
PeterSurda commented 2018-10-12 10:49:47 +02:00 (Migrated from github.com)
Review

Send a partial list of known addresses to peer.

Send a partial list of known addresses to peer.
PeterSurda commented 2018-10-12 10:51:33 +02:00 (Migrated from github.com)
Review

Send hashes of all inventory objects, chunked as the protocol has a per-command limit.

Send hashes of all inventory objects, chunked as the protocol has a per-command limit.
PeterSurda commented 2018-10-12 10:51:50 +02:00 (Migrated from github.com)
Review

Send one chunk of inv entries in one command

Send one chunk of inv entries in one command
UISignalQueue.put(('setStatusIcon', 'green'))
@ -113,50 +126,50 @@ class TCPConnection(BMProto, TLSDispatcher):
self.sendBigInv()
def sendAddr(self):
"""Send a partial list of known addresses to peer."""
# We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the
# left child stream, and 250 from the right child stream.
maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500)
# init
addressCount = 0
payload = b''
templist = []
addrs = {}
for stream in self.streams:
with knownnodes.knownNodesLock:
if len(knownnodes.knownNodes[stream]) > 0:
if knownnodes.knownNodes[stream]:
filtered = {k: v for k, v in knownnodes.knownNodes[stream].items()
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered)
if elemCount > maxAddrCount:
elemCount = maxAddrCount
# only if more recent than 3 hours
addrs[stream] = helper_random.randomsample(filtered.items(), elemCount)
# sent 250 only if the remote isn't interested in it
if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items()
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
if knownnodes.knownNodes[stream * 2] and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream * 2].items()
PeterSurda commented 2018-10-28 13:57:41 +01:00 (Migrated from github.com)
Review

It looks like ObjectTracker is used as a mixin, but it has an __init__. Other than suppressing the pylint warning, I'm not sure there's anything else to do. We could rename the __init__ to _fake_init, but then we would have to suppress attribute-defined-outside-init. I would leave it as it is unless there's something I'm missing.

It looks like `ObjectTracker` is used as a mixin, but it has an `__init__`. Other than suppressing the pylint warning, I'm not sure there's anything else to do. We could rename the `__init__` to `_fake_init`, but then we would have to suppress `attribute-defined-outside-init`. I would leave it as it is unless there's something I'm missing.
coffeedogs commented 2018-10-31 15:03:09 +01:00 (Migrated from github.com)
Review

I would expect a mixin class to be defined as a parent: class TCPConnection(BMProto, TLSDispatcher, ObjectTracker): and for the __init__s to be called in an appropriate order, each seeing *args and **kwargs and each updating self as they go, passing off initialisation to the next parent class using super(ThisClass, self).__init__(*args, **kwargs).

However, if this is working as intended, perhaps re-initialisation is both happening and required? Either way, fixing this bad code smell will require some consideration, refactoring and testing.

I would expect a mixin class to be defined as a parent: `class TCPConnection(BMProto, TLSDispatcher, ObjectTracker):` and for the `__init__`s to be called in an appropriate order, each seeing `*args` and `**kwargs` and each updating self as they go, passing off initialisation to the next parent class using `super(ThisClass, self).__init__(*args, **kwargs)`. However, if this is working as intended, perhaps re-initialisation is both happening and required? Either way, fixing this bad code smell will require some consideration, refactoring and testing.
g1itch commented 2018-10-31 15:31:04 +01:00 (Migrated from github.com)
Review

super() wont work without refactoring, I tried.

`super()` wont work without refactoring, I tried.
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered)
if elemCount > maxAddrCount / 2:
elemCount = int(maxAddrCount / 2)
addrs[stream * 2] = helper_random.randomsample(filtered.items(), elemCount)
if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items()
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
if knownnodes.knownNodes[(stream * 2) + 1] and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream * 2 + 1].items()
if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered)
if elemCount > maxAddrCount / 2:
elemCount = int(maxAddrCount / 2)
addrs[stream * 2 + 1] = helper_random.randomsample(filtered.items(), elemCount)
for substream in addrs.keys():
for substream in addrs:
for peer, params in addrs[substream]:
templist.append((substream, peer, params["lastseen"]))
if len(templist) > 0:
if templist:
self.append_write_buf(BMProto.assembleAddr(templist))
def sendBigInv(self):
"""Send hashes of all inventory objects, chunked as the protocol has a per-command limit."""
def sendChunk():
"""Send one chunk of inv entries in one command"""
if objectCount == 0:
return
logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount)
@ -172,13 +185,12 @@ class TCPConnection(BMProto, TLSDispatcher):
if Dandelion().hasHash(objHash):
continue
bigInvList[objHash] = 0
#self.objectsNewToThem[objHash] = time.time()
objectCount = 0
payload = b''
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash, storedValue in bigInvList.items():
payload += hash
for obj_hash, _ in bigInvList.items():
payload += obj_hash
objectCount += 1
PeterSurda commented 2018-10-12 10:52:22 +02:00 (Migrated from github.com)
Review

Callback for TCP connection being established.

Callback for TCP connection being established.
PeterSurda commented 2018-10-12 10:53:27 +02:00 (Migrated from github.com)
Review

Callback for reading from a socket

Callback for reading from a socket
# Remove -1 below when sufficient time has passed for users to
@ -193,20 +205,26 @@ class TCPConnection(BMProto, TLSDispatcher):
sendChunk()
def handle_connect(self):
"""Callback for TCP connection being established."""
try:
AdvancedDispatcher.handle_connect(self)
except socket.error as e:
if e.errno in asyncore._DISCONNECTED:
logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e)))
if e.errno in asyncore._DISCONNECTED: # pylint: disable=protected-access
logger.debug("%s:%i: Connection failed: %s", self.destination.host, self.destination.port, str(e))
return
self.nodeid = randomBytes(8)
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \
network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid))
#print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host,
self.destination.port,
network.connectionpool.BMConnectionPool().streams,
False,
nodeid=self.nodeid))
self.connectedAt = time.time()
receiveDataQueue.put(self.destination)
def handle_read(self):
PeterSurda commented 2018-10-12 10:53:40 +02:00 (Migrated from github.com)
Review

Callback for writing to a socket

Callback for writing to a socket
PeterSurda commented 2018-10-12 10:53:56 +02:00 (Migrated from github.com)
Review

Callback for connection being closed.

Callback for connection being closed.
"""Callback for reading from a socket"""
TLSDispatcher.handle_read(self)
if self.isOutbound and self.fullyEstablished:
for s in self.streams:
@ -218,9 +236,11 @@ class TCPConnection(BMProto, TLSDispatcher):
receiveDataQueue.put(self.destination)
def handle_write(self):
"""Callback for writing to a socket"""
TLSDispatcher.handle_write(self)
PeterSurda commented 2018-10-12 10:54:22 +02:00 (Migrated from github.com)
Review

SOCKS5 wrapper for TCP connections

SOCKS5 wrapper for TCP connections
PeterSurda commented 2018-10-12 10:55:00 +02:00 (Migrated from github.com)
Review

State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer.

State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer.
PeterSurda commented 2018-10-12 10:55:16 +02:00 (Migrated from github.com)
Review

SOCKS4a wrapper for TCP connections

SOCKS4a wrapper for TCP connections
PeterSurda commented 2018-10-12 10:55:34 +02:00 (Migrated from github.com)
Review

State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer.

State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer.
PeterSurda commented 2018-10-12 10:56:11 +02:00 (Migrated from github.com)
Review

TCP connection server for Bitmessage protocol

TCP connection server for Bitmessage protocol
def handle_close(self):
"""Callback for connection being closed."""
if self.isOutbound and not self.fullyEstablished:
knownnodes.decreaseRating(self.destination)
if self.fullyEstablished:
@ -231,37 +251,55 @@ class TCPConnection(BMProto, TLSDispatcher):
class Socks5BMConnection(Socks5Connection, TCPConnection):
"""SOCKS5 wrapper for TCP connections"""
def __init__(self, address):
Socks5Connection.__init__(self, address=address)
TCPConnection.__init__(self, address=address, sock=self.socket)
self.set_state("init")
def state_proxy_handshake_done(self):
"""State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer."""
Socks5Connection.state_proxy_handshake_done(self)
self.nodeid = randomBytes(8)
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \
network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid))
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host,
self.destination.port,
network.connectionpool.BMConnectionPool().streams,
False,
nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
return True
class Socks4aBMConnection(Socks4aConnection, TCPConnection):
"""SOCKS4a wrapper for TCP connections"""
def __init__(self, address):
Socks4aConnection.__init__(self, address=address)
TCPConnection.__init__(self, address=address, sock=self.socket)
self.set_state("init")
def state_proxy_handshake_done(self):
"""State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer."""
Socks4aConnection.state_proxy_handshake_done(self)
self.nodeid = randomBytes(8)
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, \
network.connectionpool.BMConnectionPool().streams, False, nodeid=self.nodeid))
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host,
self.destination.port,
network.connectionpool.BMConnectionPool().streams,
False,
nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
return True
class TCPServer(AdvancedDispatcher):
def __init__(self, host='127.0.0.1', port=8444):
"""TCP connection server for Bitmessage protocol"""
def __init__(self, host='127.0.0.1', port=8444): # pylint: disable=redefined-outer-name
if not hasattr(self, '_map'):
AdvancedDispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@ -284,20 +322,22 @@ class TCPServer(AdvancedDispatcher):
self.listen(5)
def is_bound(self):
"""Is the socket bound?"""
try:
return self.bound
except AttributeError:
return False
def handle_accept(self):
"""Incoming connection callback"""
pair = self.accept()
if pair is not None:
sock, addr = pair
sock, _ = pair
state.ownAddresses[state.Peer(sock.getsockname()[0], sock.getsockname()[1])] = True
if len(network.connectionpool.BMConnectionPool().inboundConnections) + \
len(network.connectionpool.BMConnectionPool().outboundConnections) > \
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \
BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10:
len(network.connectionpool.BMConnectionPool().outboundConnections) > \
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + \
BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10:
# 10 is a sort of buffer, in between it will go through the version handshake
# and return an error to the peer
logger.warning("Server full, dropping connection")
@ -314,17 +354,7 @@ if __name__ == "__main__":
for host in (("127.0.0.1", 8448),):
direct = TCPConnection(host)
while len(asyncore.socket_map) > 0:
while asyncore.socket_map:
print "loop, state = %s" % (direct.state)
asyncore.loop(timeout=10, count=1)
continue
proxy = Socks5BMConnection(host)
while len(asyncore.socket_map) > 0:
# print "loop, state = %s" % (proxy.state)
asyncore.loop(timeout=10, count=1)
proxy = Socks4aBMConnection(host)
while len(asyncore.socket_map) > 0:
# print "loop, state = %s" % (proxy.state)
asyncore.loop(timeout=10, count=1)

View File

@ -1,60 +1,71 @@
#import shared
#import time
#from multiprocessing import Pool, cpu_count
# pylint: disable=too-many-branches,too-many-statements,protected-access
"""
src/proofofwork.py
==================
"""
import ctypes
import hashlib
from struct import unpack, pack
from subprocess import call
import os
import sys
import time
from struct import pack, unpack
from subprocess import call
import openclpow
import paths
import queues
import state
import tr
from bmconfigparser import BMConfigParser
from debug import logger
import paths
import openclpow
import queues
import tr
import os
import ctypes
import state
bitmsglib = 'bitmsghash.so'
bmpow = None
def _set_idle():
if 'linux' in sys.platform:
os.nice(20)
else:
try:
# pylint: disable=no-member,import-error
sys.getwindowsversion()
import win32api,win32process,win32con # @UnresolvedImport
import win32api
import win32process
import win32con
pid = win32api.GetCurrentProcessId()
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
win32process.SetPriorityClass(handle, win32process.IDLE_PRIORITY_CLASS)
except:
#Windows 64-bit
# Windows 64-bit
pass
def _pool_worker(nonce, initialHash, target, pool_size):
_set_idle()
trialValue = float('inf')
while trialValue > target:
nonce += pool_size
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(
pack('>Q', nonce) + initialHash).digest()).digest()[0:8])
return [trialValue, nonce]
def _doSafePoW(target, initialHash):
logger.debug("Safe PoW start")
nonce = 0
trialValue = float('inf')
while trialValue > target and state.shutdown == 0:
nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(
pack('>Q', nonce) + initialHash).digest()).digest()[0:8])
if state.shutdown != 0:
raise StopIteration("Interrupted")
raise StopIteration("Interrupted") # pylint: misplaced-bare-raise
logger.debug("Safe PoW done")
return [trialValue, nonce]
def _doFastPoW(target, initialHash):
logger.debug("Fast PoW start")
from multiprocessing import Pool, cpu_count
@ -96,7 +107,8 @@ def _doFastPoW(target, initialHash):
logger.debug("Fast PoW done")
return result[0], result[1]
time.sleep(0.2)
def _doCPoW(target, initialHash):
h = initialHash
m = target
@ -104,33 +116,47 @@ def _doCPoW(target, initialHash):
out_m = ctypes.c_ulonglong(m)
logger.debug("C PoW start")
nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(pack('>Q', nonce) + initialHash).digest()).digest()[0:8])
if state.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("C PoW done")
return [trialValue, nonce]
def _doGPUPoW(target, initialHash):
logger.debug("GPU PoW start")
nonce = openclpow.do_opencl_pow(initialHash.encode("hex"), target)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
#print "{} - value {} < {}".format(nonce, trialValue, target)
trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(pack('>Q', nonce) + initialHash).digest()).digest()[0:8])
if trialValue > target:
deviceNames = ", ".join(gpu.name for gpu in openclpow.enabledGpus)
queues.UISignalQueue.put(('updateStatusBar', (tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'), 1)))
logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames)
queues.UISignalQueue.put((
'updateStatusBar', (
tr._translate(
"MainWindow",
'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'
),
1)))
logger.error(
"Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.",
deviceNames)
openclpow.enabledGpus = []
raise Exception("GPU did not calculate correctly.")
if state.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("GPU PoW done")
return [trialValue, nonce]
def estimate(difficulty, format = False):
def estimate(difficulty, format=False): # pylint: disable=redefined-builtin
"""
.. todo: fix unused variable
"""
ret = difficulty / 10
if ret < 1:
ret = 1
if format:
# pylint: disable=unused-variable
out = str(int(ret)) + " seconds"
if ret > 60:
ret /= 60
@ -148,25 +174,46 @@ def estimate(difficulty, format = False):
if ret > 366:
ret /= 366
out = str(int(ret)) + " years"
else:
return ret
ret = None # Ensure legacy behaviour
return ret
def getPowType():
"""Get the proof of work implementation"""
if openclpow.openclEnabled():
return "OpenCL"
if bmpow:
return "C"
return "python"
def notifyBuild(tried=False):
"""Notify the user of the success or otherwise of building the PoW C module"""
if bmpow:
queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "C PoW module built successfully."), 1)))
queues.UISignalQueue.put(('updateStatusBar', (tr._translate(
"proofofwork", "C PoW module built successfully."), 1)))
elif tried:
queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "Failed to build C PoW module. Please build it manually."), 1)))
queues.UISignalQueue.put(
(
'updateStatusBar', (
tr._translate(
"proofofwork",
"Failed to build C PoW module. Please build it manually."
),
1
)
)
)
else:
queues.UISignalQueue.put(('updateStatusBar', (tr._translate("proofofwork", "C PoW module unavailable. Please build it."), 1)))
queues.UISignalQueue.put(('updateStatusBar', (tr._translate(
"proofofwork", "C PoW module unavailable. Please build it."), 1)))
def buildCPoW():
"""Attempt to build the PoW C module"""
if bmpow is not None:
return
if paths.frozen is not None:
@ -190,29 +237,27 @@ def buildCPoW():
except:
notifyBuild(True)
def run(target, initialHash):
"""Run the proof of work thread"""
if state.shutdown != 0:
raise
raise # pylint: disable=misplaced-bare-raise
target = int(target)
if openclpow.openclEnabled():
# trialvalue1, nonce1 = _doGPUPoW(target, initialHash)
# trialvalue, nonce = _doFastPoW(target, initialHash)
# print "GPU: %s, %s" % (trialvalue1, nonce1)
# print "Fast: %s, %s" % (trialvalue, nonce)
# return [trialvalue, nonce]
try:
return _doGPUPoW(target, initialHash)
except StopIteration:
raise
except:
pass # fallback
pass # fallback
if bmpow:
try:
return _doCPoW(target, initialHash)
except StopIteration:
raise
except:
pass # fallback
pass # fallback
if paths.frozen == "macosx_app" or not paths.frozen:
# on my (Peter Surda) Windows 10, Windows Defender
# does not like this and fights with PyBitmessage
@ -225,24 +270,30 @@ def run(target, initialHash):
raise
except:
logger.error("Fast PoW got exception:", exc_info=True)
pass #fallback
try:
return _doSafePoW(target, initialHash)
except StopIteration:
raise
except:
pass #fallback
pass # fallback
def resetPoW():
"""Initialise the OpenCL PoW"""
openclpow.initCL()
# init
def init():
global bitmsglib, bso, bmpow
"""Initialise PoW"""
# pylint: disable=global-statement
global bitmsglib, bmpow
openclpow.initCL()
if "win32" == sys.platform:
if sys.platform == "win32":
if ctypes.sizeof(ctypes.c_voidp) == 4:
bitmsglib = 'bitmsghash32.dll'
else:

View File

@ -1,21 +1,33 @@
# A simple upnp module to forward port for BitMessage
# Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port
# pylint: disable=too-many-statements,too-many-branches,protected-access,no-self-use
"""
src/upnp.py
===========
A simple upnp module to forward port for BitMessage
Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port
"""
import httplib
from random import randint
import socket
from struct import unpack, pack
import threading
import time
from bmconfigparser import BMConfigParser
from network.connectionpool import BMConnectionPool
from helper_threading import *
import urllib2
from random import randint
from urlparse import urlparse
from xml.dom.minidom import Document, parseString
import queues
import shared
import state
import tr
from bmconfigparser import BMConfigParser
from debug import logger
from helper_threading import StoppableThread
from network.connectionpool import BMConnectionPool
def createRequestXML(service, action, arguments=None):
from xml.dom.minidom import Document
"""Router UPnP requests are XML formatted"""
doc = Document()
@ -63,22 +75,24 @@ def createRequestXML(service, action, arguments=None):
# our tree is ready, conver it to a string
return doc.toxml()
class UPnPError(Exception):
def __init__(self, message):
self.message
class Router:
class UPnPError(Exception):
"""Handle a UPnP error"""
def __init__(self, message):
super(UPnPError, self).__init__()
logger.error(message)
class Router: # pylint: disable=old-style-class
"""Encapulate routing"""
name = ""
path = ""
address = None
routerPath = None
extPort = None
def __init__(self, ssdpResponse, address):
import urllib2
from xml.dom.minidom import parseString
from urlparse import urlparse
from debug import logger
self.address = address
@ -92,9 +106,9 @@ class Router:
try:
self.routerPath = urlparse(header['location'])
if not self.routerPath or not hasattr(self.routerPath, "hostname"):
logger.error ("UPnP: no hostname: %s", header['location'])
logger.error("UPnP: no hostname: %s", header['location'])
except KeyError:
logger.error ("UPnP: missing location header")
logger.error("UPnP: missing location header")
# get the profile xml file and read it into a variable
directory = urllib2.urlopen(header['location']).read()
@ -108,45 +122,58 @@ class Router:
for service in service_types:
if service.childNodes[0].data.find('WANIPConnection') > 0 or \
service.childNodes[0].data.find('WANPPPConnection') > 0:
service.childNodes[0].data.find('WANPPPConnection') > 0:
self.path = service.parentNode.getElementsByTagName('controlURL')[0].childNodes[0].data
self.upnp_schema = service.childNodes[0].data.split(':')[-2]
def AddPortMapping(self, externalPort, internalPort, internalClient, protocol, description, leaseDuration = 0, enabled = 1):
from debug import logger
def AddPortMapping(
self,
externalPort,
internalPort,
internalClient,
protocol,
description,
leaseDuration=0,
enabled=1,
): # pylint: disable=too-many-arguments
"""Add UPnP port mapping"""
resp = self.soapRequest(self.upnp_schema + ':1', 'AddPortMapping', [
('NewRemoteHost', ''),
('NewExternalPort', str(externalPort)),
('NewProtocol', protocol),
('NewInternalPort', str(internalPort)),
('NewInternalClient', internalClient),
('NewEnabled', str(enabled)),
('NewPortMappingDescription', str(description)),
('NewLeaseDuration', str(leaseDuration))
])
('NewRemoteHost', ''),
('NewExternalPort', str(externalPort)),
('NewProtocol', protocol),
('NewInternalPort', str(internalPort)),
('NewInternalClient', internalClient),
('NewEnabled', str(enabled)),
('NewPortMappingDescription', str(description)),
('NewLeaseDuration', str(leaseDuration))
])
self.extPort = externalPort
logger.info("Successfully established UPnP mapping for %s:%i on external port %i", internalClient, internalPort, externalPort)
logger.info("Successfully established UPnP mapping for %s:%i on external port %i",
internalClient, internalPort, externalPort)
return resp
def DeletePortMapping(self, externalPort, protocol):
from debug import logger
"""Delete UPnP port mapping"""
resp = self.soapRequest(self.upnp_schema + ':1', 'DeletePortMapping', [
('NewRemoteHost', ''),
('NewExternalPort', str(externalPort)),
('NewProtocol', protocol),
])
('NewRemoteHost', ''),
('NewExternalPort', str(externalPort)),
('NewProtocol', protocol),
])
logger.info("Removed UPnP mapping on external port %i", externalPort)
return resp
def GetExternalIPAddress(self):
from xml.dom.minidom import parseString
"""Get the external address"""
resp = self.soapRequest(self.upnp_schema + ':1', 'GetExternalIPAddress')
dom = parseString(resp)
return dom.getElementsByTagName('NewExternalIPAddress')[0].childNodes[0].data
def soapRequest(self, service, action, arguments=None):
from xml.dom.minidom import parseString
from debug import logger
"""Make a request to a router"""
conn = httplib.HTTPConnection(self.routerPath.hostname, self.routerPath.port)
conn.request(
'POST',
@ -155,8 +182,8 @@ class Router:
{
'SOAPAction': '"urn:schemas-upnp-org:service:%s#%s"' % (service, action),
'Content-Type': 'text/xml'
}
)
}
)
resp = conn.getresponse()
conn.close()
if resp.status == 500:
@ -164,21 +191,24 @@ class Router:
try:
dom = parseString(respData)
errinfo = dom.getElementsByTagName('errorDescription')
if len(errinfo) > 0:
if errinfo:
logger.error("UPnP error: %s", respData)
raise UPnPError(errinfo[0].childNodes[0].data)
except:
raise UPnPError("Unable to parse SOAP error: %s" %(respData))
raise UPnPError("Unable to parse SOAP error: %s" % (respData))
return resp
class uPnPThread(threading.Thread, StoppableThread):
"""Start a thread to handle UPnP activity"""
SSDP_ADDR = "239.255.255.250"
GOOGLE_DNS = "8.8.8.8"
SSDP_PORT = 1900
SSDP_MX = 2
SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
def __init__ (self):
def __init__(self):
threading.Thread.__init__(self, name="uPnPThread")
try:
self.extPort = BMConfigParser().getint('bitmessagesettings', 'extport')
@ -194,8 +224,8 @@ class uPnPThread(threading.Thread, StoppableThread):
self.initStop()
def run(self):
from debug import logger
"""Start the thread to manage UPnP activity"""
logger.debug("Starting UPnP thread")
logger.debug("Local IP: %s", self.localIP)
lastSent = 0
@ -209,9 +239,11 @@ class uPnPThread(threading.Thread, StoppableThread):
if not bound:
time.sleep(1)
# pylint: disable=attribute-defined-outside-init
self.localPort = BMConfigParser().getint('bitmessagesettings', 'port')
while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
if time.time() - lastSent > self.sendSleep and len(self.routers) == 0:
if time.time() - lastSent > self.sendSleep and not self.routers:
try:
self.sendSearchRouter()
except:
@ -219,7 +251,7 @@ class uPnPThread(threading.Thread, StoppableThread):
lastSent = time.time()
try:
while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
resp,(ip,port) = self.sock.recvfrom(1000)
resp, (ip, _) = self.sock.recvfrom(1000)
if resp is None:
continue
newRouter = Router(resp, ip)
@ -230,14 +262,11 @@ class uPnPThread(threading.Thread, StoppableThread):
logger.debug("Found UPnP router at %s", ip)
g1itch commented 2018-10-15 17:23:54 +02:00 (Migrated from github.com)
Review

This statement can be probably removed. I cannot find where this shared.alreadyAttemptedConnectionsList used.

This statement can be probably removed. I cannot find where this `shared.alreadyAttemptedConnectionsList` used.
coffeedogs commented 2018-10-18 21:13:48 +02:00 (Migrated from github.com)
Review

OK, removed that context.
Edit: Actually, removed the whole list / lock, will also remove from shared.py when I commit the changed to that file.

OK, removed that context. Edit: Actually, removed the whole list / lock, will also remove from shared.py when I commit the changed to that file.
self.routers.append(newRouter)
self.createPortMapping(newRouter)
queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'UPnP port mapping established on port %1').arg(str(self.extPort))))
# retry connections so that the submitted port is refreshed
with shared.alreadyAttemptedConnectionsListLock:
shared.alreadyAttemptedConnectionsList.clear()
shared.alreadyAttemptedConnectionsListResetTime = int(
time.time())
queues.UISignalQueue.put(('updateStatusBar', tr._translate(
"MainWindow", 'UPnP port mapping established on port %1'
).arg(str(self.extPort))))
break
except socket.timeout as e:
except socket.timeout:
pass
except:
logger.error("Failure running UPnP router search.", exc_info=True)
@ -259,22 +288,25 @@ class uPnPThread(threading.Thread, StoppableThread):
self.deletePortMapping(router)
shared.extPort = None
if deleted:
queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'UPnP port mapping removed')))
queues.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow", 'UPnP port mapping removed')))
logger.debug("UPnP thread done")
def getLocalIP(self):
"""Get the local IP of the node"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.connect((uPnPThread.GOOGLE_DNS, 1))
return s.getsockname()[0]
def sendSearchRouter(self):
from debug import logger
"""Querying for UPnP services"""
ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \
"HOST: %s:%d\r\n" % (uPnPThread.SSDP_ADDR, uPnPThread.SSDP_PORT) + \
"MAN: \"ssdp:discover\"\r\n" + \
"MX: %d\r\n" % (uPnPThread.SSDP_MX, ) + \
"ST: %s\r\n" % (uPnPThread.SSDP_ST, ) + "\r\n"
"HOST: %s:%d\r\n" % (uPnPThread.SSDP_ADDR, uPnPThread.SSDP_PORT) + \
"MAN: \"ssdp:discover\"\r\n" + \
"MX: %d\r\n" % (uPnPThread.SSDP_MX, ) + \
"ST: %s\r\n" % (uPnPThread.SSDP_ST, ) + "\r\n"
try:
logger.debug("Sending UPnP query")
@ -283,19 +315,23 @@ class uPnPThread(threading.Thread, StoppableThread):
logger.exception("UPnP send query failed")
def createPortMapping(self, router):
from debug import logger
"""Add a port mapping"""
for i in range(50):
PeterSurda commented 2018-10-12 11:24:00 +02:00 (Migrated from github.com)
Review

This doesn't do anything so it can be removed.

This doesn't do anything so it can be removed.
coffeedogs commented 2018-10-15 12:50:16 +02:00 (Migrated from github.com)
Review

OK, removed. Where I found unused variables involving unpack I left in the unpacks in case there was some state to do with unpacking lying around.

OK, removed. Where I found unused variables involving unpack I left in the unpacks in case there was some state to do with unpacking lying around.
try:
routerIP, = unpack('>I', socket.inet_aton(router.address))
localIP = self.localIP
if i == 0:
extPort = self.localPort # try same port first
extPort = self.localPort # try same port first
elif i == 1 and self.extPort:
extPort = self.extPort # try external port from last time next
extPort = self.extPort # try external port from last time next
else:
extPort = randint(32767, 65535)
logger.debug("Attempt %i, requesting UPnP mapping for %s:%i on external port %i", i, localIP, self.localPort, extPort)
logger.debug(
"Attempt %i, requesting UPnP mapping for %s:%i on external port %i",
i,
localIP,
self.localPort,
extPort)
router.AddPortMapping(extPort, self.localPort, localIP, 'TCP', 'BitMessage')
shared.extPort = extPort
self.extPort = extPort
@ -306,7 +342,5 @@ class uPnPThread(threading.Thread, StoppableThread):
logger.debug("UPnP error: ", exc_info=True)
def deletePortMapping(self, router):
"""Delete a port mapping"""
router.DeletePortMapping(router.extPort, 'TCP')