Compare commits

...

5 Commits

6 changed files with 71 additions and 89 deletions

View File

@ -1,7 +1,6 @@
""" """
Improved version of asyncore dispatcher Improved version of asyncore dispatcher
""" """
# pylint: disable=attribute-defined-outside-init
import socket import socket
import threading import threading
import time import time
@ -31,6 +30,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
def __init__(self, sock=None): def __init__(self, sock=None):
if not hasattr(self, '_map'): if not hasattr(self, '_map'):
asyncore.dispatcher.__init__(self, sock) asyncore.dispatcher.__init__(self, sock)
self.connectedAt = 0
self.close_reason = None self.close_reason = None
self.read_buf = bytearray() self.read_buf = bytearray()
self.write_buf = bytearray() self.write_buf = bytearray()
@ -42,6 +42,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.readLock = threading.RLock() self.readLock = threading.RLock()
self.writeLock = threading.RLock() self.writeLock = threading.RLock()
self.processingLock = threading.RLock() self.processingLock = threading.RLock()
self.uploadChunk = self.downloadChunk = 0
def append_write_buf(self, data): def append_write_buf(self, data):
"""Append binary data to the end of stream write buffer.""" """Append binary data to the end of stream write buffer."""

View File

@ -1,7 +1,7 @@
""" """
Bitmessage Protocol Class BMProto defines bitmessage's network protocol workflow.
""" """
# pylint: disable=attribute-defined-outside-init, too-few-public-methods
import base64 import base64
import hashlib import hashlib
import logging import logging
@ -66,6 +66,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.pendingUpload = RandomTrackingDict() self.pendingUpload = RandomTrackingDict()
# canonical identifier of network group # canonical identifier of network group
self.network_group = None self.network_group = None
# userAgent initialization
self.userAgent = ''
def bm_proto_reset(self): def bm_proto_reset(self):
"""Reset the bitmessage object parser""" """Reset the bitmessage object parser"""
@ -100,7 +102,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
length=protocol.Header.size, expectBytes=self.payloadLength) length=protocol.Header.size, expectBytes=self.payloadLength)
return True return True
def state_bm_command(self): # pylint: disable=too-many-branches def state_bm_command(self): # pylint: disable=too-many-branches
"""Process incoming command""" """Process incoming command"""
self.payload = self.read_buf[:self.payloadLength] self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
@ -185,7 +187,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return Node(services, host, port) return Node(services, host, port)
# pylint: disable=too-many-branches, too-many-statements
def decode_payload_content(self, pattern="v"): def decode_payload_content(self, pattern="v"):
""" """
Decode the payload depending on pattern: Decode the payload depending on pattern:
@ -201,8 +202,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
0-9 = length of the next item 0-9 = length of the next item
, = end of array , = end of array
""" """
# pylint: disable=too-many-branches,too-many-statements
# pylint: disable=inconsistent-return-statements
def decode_simple(self, char="v"): def decode_simple(self, char="v"):
"""Decode the payload using one char pattern""" """Decode the payload using one char pattern"""
if char == "v": if char == "v":
@ -221,6 +222,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.payloadOffset += 8 self.payloadOffset += 8
return struct.unpack(">Q", self.payload[ return struct.unpack(">Q", self.payload[
self.payloadOffset - 8:self.payloadOffset])[0] self.payloadOffset - 8:self.payloadOffset])[0]
return None
size = None size = None
isArray = False isArray = False
@ -254,10 +256,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
]) ])
parserStack[-2][4] = len(parserStack[-2][3]) parserStack[-2][4] = len(parserStack[-2][3])
else: else:
for j in range(parserStack[-1][4], len(parserStack[-1][3])): j = 0
for j in range(
parserStack[-1][4], len(parserStack[-1][3])):
if parserStack[-1][3][j] not in "lL0123456789": if parserStack[-1][3][j] not in "lL0123456789":
break break
# pylint: disable=undefined-loop-variable
parserStack.append([ parserStack.append([
size, size, isArray, size, size, isArray,
parserStack[-1][3][parserStack[-1][4]:j + 1], 0, [] parserStack[-1][3][parserStack[-1][4]:j + 1], 0, []
@ -268,7 +271,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
elif i == "s": elif i == "s":
# if parserStack[-2][2]: # if parserStack[-2][2]:
# parserStack[-1][5].append(self.payload[ # parserStack[-1][5].append(self.payload[
# self.payloadOffset:self.payloadOffset + parserStack[-1][0]]) # self.payloadOffset:self.payloadOffset
# + parserStack[-1][0]])
# else: # else:
parserStack[-1][5] = self.payload[ parserStack[-1][5] = self.payload[
self.payloadOffset:self.payloadOffset + parserStack[-1][0]] self.payloadOffset:self.payloadOffset + parserStack[-1][0]]
@ -339,6 +343,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def _command_inv(self, dandelion=False): def _command_inv(self, dandelion=False):
"""
Common inv announce implementation:
both inv and dinv depending on *dandelion* kwarg
"""
items = self.decode_payload_content("l32s") items = self.decode_payload_content("l32s")
if len(items) > MAX_OBJECT_COUNT: if len(items) > MAX_OBJECT_COUNT:
@ -376,10 +384,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
nonce, expiresTime, objectType, version, streamNumber, nonce, expiresTime, objectType, version, streamNumber,
self.payload, self.payloadOffset) self.payload, self.payloadOffset)
if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE: payload_len = len(self.payload) - self.payloadOffset
if payload_len > MAX_OBJECT_PAYLOAD_SIZE:
logger.info( logger.info(
'The payload length of this object is too large (%d bytes).' 'The payload length of this object is too large'
' Ignoring it.', len(self.payload) - self.payloadOffset) ' (%d bytes). Ignoring it.', payload_len)
raise BMProtoExcessiveDataError() raise BMProtoExcessiveDataError()
try: try:
@ -434,9 +443,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_addr(self): def bm_command_addr(self):
"""Incoming addresses, process them""" """Incoming addresses, process them"""
# pylint: disable=redefined-outer-name # not using services
addresses = self._decode_addr() for seenTime, stream, _, ip, port in self._decode_addr():
for seenTime, stream, _, ip, port in addresses:
ip = str(ip) ip = str(ip)
if ( if (
stream not in state.streamsInWhichIAmParticipating stream not in state.streamsInWhichIAmParticipating
@ -446,8 +454,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
continue continue
decodedIP = protocol.checkIPAddress(ip) decodedIP = protocol.checkIPAddress(ip)
if ( if (
decodedIP decodedIP and time.time() - seenTime > 0
and time.time() - seenTime > 0
and seenTime > time.time() - ADDRESS_ALIVE and seenTime > time.time() - ADDRESS_ALIVE
and port > 0 and port > 0
): ):
@ -475,7 +482,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.append_write_buf(protocol.CreatePacket('pong')) self.append_write_buf(protocol.CreatePacket('pong'))
return True return True
def bm_command_pong(self): # pylint: disable=no-self-use @staticmethod
def bm_command_pong():
""" """
Incoming pong. Incoming pong.
Ignore it. PyBitmessage pings connections after about 5 minutes Ignore it. PyBitmessage pings connections after about 5 minutes
@ -545,9 +553,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
length=self.payloadLength, expectBytes=0) length=self.payloadLength, expectBytes=0)
return False return False
# pylint: disable=too-many-return-statements
def peerValidityChecks(self): def peerValidityChecks(self):
"""Check the validity of the peer""" """Check the validity of the peer"""
# pylint: disable=too-many-return-statements
if self.remoteProtocolVersion < 3: if self.remoteProtocolVersion < 3:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your is using an old protocol. Closing connection.", errorText="Your is using an old protocol. Closing connection.",
@ -562,7 +570,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
" compared to mine. Closing connection.", fatal=2)) " compared to mine. Closing connection.", fatal=2))
logger.info( logger.info(
"%s's time is too far in the future (%s seconds)." "%s's time is too far in the future (%s seconds)."
" Closing connection to it.", self.destination, self.timeOffset) " Closing connection to it.",
self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1 BMProto.timeOffsetWrongCount += 1
return False return False
elif self.timeOffset < -MAX_TIME_OFFSET: elif self.timeOffset < -MAX_TIME_OFFSET:
@ -570,8 +579,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
errorText="Your time is too far in the past compared to mine." errorText="Your time is too far in the past compared to mine."
" Closing connection.", fatal=2)) " Closing connection.", fatal=2))
logger.info( logger.info(
"%s's time is too far in the past (timeOffset %s seconds)." "%s's time is too far in the past"
" Closing connection to it.", self.destination, self.timeOffset) " (timeOffset %s seconds). Closing connection to it.",
self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1 BMProto.timeOffsetWrongCount += 1
return False return False
else: else:
@ -584,7 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closed connection to %s because there is no overlapping' 'Closed connection to %s because there is no overlapping'
' interest in streams.', self.destination) ' interest in streams.', self.destination)
return False return False
if self.destination in connectionpool.BMConnectionPool().inboundConnections: if connectionpool.BMConnectionPool().inboundConnections.get(
self.destination):
try: try:
if not protocol.checkSocksIP(self.destination.host): if not protocol.checkSocksIP(self.destination.host):
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
@ -594,7 +605,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closed connection to %s because we are already' 'Closed connection to %s because we are already'
' connected to that IP.', self.destination) ' connected to that IP.', self.destination)
return False return False
except Exception: except: # TODO: exception types
pass pass
if not self.isOutbound: if not self.isOutbound:
# incoming from a peer we're connected to as outbound, # incoming from a peer we're connected to as outbound,
@ -614,8 +625,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closed connection to %s due to server full' 'Closed connection to %s due to server full'
' or duplicate inbound/outbound.', self.destination) ' or duplicate inbound/outbound.', self.destination)
return False return False
if connectionpool.BMConnectionPool().isAlreadyConnected( if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce):
self.nonce):
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="I'm connected to myself. Closing connection.", errorText="I'm connected to myself. Closing connection.",
fatal=2)) fatal=2))
@ -628,7 +638,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False): def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object""" """Stop downloading object *hashId*"""
for connection in connectionpool.BMConnectionPool().connections(): for connection in connectionpool.BMConnectionPool().connections():
try: try:
del connection.objectsNewToMe[hashId] del connection.objectsNewToMe[hashId]
@ -658,7 +668,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except AttributeError: except AttributeError:
try: try:
logger.debug( logger.debug(
'%(host)s:%(port)i: closing', self.destination._asdict()) '%s:%i: closing',
self.destination.host, self.destination.port)
except AttributeError: except AttributeError:
logger.debug('Disconnected socket closing') logger.debug('Disconnected socket closing')
AdvancedDispatcher.handle_close(self) AdvancedDispatcher.handle_close(self)

View File

@ -61,9 +61,9 @@ class Proxy(AdvancedDispatcher):
@proxy.setter @proxy.setter
def proxy(self, address): def proxy(self, address):
"""Set proxy IP and port""" """Set proxy IP and port"""
if (not isinstance(address, tuple) or len(address) < 2 or if (not isinstance(address, tuple) or len(address) < 2
not isinstance(address[0], str) or or not isinstance(address[0], str)
not isinstance(address[1], int)): or not isinstance(address[1], int)):
raise ValueError raise ValueError
self.__class__._proxy = address self.__class__._proxy = address
@ -113,7 +113,6 @@ class Proxy(AdvancedDispatcher):
self.destination = address self.destination = address
self.isOutbound = True self.isOutbound = True
self.fullyEstablished = False self.fullyEstablished = False
self.connectedAt = 0
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
if BMConfigParser().safeGetBoolean( if BMConfigParser().safeGetBoolean(
"bitmessagesettings", "socksauthentication"): "bitmessagesettings", "socksauthentication"):
@ -145,6 +144,5 @@ class Proxy(AdvancedDispatcher):
def state_proxy_handshake_done(self): def state_proxy_handshake_done(self):
"""Handshake is complete at this point""" """Handshake is complete at this point"""
# pylint: disable=attribute-defined-outside-init
self.connectedAt = time.time() self.connectedAt = time.time()
return False return False

View File

@ -51,7 +51,6 @@ class TCPConnection(BMProto, TLSDispatcher):
self.verackSent = False self.verackSent = False
self.streams = [0] self.streams = [0]
self.fullyEstablished = False self.fullyEstablished = False
self.connectedAt = 0
self.skipUntil = 0 self.skipUntil = 0
if address is None and sock is not None: if address is None and sock is not None:
self.destination = Peer(*sock.getpeername()) self.destination = Peer(*sock.getpeername())

View File

@ -16,7 +16,6 @@ logger = logging.getLogger('default')
_DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,)) _DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,))
# sslProtocolVersion
if sys.version_info >= (2, 7, 13): if sys.version_info >= (2, 7, 13):
# this means TLSv1 or higher # this means TLSv1 or higher
# in the future change to # in the future change to
@ -27,14 +26,16 @@ elif sys.version_info >= (2, 7, 9):
# SSLv2 and 3 are excluded with an option after context is created # SSLv2 and 3 are excluded with an option after context is created
sslProtocolVersion = ssl.PROTOCOL_SSLv23 sslProtocolVersion = ssl.PROTOCOL_SSLv23
else: else:
# this means TLSv1, there is no way to set "TLSv1 or higher" or # this means TLSv1, there is no way to set "TLSv1 or higher"
# "TLSv1.2" in < 2.7.9 # or "TLSv1.2" in < 2.7.9
sslProtocolVersion = ssl.PROTOCOL_TLSv1 sslProtocolVersion = ssl.PROTOCOL_TLSv1
# ciphers # ciphers
if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not \ if (
ssl.OPENSSL_VERSION.startswith("LibreSSL"): ssl.OPENSSL_VERSION_NUMBER >= 0x10100000
and not ssl.OPENSSL_VERSION.startswith(b"LibreSSL")
):
sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0" sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0"
else: else:
sslProtocolCiphers = "AECDH-AES256-SHA" sslProtocolCiphers = "AECDH-AES256-SHA"
@ -47,16 +48,10 @@ class TLSDispatcher(AdvancedDispatcher):
def __init__(self, _=None, sock=None, certfile=None, keyfile=None, def __init__(self, _=None, sock=None, certfile=None, keyfile=None,
server_side=False, ciphers=sslProtocolCiphers): server_side=False, ciphers=sslProtocolCiphers):
self.want_read = self.want_write = True self.want_read = self.want_write = True
if certfile is None: self.certfile = certfile or os.path.join(
self.certfile = os.path.join( paths.codePath(), 'sslkeys', 'cert.pem')
paths.codePath(), 'sslkeys', 'cert.pem') self.keyfile = keyfile or os.path.join(
else: paths.codePath(), 'sslkeys', 'key.pem')
self.certfile = certfile
if keyfile is None:
self.keyfile = os.path.join(
paths.codePath(), 'sslkeys', 'key.pem')
else:
self.keyfile = keyfile
self.server_side = server_side self.server_side = server_side
self.ciphers = ciphers self.ciphers = ciphers
self.tlsStarted = False self.tlsStarted = False
@ -66,7 +61,6 @@ class TLSDispatcher(AdvancedDispatcher):
def state_tls_init(self): def state_tls_init(self):
"""Prepare sockets for TLS handshake""" """Prepare sockets for TLS handshake"""
# pylint: disable=attribute-defined-outside-init
self.isSSL = True self.isSSL = True
self.tlsStarted = True self.tlsStarted = True
# Once the connection has been established, # Once the connection has been established,
@ -96,8 +90,6 @@ class TLSDispatcher(AdvancedDispatcher):
self.want_read = self.want_write = True self.want_read = self.want_write = True
self.set_state("tls_handshake") self.set_state("tls_handshake")
return False return False
# if hasattr(self.socket, "context"):
# self.socket.context.set_ecdh_curve("secp256k1")
@staticmethod @staticmethod
def state_tls_handshake(): def state_tls_handshake():
@ -112,9 +104,9 @@ class TLSDispatcher(AdvancedDispatcher):
try: try:
if self.tlsStarted and not self.tlsDone and not self.write_buf: if self.tlsStarted and not self.tlsDone and not self.write_buf:
return self.want_write return self.want_write
return AdvancedDispatcher.writable(self)
except AttributeError: except AttributeError:
return AdvancedDispatcher.writable(self) pass
return AdvancedDispatcher.writable(self)
def readable(self): def readable(self):
"""Handle readable check for TLS-enabled sockets""" """Handle readable check for TLS-enabled sockets"""
@ -126,14 +118,14 @@ class TLSDispatcher(AdvancedDispatcher):
return self.want_read return self.want_read
# prior to TLS handshake, # prior to TLS handshake,
# receiveDataThread should emulate synchronous behaviour # receiveDataThread should emulate synchronous behaviour
elif not self.fullyEstablished and ( if not self.fullyEstablished and (
self.expectBytes == 0 or not self.write_buf_empty()): self.expectBytes == 0 or not self.write_buf_empty()):
return False return False
return AdvancedDispatcher.readable(self)
except AttributeError: except AttributeError:
return AdvancedDispatcher.readable(self) pass
return AdvancedDispatcher.readable(self)
def handle_read(self): # pylint: disable=inconsistent-return-statements def handle_read(self):
""" """
Handle reads for sockets during TLS handshake. Requires special Handle reads for sockets during TLS handshake. Requires special
treatment as during the handshake, buffers must remain empty treatment as during the handshake, buffers must remain empty
@ -142,29 +134,20 @@ class TLSDispatcher(AdvancedDispatcher):
try: try:
# wait for write buffer flush # wait for write buffer flush
if self.tlsStarted and not self.tlsDone and not self.write_buf: if self.tlsStarted and not self.tlsDone and not self.write_buf:
# logger.debug(
# "%s:%i TLS handshaking (read)", self.destination.host,
# self.destination.port)
self.tls_handshake() self.tls_handshake()
else: else:
# logger.debug( AdvancedDispatcher.handle_read(self)
# "%s:%i Not TLS handshaking (read)", self.destination.host,
# self.destination.port)
return AdvancedDispatcher.handle_read(self)
except AttributeError: except AttributeError:
return AdvancedDispatcher.handle_read(self) AdvancedDispatcher.handle_read(self)
except ssl.SSLError as err: except ssl.SSLError as err:
self.close_reason = "SSL Error in handle_read"
if err.errno == ssl.SSL_ERROR_WANT_READ: if err.errno == ssl.SSL_ERROR_WANT_READ:
return return
elif err.errno in _DISCONNECTED_SSL: if err.errno not in _DISCONNECTED_SSL:
self.handle_close() logger.info("SSL Error: %s", err)
return self.close_reason = "SSL Error in handle_read"
logger.info("SSL Error: %s", err)
self.handle_close() self.handle_close()
return
def handle_write(self): # pylint: disable=inconsistent-return-statements def handle_write(self):
""" """
Handle writes for sockets during TLS handshake. Requires special Handle writes for sockets during TLS handshake. Requires special
treatment as during the handshake, buffers must remain empty treatment as during the handshake, buffers must remain empty
@ -173,27 +156,18 @@ class TLSDispatcher(AdvancedDispatcher):
try: try:
# wait for write buffer flush # wait for write buffer flush
if self.tlsStarted and not self.tlsDone and not self.write_buf: if self.tlsStarted and not self.tlsDone and not self.write_buf:
# logger.debug(
# "%s:%i TLS handshaking (write)", self.destination.host,
# self.destination.port)
self.tls_handshake() self.tls_handshake()
else: else:
# logger.debug( AdvancedDispatcher.handle_write(self)
# "%s:%i Not TLS handshaking (write)", self.destination.host,
# self.destination.port)
return AdvancedDispatcher.handle_write(self)
except AttributeError: except AttributeError:
return AdvancedDispatcher.handle_write(self) AdvancedDispatcher.handle_write(self)
except ssl.SSLError as err: except ssl.SSLError as err:
self.close_reason = "SSL Error in handle_write"
if err.errno == ssl.SSL_ERROR_WANT_WRITE: if err.errno == ssl.SSL_ERROR_WANT_WRITE:
return 0 return
elif err.errno in _DISCONNECTED_SSL: if err.errno not in _DISCONNECTED_SSL:
self.handle_close() logger.info("SSL Error: %s", err)
return 0 self.close_reason = "SSL Error in handle_write"
logger.info("SSL Error: %s", err)
self.handle_close() self.handle_close()
return
def tls_handshake(self): def tls_handshake(self):
"""Perform TLS handshake and handle its stages""" """Perform TLS handshake and handle its stages"""

View File

@ -28,7 +28,6 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
# .. todo:: sort out streams # .. todo:: sort out streams
self.streams = [1] self.streams = [1]
self.fullyEstablished = True self.fullyEstablished = True
self.connectedAt = 0
self.skipUntil = 0 self.skipUntil = 0
if sock is None: if sock is None:
if host is None: if host is None: