More formatting in network package #1734
|
@ -1,7 +1,6 @@
|
||||||
"""
|
"""
|
||||||
Improved version of asyncore dispatcher
|
Improved version of asyncore dispatcher
|
||||||
"""
|
"""
|
||||||
# pylint: disable=attribute-defined-outside-init
|
|
||||||
import socket
|
import socket
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
@ -31,6 +30,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
def __init__(self, sock=None):
|
def __init__(self, sock=None):
|
||||||
if not hasattr(self, '_map'):
|
if not hasattr(self, '_map'):
|
||||||
asyncore.dispatcher.__init__(self, sock)
|
asyncore.dispatcher.__init__(self, sock)
|
||||||
|
self.connectedAt = 0
|
||||||
self.close_reason = None
|
self.close_reason = None
|
||||||
self.read_buf = bytearray()
|
self.read_buf = bytearray()
|
||||||
self.write_buf = bytearray()
|
self.write_buf = bytearray()
|
||||||
|
@ -42,6 +42,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
self.readLock = threading.RLock()
|
self.readLock = threading.RLock()
|
||||||
self.writeLock = threading.RLock()
|
self.writeLock = threading.RLock()
|
||||||
self.processingLock = threading.RLock()
|
self.processingLock = threading.RLock()
|
||||||
|
self.uploadChunk = self.downloadChunk = 0
|
||||||
|
|
||||||
def append_write_buf(self, data):
|
def append_write_buf(self, data):
|
||||||
"""Append binary data to the end of stream write buffer."""
|
"""Append binary data to the end of stream write buffer."""
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Bitmessage Protocol
|
Class BMProto defines bitmessage's network protocol workflow.
|
||||||
"""
|
"""
|
||||||
# pylint: disable=attribute-defined-outside-init, too-few-public-methods
|
|
||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
|
@ -66,6 +66,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.pendingUpload = RandomTrackingDict()
|
self.pendingUpload = RandomTrackingDict()
|
||||||
# canonical identifier of network group
|
# canonical identifier of network group
|
||||||
self.network_group = None
|
self.network_group = None
|
||||||
|
# userAgent initialization
|
||||||
|
self.userAgent = ''
|
||||||
|
|
||||||
def bm_proto_reset(self):
|
def bm_proto_reset(self):
|
||||||
"""Reset the bitmessage object parser"""
|
"""Reset the bitmessage object parser"""
|
||||||
|
@ -100,7 +102,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
length=protocol.Header.size, expectBytes=self.payloadLength)
|
length=protocol.Header.size, expectBytes=self.payloadLength)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def state_bm_command(self): # pylint: disable=too-many-branches
|
def state_bm_command(self): # pylint: disable=too-many-branches
|
||||||
"""Process incoming command"""
|
"""Process incoming command"""
|
||||||
self.payload = self.read_buf[:self.payloadLength]
|
self.payload = self.read_buf[:self.payloadLength]
|
||||||
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
|
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
|
||||||
|
@ -185,7 +187,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
return Node(services, host, port)
|
return Node(services, host, port)
|
||||||
|
|
||||||
# pylint: disable=too-many-branches, too-many-statements
|
# pylint: disable=too-many-branches,too-many-statements
|
||||||
def decode_payload_content(self, pattern="v"):
|
def decode_payload_content(self, pattern="v"):
|
||||||
"""
|
"""
|
||||||
Decode the payload depending on pattern:
|
Decode the payload depending on pattern:
|
||||||
|
@ -202,7 +204,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
, = end of array
|
, = end of array
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# pylint: disable=inconsistent-return-statements
|
|
||||||
def decode_simple(self, char="v"):
|
def decode_simple(self, char="v"):
|
||||||
"""Decode the payload using one char pattern"""
|
"""Decode the payload using one char pattern"""
|
||||||
if char == "v":
|
if char == "v":
|
||||||
|
@ -221,6 +222,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.payloadOffset += 8
|
self.payloadOffset += 8
|
||||||
return struct.unpack(">Q", self.payload[
|
return struct.unpack(">Q", self.payload[
|
||||||
self.payloadOffset - 8:self.payloadOffset])[0]
|
self.payloadOffset - 8:self.payloadOffset])[0]
|
||||||
|
return None
|
||||||
|
|
||||||
size = None
|
size = None
|
||||||
isArray = False
|
isArray = False
|
||||||
|
@ -254,10 +256,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
])
|
])
|
||||||
parserStack[-2][4] = len(parserStack[-2][3])
|
parserStack[-2][4] = len(parserStack[-2][3])
|
||||||
else:
|
else:
|
||||||
for j in range(parserStack[-1][4], len(parserStack[-1][3])):
|
j = 0
|
||||||
|
for j in range(
|
||||||
|
parserStack[-1][4], len(parserStack[-1][3])):
|
||||||
if parserStack[-1][3][j] not in "lL0123456789":
|
if parserStack[-1][3][j] not in "lL0123456789":
|
||||||
break
|
break
|
||||||
# pylint: disable=undefined-loop-variable
|
|
||||||
parserStack.append([
|
parserStack.append([
|
||||||
size, size, isArray,
|
size, size, isArray,
|
||||||
parserStack[-1][3][parserStack[-1][4]:j + 1], 0, []
|
parserStack[-1][3][parserStack[-1][4]:j + 1], 0, []
|
||||||
|
@ -268,7 +271,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
elif i == "s":
|
elif i == "s":
|
||||||
# if parserStack[-2][2]:
|
# if parserStack[-2][2]:
|
||||||
# parserStack[-1][5].append(self.payload[
|
# parserStack[-1][5].append(self.payload[
|
||||||
# self.payloadOffset:self.payloadOffset + parserStack[-1][0]])
|
# self.payloadOffset:self.payloadOffset
|
||||||
|
# + parserStack[-1][0]])
|
||||||
# else:
|
# else:
|
||||||
parserStack[-1][5] = self.payload[
|
parserStack[-1][5] = self.payload[
|
||||||
self.payloadOffset:self.payloadOffset + parserStack[-1][0]]
|
self.payloadOffset:self.payloadOffset + parserStack[-1][0]]
|
||||||
|
@ -339,6 +343,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _command_inv(self, dandelion=False):
|
def _command_inv(self, dandelion=False):
|
||||||
|
"""
|
||||||
|
Common inv announce implementation:
|
||||||
|
both inv and dinv depending on *dandelion* kwarg
|
||||||
|
"""
|
||||||
items = self.decode_payload_content("l32s")
|
items = self.decode_payload_content("l32s")
|
||||||
|
|
||||||
if len(items) > MAX_OBJECT_COUNT:
|
if len(items) > MAX_OBJECT_COUNT:
|
||||||
|
@ -376,10 +384,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
nonce, expiresTime, objectType, version, streamNumber,
|
nonce, expiresTime, objectType, version, streamNumber,
|
||||||
self.payload, self.payloadOffset)
|
self.payload, self.payloadOffset)
|
||||||
|
|
||||||
if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE:
|
payload_len = len(self.payload) - self.payloadOffset
|
||||||
|
if payload_len > MAX_OBJECT_PAYLOAD_SIZE:
|
||||||
logger.info(
|
logger.info(
|
||||||
'The payload length of this object is too large (%d bytes).'
|
'The payload length of this object is too large'
|
||||||
' Ignoring it.', len(self.payload) - self.payloadOffset)
|
' (%d bytes). Ignoring it.', payload_len)
|
||||||
raise BMProtoExcessiveDataError()
|
raise BMProtoExcessiveDataError()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -434,9 +443,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
def bm_command_addr(self):
|
def bm_command_addr(self):
|
||||||
"""Incoming addresses, process them"""
|
"""Incoming addresses, process them"""
|
||||||
# pylint: disable=redefined-outer-name
|
# not using services
|
||||||
addresses = self._decode_addr()
|
for seenTime, stream, _, ip, port in self._decode_addr():
|
||||||
for seenTime, stream, _, ip, port in addresses:
|
|
||||||
ip = str(ip)
|
ip = str(ip)
|
||||||
if (
|
if (
|
||||||
stream not in state.streamsInWhichIAmParticipating
|
stream not in state.streamsInWhichIAmParticipating
|
||||||
|
@ -446,8 +454,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
continue
|
continue
|
||||||
decodedIP = protocol.checkIPAddress(ip)
|
decodedIP = protocol.checkIPAddress(ip)
|
||||||
if (
|
if (
|
||||||
decodedIP
|
decodedIP and time.time() - seenTime > 0
|
||||||
and time.time() - seenTime > 0
|
|
||||||
and seenTime > time.time() - ADDRESS_ALIVE
|
and seenTime > time.time() - ADDRESS_ALIVE
|
||||||
and port > 0
|
and port > 0
|
||||||
):
|
):
|
||||||
|
@ -475,7 +482,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.append_write_buf(protocol.CreatePacket('pong'))
|
self.append_write_buf(protocol.CreatePacket('pong'))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_pong(self): # pylint: disable=no-self-use
|
@staticmethod
|
||||||
|
def bm_command_pong():
|
||||||
"""
|
"""
|
||||||
Incoming pong.
|
Incoming pong.
|
||||||
Ignore it. PyBitmessage pings connections after about 5 minutes
|
Ignore it. PyBitmessage pings connections after about 5 minutes
|
||||||
|
@ -562,7 +570,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
" compared to mine. Closing connection.", fatal=2))
|
" compared to mine. Closing connection.", fatal=2))
|
||||||
logger.info(
|
logger.info(
|
||||||
"%s's time is too far in the future (%s seconds)."
|
"%s's time is too far in the future (%s seconds)."
|
||||||
" Closing connection to it.", self.destination, self.timeOffset)
|
" Closing connection to it.",
|
||||||
|
self.destination, self.timeOffset)
|
||||||
BMProto.timeOffsetWrongCount += 1
|
BMProto.timeOffsetWrongCount += 1
|
||||||
return False
|
return False
|
||||||
elif self.timeOffset < -MAX_TIME_OFFSET:
|
elif self.timeOffset < -MAX_TIME_OFFSET:
|
||||||
|
@ -570,8 +579,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
errorText="Your time is too far in the past compared to mine."
|
errorText="Your time is too far in the past compared to mine."
|
||||||
" Closing connection.", fatal=2))
|
" Closing connection.", fatal=2))
|
||||||
logger.info(
|
logger.info(
|
||||||
"%s's time is too far in the past (timeOffset %s seconds)."
|
"%s's time is too far in the past"
|
||||||
" Closing connection to it.", self.destination, self.timeOffset)
|
" (timeOffset %s seconds). Closing connection to it.",
|
||||||
|
self.destination, self.timeOffset)
|
||||||
BMProto.timeOffsetWrongCount += 1
|
BMProto.timeOffsetWrongCount += 1
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
|
@ -584,7 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
'Closed connection to %s because there is no overlapping'
|
'Closed connection to %s because there is no overlapping'
|
||||||
' interest in streams.', self.destination)
|
' interest in streams.', self.destination)
|
||||||
return False
|
return False
|
||||||
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
|
if connectionpool.BMConnectionPool().inboundConnections.get(
|
||||||
|
self.destination):
|
||||||
try:
|
try:
|
||||||
if not protocol.checkSocksIP(self.destination.host):
|
if not protocol.checkSocksIP(self.destination.host):
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(
|
self.append_write_buf(protocol.assembleErrorMessage(
|
||||||
|
@ -594,7 +605,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
'Closed connection to %s because we are already'
|
'Closed connection to %s because we are already'
|
||||||
' connected to that IP.', self.destination)
|
' connected to that IP.', self.destination)
|
||||||
return False
|
return False
|
||||||
except Exception:
|
except Exception: # TODO: exception types
|
||||||
pass
|
pass
|
||||||
if not self.isOutbound:
|
if not self.isOutbound:
|
||||||
# incoming from a peer we're connected to as outbound,
|
# incoming from a peer we're connected to as outbound,
|
||||||
|
@ -614,8 +625,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
'Closed connection to %s due to server full'
|
'Closed connection to %s due to server full'
|
||||||
' or duplicate inbound/outbound.', self.destination)
|
' or duplicate inbound/outbound.', self.destination)
|
||||||
return False
|
return False
|
||||||
if connectionpool.BMConnectionPool().isAlreadyConnected(
|
if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce):
|
||||||
self.nonce):
|
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(
|
self.append_write_buf(protocol.assembleErrorMessage(
|
||||||
errorText="I'm connected to myself. Closing connection.",
|
errorText="I'm connected to myself. Closing connection.",
|
||||||
fatal=2))
|
fatal=2))
|
||||||
|
@ -628,7 +638,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def stopDownloadingObject(hashId, forwardAnyway=False):
|
def stopDownloadingObject(hashId, forwardAnyway=False):
|
||||||
"""Stop downloading an object"""
|
"""Stop downloading object *hashId*"""
|
||||||
for connection in connectionpool.BMConnectionPool().connections():
|
for connection in connectionpool.BMConnectionPool().connections():
|
||||||
try:
|
try:
|
||||||
del connection.objectsNewToMe[hashId]
|
del connection.objectsNewToMe[hashId]
|
||||||
|
@ -658,7 +668,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
try:
|
try:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'%(host)s:%(port)i: closing', self.destination._asdict())
|
'%s:%i: closing',
|
||||||
|
self.destination.host, self.destination.port)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
logger.debug('Disconnected socket closing')
|
logger.debug('Disconnected socket closing')
|
||||||
AdvancedDispatcher.handle_close(self)
|
AdvancedDispatcher.handle_close(self)
|
||||||
|
|
|
@ -61,9 +61,9 @@ class Proxy(AdvancedDispatcher):
|
||||||
@proxy.setter
|
@proxy.setter
|
||||||
def proxy(self, address):
|
def proxy(self, address):
|
||||||
"""Set proxy IP and port"""
|
"""Set proxy IP and port"""
|
||||||
if (not isinstance(address, tuple) or len(address) < 2 or
|
if (not isinstance(address, tuple) or len(address) < 2
|
||||||
not isinstance(address[0], str) or
|
or not isinstance(address[0], str)
|
||||||
not isinstance(address[1], int)):
|
or not isinstance(address[1], int)):
|
||||||
raise ValueError
|
raise ValueError
|
||||||
self.__class__._proxy = address
|
self.__class__._proxy = address
|
||||||
|
|
||||||
|
@ -113,7 +113,6 @@ class Proxy(AdvancedDispatcher):
|
||||||
self.destination = address
|
self.destination = address
|
||||||
self.isOutbound = True
|
self.isOutbound = True
|
||||||
self.fullyEstablished = False
|
self.fullyEstablished = False
|
||||||
self.connectedAt = 0
|
|
||||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
if BMConfigParser().safeGetBoolean(
|
if BMConfigParser().safeGetBoolean(
|
||||||
"bitmessagesettings", "socksauthentication"):
|
"bitmessagesettings", "socksauthentication"):
|
||||||
|
@ -145,6 +144,5 @@ class Proxy(AdvancedDispatcher):
|
||||||
|
|
||||||
def state_proxy_handshake_done(self):
|
def state_proxy_handshake_done(self):
|
||||||
"""Handshake is complete at this point"""
|
"""Handshake is complete at this point"""
|
||||||
# pylint: disable=attribute-defined-outside-init
|
|
||||||
self.connectedAt = time.time()
|
self.connectedAt = time.time()
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -51,7 +51,6 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
self.verackSent = False
|
self.verackSent = False
|
||||||
self.streams = [0]
|
self.streams = [0]
|
||||||
self.fullyEstablished = False
|
self.fullyEstablished = False
|
||||||
self.connectedAt = 0
|
|
||||||
self.skipUntil = 0
|
self.skipUntil = 0
|
||||||
if address is None and sock is not None:
|
if address is None and sock is not None:
|
||||||
self.destination = Peer(*sock.getpeername())
|
self.destination = Peer(*sock.getpeername())
|
||||||
|
|
|
@ -16,7 +16,6 @@ logger = logging.getLogger('default')
|
||||||
|
|
||||||
_DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,))
|
_DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,))
|
||||||
|
|
||||||
# sslProtocolVersion
|
|
||||||
if sys.version_info >= (2, 7, 13):
|
if sys.version_info >= (2, 7, 13):
|
||||||
# this means TLSv1 or higher
|
# this means TLSv1 or higher
|
||||||
# in the future change to
|
# in the future change to
|
||||||
|
@ -27,14 +26,16 @@ elif sys.version_info >= (2, 7, 9):
|
||||||
# SSLv2 and 3 are excluded with an option after context is created
|
# SSLv2 and 3 are excluded with an option after context is created
|
||||||
sslProtocolVersion = ssl.PROTOCOL_SSLv23
|
sslProtocolVersion = ssl.PROTOCOL_SSLv23
|
||||||
else:
|
else:
|
||||||
# this means TLSv1, there is no way to set "TLSv1 or higher" or
|
# this means TLSv1, there is no way to set "TLSv1 or higher"
|
||||||
# "TLSv1.2" in < 2.7.9
|
# or "TLSv1.2" in < 2.7.9
|
||||||
sslProtocolVersion = ssl.PROTOCOL_TLSv1
|
sslProtocolVersion = ssl.PROTOCOL_TLSv1
|
||||||
|
|
||||||
|
|
||||||
# ciphers
|
# ciphers
|
||||||
if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not \
|
if (
|
||||||
ssl.OPENSSL_VERSION.startswith("LibreSSL"):
|
ssl.OPENSSL_VERSION_NUMBER >= 0x10100000
|
||||||
|
and not ssl.OPENSSL_VERSION.startswith(b"LibreSSL")
|
||||||
|
):
|
||||||
sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0"
|
sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0"
|
||||||
else:
|
else:
|
||||||
sslProtocolCiphers = "AECDH-AES256-SHA"
|
sslProtocolCiphers = "AECDH-AES256-SHA"
|
||||||
|
@ -47,16 +48,10 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
def __init__(self, _=None, sock=None, certfile=None, keyfile=None,
|
def __init__(self, _=None, sock=None, certfile=None, keyfile=None,
|
||||||
server_side=False, ciphers=sslProtocolCiphers):
|
server_side=False, ciphers=sslProtocolCiphers):
|
||||||
self.want_read = self.want_write = True
|
self.want_read = self.want_write = True
|
||||||
if certfile is None:
|
self.certfile = certfile or os.path.join(
|
||||||
self.certfile = os.path.join(
|
paths.codePath(), 'sslkeys', 'cert.pem')
|
||||||
paths.codePath(), 'sslkeys', 'cert.pem')
|
self.keyfile = keyfile or os.path.join(
|
||||||
else:
|
paths.codePath(), 'sslkeys', 'key.pem')
|
||||||
self.certfile = certfile
|
|
||||||
if keyfile is None:
|
|
||||||
self.keyfile = os.path.join(
|
|
||||||
paths.codePath(), 'sslkeys', 'key.pem')
|
|
||||||
else:
|
|
||||||
self.keyfile = keyfile
|
|
||||||
self.server_side = server_side
|
self.server_side = server_side
|
||||||
self.ciphers = ciphers
|
self.ciphers = ciphers
|
||||||
self.tlsStarted = False
|
self.tlsStarted = False
|
||||||
|
@ -66,7 +61,6 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
|
|
||||||
def state_tls_init(self):
|
def state_tls_init(self):
|
||||||
"""Prepare sockets for TLS handshake"""
|
"""Prepare sockets for TLS handshake"""
|
||||||
# pylint: disable=attribute-defined-outside-init
|
|
||||||
self.isSSL = True
|
self.isSSL = True
|
||||||
self.tlsStarted = True
|
self.tlsStarted = True
|
||||||
# Once the connection has been established,
|
# Once the connection has been established,
|
||||||
|
@ -96,8 +90,6 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
self.want_read = self.want_write = True
|
self.want_read = self.want_write = True
|
||||||
self.set_state("tls_handshake")
|
self.set_state("tls_handshake")
|
||||||
return False
|
return False
|
||||||
# if hasattr(self.socket, "context"):
|
|
||||||
# self.socket.context.set_ecdh_curve("secp256k1")
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def state_tls_handshake():
|
def state_tls_handshake():
|
||||||
|
@ -112,9 +104,9 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
try:
|
try:
|
||||||
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
||||||
return self.want_write
|
return self.want_write
|
||||||
return AdvancedDispatcher.writable(self)
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
return AdvancedDispatcher.writable(self)
|
pass
|
||||||
|
return AdvancedDispatcher.writable(self)
|
||||||
|
|
||||||
def readable(self):
|
def readable(self):
|
||||||
"""Handle readable check for TLS-enabled sockets"""
|
"""Handle readable check for TLS-enabled sockets"""
|
||||||
|
@ -126,14 +118,14 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
return self.want_read
|
return self.want_read
|
||||||
# prior to TLS handshake,
|
# prior to TLS handshake,
|
||||||
# receiveDataThread should emulate synchronous behaviour
|
# receiveDataThread should emulate synchronous behaviour
|
||||||
elif not self.fullyEstablished and (
|
if not self.fullyEstablished and (
|
||||||
self.expectBytes == 0 or not self.write_buf_empty()):
|
self.expectBytes == 0 or not self.write_buf_empty()):
|
||||||
return False
|
return False
|
||||||
return AdvancedDispatcher.readable(self)
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
return AdvancedDispatcher.readable(self)
|
pass
|
||||||
|
return AdvancedDispatcher.readable(self)
|
||||||
|
|
||||||
def handle_read(self): # pylint: disable=inconsistent-return-statements
|
def handle_read(self):
|
||||||
"""
|
"""
|
||||||
Handle reads for sockets during TLS handshake. Requires special
|
Handle reads for sockets during TLS handshake. Requires special
|
||||||
treatment as during the handshake, buffers must remain empty
|
treatment as during the handshake, buffers must remain empty
|
||||||
|
@ -142,29 +134,20 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
try:
|
try:
|
||||||
# wait for write buffer flush
|
# wait for write buffer flush
|
||||||
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
||||||
# logger.debug(
|
|
||||||
# "%s:%i TLS handshaking (read)", self.destination.host,
|
|
||||||
# self.destination.port)
|
|
||||||
self.tls_handshake()
|
self.tls_handshake()
|
||||||
else:
|
else:
|
||||||
# logger.debug(
|
AdvancedDispatcher.handle_read(self)
|
||||||
# "%s:%i Not TLS handshaking (read)", self.destination.host,
|
|
||||||
# self.destination.port)
|
|
||||||
return AdvancedDispatcher.handle_read(self)
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
return AdvancedDispatcher.handle_read(self)
|
AdvancedDispatcher.handle_read(self)
|
||||||
except ssl.SSLError as err:
|
except ssl.SSLError as err:
|
||||||
self.close_reason = "SSL Error in handle_read"
|
|
||||||
if err.errno == ssl.SSL_ERROR_WANT_READ:
|
if err.errno == ssl.SSL_ERROR_WANT_READ:
|
||||||
return
|
return
|
||||||
elif err.errno in _DISCONNECTED_SSL:
|
if err.errno not in _DISCONNECTED_SSL:
|
||||||
self.handle_close()
|
logger.info("SSL Error: %s", err)
|
||||||
return
|
self.close_reason = "SSL Error in handle_read"
|
||||||
logger.info("SSL Error: %s", err)
|
|
||||||
self.handle_close()
|
self.handle_close()
|
||||||
return
|
|
||||||
|
|
||||||
def handle_write(self): # pylint: disable=inconsistent-return-statements
|
def handle_write(self):
|
||||||
"""
|
"""
|
||||||
Handle writes for sockets during TLS handshake. Requires special
|
Handle writes for sockets during TLS handshake. Requires special
|
||||||
treatment as during the handshake, buffers must remain empty
|
treatment as during the handshake, buffers must remain empty
|
||||||
|
@ -173,27 +156,18 @@ class TLSDispatcher(AdvancedDispatcher):
|
||||||
try:
|
try:
|
||||||
# wait for write buffer flush
|
# wait for write buffer flush
|
||||||
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
if self.tlsStarted and not self.tlsDone and not self.write_buf:
|
||||||
# logger.debug(
|
|
||||||
# "%s:%i TLS handshaking (write)", self.destination.host,
|
|
||||||
# self.destination.port)
|
|
||||||
self.tls_handshake()
|
self.tls_handshake()
|
||||||
else:
|
else:
|
||||||
# logger.debug(
|
AdvancedDispatcher.handle_write(self)
|
||||||
# "%s:%i Not TLS handshaking (write)", self.destination.host,
|
|
||||||
# self.destination.port)
|
|
||||||
return AdvancedDispatcher.handle_write(self)
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
return AdvancedDispatcher.handle_write(self)
|
AdvancedDispatcher.handle_write(self)
|
||||||
except ssl.SSLError as err:
|
except ssl.SSLError as err:
|
||||||
self.close_reason = "SSL Error in handle_write"
|
|
||||||
if err.errno == ssl.SSL_ERROR_WANT_WRITE:
|
if err.errno == ssl.SSL_ERROR_WANT_WRITE:
|
||||||
return 0
|
return
|
||||||
elif err.errno in _DISCONNECTED_SSL:
|
if err.errno not in _DISCONNECTED_SSL:
|
||||||
self.handle_close()
|
logger.info("SSL Error: %s", err)
|
||||||
return 0
|
self.close_reason = "SSL Error in handle_write"
|
||||||
logger.info("SSL Error: %s", err)
|
|
||||||
self.handle_close()
|
self.handle_close()
|
||||||
return
|
|
||||||
|
|
||||||
def tls_handshake(self):
|
def tls_handshake(self):
|
||||||
"""Perform TLS handshake and handle its stages"""
|
"""Perform TLS handshake and handle its stages"""
|
||||||
|
|
|
@ -28,7 +28,6 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
|
||||||
# .. todo:: sort out streams
|
# .. todo:: sort out streams
|
||||||
self.streams = [1]
|
self.streams = [1]
|
||||||
self.fullyEstablished = True
|
self.fullyEstablished = True
|
||||||
self.connectedAt = 0
|
|
||||||
self.skipUntil = 0
|
self.skipUntil = 0
|
||||||
if sock is None:
|
if sock is None:
|
||||||
if host is None:
|
if host is None:
|
||||||
|
|
Reference in New Issue
Block a user