From cba749088a297327175af68613ca1944275d2b8b Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 10 Jun 2017 10:13:49 +0200 Subject: [PATCH] Asyncore updates - mainly work on proxy support, but it's still not fully working - minor bugfixes --- src/network/asyncore_pollchoose.py | 2 +- src/network/bmproto.py | 9 ++++++--- src/network/connectionpool.py | 3 +++ src/network/objectracker.py | 1 + src/network/proxy.py | 20 +++++++++++++++++++- src/network/socks4a.py | 2 +- src/network/socks5.py | 8 +++----- src/network/tcp.py | 28 ++++++++++++++++++---------- src/network/udp.py | 1 + 9 files changed, 53 insertions(+), 21 deletions(-) diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 8131b2b2..c212effe 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -57,7 +57,7 @@ import warnings import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ - ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ENOTSOCK, \ + ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, ENOTSOCK, EINTR, \ errorcode try: from errno import WSAEWOULDBLOCK diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 97c354cd..cd36726e 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -282,11 +282,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except (BMObjectExpiredError, BMObjectUnwantedStreamError): for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + network.connectionpool.BMConnectionPool().outboundConnections.values(): try: - del connection.objectsNewtoThem[hashId] + del connection.objectsNewtoThem[self.object.inventoryHash] except KeyError: pass try: - del connection.objectsNewToMe[hashId] + del connection.objectsNewToMe[self.object.inventoryHash] except KeyError: pass if not BMConfigParser().get("inventory", "acceptmismatch"): @@ -459,7 +459,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def handle_close(self, reason=None): self.set_state("close") if reason is None: - logger.debug("%s:%i: closing", self.destination.host, self.destination.port) + try: + logger.debug("%s:%i: closing", self.destination.host, self.destination.port) + except AttributeError: + logger.debug("Disconnected socket closing") else: logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason) AdvancedDispatcher.handle_close(self) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 6aa6e49b..1517f4d7 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -7,6 +7,7 @@ import re from bmconfigparser import BMConfigParser from debug import logger import helper_bootstrap +from network.proxy import Proxy import network.bmproto import network.tcp import network.udp @@ -129,6 +130,8 @@ class BMConnectionPool(object): if not self.bootstrapped: helper_bootstrap.dns() self.bootstrapped = True + Proxy.proxy = (BMConfigParser().safeGet("bitmessagesettings", "sockshostname"), + BMConfigParser().safeGetInt("bitmessagesettings", "socksport")) established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished)) pending = len(self.outboundConnections) - established if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"): diff --git a/src/network/objectracker.py b/src/network/objectracker.py index d073d78a..0c4a8d56 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -54,6 +54,7 @@ class ObjectTracker(object): def clean(self): if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: if haveBloom: + # FIXME if PendingDownloadQueue().size() == 0: self.initInvBloom() self.initAddrBloom() diff --git a/src/network/proxy.py b/src/network/proxy.py index e3b5acee..3b7cc848 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -1,7 +1,10 @@ import socket +import state + from advanceddispatcher import AdvancedDispatcher import asyncore_pollchoose as asyncore +import network.connectionpool class ProxyError(Exception): pass class GeneralProxyError(ProxyError): pass @@ -32,10 +35,25 @@ class Proxy(AdvancedDispatcher): self.__class__._auth = authTuple def __init__(self, address): - if type(address) != tuple or (len(address) < 2) or (type(str(address[0])) != type('')) or (type(address[1]) != int): + if not isinstance(address, state.Peer): raise ValueError AdvancedDispatcher.__init__(self) self.destination = address + self.isOutbound = True self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(self.proxy) print "connecting in background to %s:%i" % (self.proxy[0], self.proxy[1]) + + def handle_connect(self): + try: + AdvancedDispatcher.handle_connect(self) + except socket.error as e: + if e.errno in asyncore._DISCONNECTED: + logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e))) + return + + def state_proxy_handshake_done(self): + self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) + self.connectedAt = time.time() + return False + diff --git a/src/network/socks4a.py b/src/network/socks4a.py index 4b6b64fa..08bd18cf 100644 --- a/src/network/socks4a.py +++ b/src/network/socks4a.py @@ -43,7 +43,7 @@ class Socks4a(Proxy): self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) else: self.__proxypeername = (self.destination[0], self.destport) - self.set_state("socks_handshake_done", 8) + self.set_state("proxy_handshake_done", 8) def proxy_sock_name(self): return socket.inet_ntoa(self.__proxysockname[0]) diff --git a/src/network/socks5.py b/src/network/socks5.py index 0d1717d4..6745308c 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -4,6 +4,7 @@ import struct from advanceddispatcher import AdvancedDispatcher import asyncore_pollchoose as asyncore from proxy import Proxy, ProxyError, GeneralProxyError +import network.connectionpool class Socks5AuthError(ProxyError): pass class Socks5Error(ProxyError): pass @@ -103,7 +104,7 @@ class Socks5(Proxy): def state_proxy_addr_2_2(self): if not self.read_buf_sufficient(self.address_length): return False - self.boundaddr = read_buf + self.boundaddr = self.read_buf self.set_state("proxy_port", self.address_length) def state_proxy_port(self): @@ -115,14 +116,11 @@ class Socks5(Proxy): self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) else: self.__proxypeername = (self.destination[0], self.destport) - self.set_state("socks_handshake_done", 2) + self.set_state("proxy_handshake_done", 2) def proxy_sock_name(self): return socket.inet_ntoa(self.__proxysockname[0]) - def state_socks_handshake_done(self): - return False - class Socks5Connection(Socks5): def __init__(self, address): diff --git a/src/network/tcp.py b/src/network/tcp.py index 986a4b63..badb6774 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -49,6 +49,10 @@ class TCPConnection(BMProto, TLSDispatcher): TLSDispatcher.__init__(self, sock, server_side=True) self.connectedAt = time.time() logger.debug("Received connection from %s:%i", self.destination.host, self.destination.port) + elif address is not None and sock is not None: + TLSDispatcher.__init__(self, sock, server_side=False) + self.isOutbound = True + logger.debug("Outbound proxy connection to %s:%i", self.destination.host, self.destination.port) else: self.destination = address self.isOutbound = True @@ -159,33 +163,37 @@ class TCPConnection(BMProto, TLSDispatcher): self.connectedAt = time.time() def handle_read(self): - try: - TLSDispatcher.handle_read(self) - except socket.error as e: - logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e))) +# try: + TLSDispatcher.handle_read(self) +# except socket.error as e: +# logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e))) def handle_write(self): - try: - TLSDispatcher.handle_write(self) - except socket.error as e: - logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e))) +# try: + TLSDispatcher.handle_write(self) +# except socket.error as e: +# logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e))) class Socks5BMConnection(Socks5Connection, TCPConnection): def __init__(self, address): Socks5Connection.__init__(self, address=address) + TCPConnection.__init__(self, address=address, sock=self.socket) + self.set_state("init") def state_socks_handshake_done(self): - TCPConnection.state_init(self) + self.set_state("bm_header", expectBytes=protocol.Header.size) return False class Socks4aBMConnection(Socks4aConnection, TCPConnection): def __init__(self, address): Socks4aConnection.__init__(self, address=address) + TCPConnection.__init__(self, address=address, sock=self.socket) + self.set_state("init") def state_socks_handshake_done(self): - TCPConnection.state_init(self) + self.set_state("bm_header", expectBytes=protocol.Header.size) return False diff --git a/src/network/udp.py b/src/network/udp.py index 8c710997..241bf9d7 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -3,6 +3,7 @@ from binascii import hexlify import hashlib import math import time +import Queue import socket import struct import random