From 95095526639b7a127be9c7fcb9c2cb2d25ab5e1c Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 1 Dec 2016 16:48:04 +0100 Subject: [PATCH] New network backend (WIP, not integrated) - current snapshot of the new network backend code - not working yet, just base classes and no integration --- src/network/http.py | 49 +++++++++ src/network/httpd.py | 148 ++++++++++++++++++++++++++ src/network/https.py | 53 ++++++++++ src/network/proxy.py | 241 +++++++++++++++++++++++++++++++++++++++++++ src/network/tls.py | 89 ++++++++++++++++ 5 files changed, 580 insertions(+) create mode 100644 src/network/http.py create mode 100644 src/network/httpd.py create mode 100644 src/network/https.py create mode 100644 src/network/proxy.py create mode 100644 src/network/tls.py diff --git a/src/network/http.py b/src/network/http.py new file mode 100644 index 00000000..56d24915 --- /dev/null +++ b/src/network/http.py @@ -0,0 +1,49 @@ +import asyncore +import socket +import time + +requestCount = 0 +parallel = 50 +duration = 60 + + +class HTTPClient(asyncore.dispatcher): + port = 12345 + + def __init__(self, host, path, connect=True): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self) + if connect: + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((host, HTTPClient.port)) + self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path + + def handle_close(self): + global requestCount + requestCount += 1 + self.close() + + def handle_read(self): +# print self.recv(8192) + self.recv(8192) + + def writable(self): + return (len(self.buffer) > 0) + + def handle_write(self): + sent = self.send(self.buffer) + self.buffer = self.buffer[sent:] + +if __name__ == "__main__": + # initial fill + for i in range(parallel): + HTTPClient('127.0.0.1', '/') + start = time.time() + while (time.time() - start < duration): + if (len(asyncore.socket_map) < parallel): + for i in range(parallel - len(asyncore.socket_map)): + HTTPClient('127.0.0.1', '/') + print "Active connections: %i" % (len(asyncore.socket_map)) + asyncore.loop(count=len(asyncore.socket_map)/2) + if requestCount % 100 == 0: + print "Processed %i total messages" % (requestCount) diff --git a/src/network/httpd.py b/src/network/httpd.py new file mode 100644 index 00000000..b8b6ba21 --- /dev/null +++ b/src/network/httpd.py @@ -0,0 +1,148 @@ +import asyncore +import socket + +from tls import TLSHandshake + +class HTTPRequestHandler(asyncore.dispatcher): + response = """HTTP/1.0 200 OK\r +Date: Sun, 23 Oct 2016 18:02:00 GMT\r +Content-Type: text/html; charset=UTF-8\r +Content-Encoding: UTF-8\r +Content-Length: 136\r +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r +ETag: "3f80f-1b6-3e1cb03b"\r +Accept-Ranges: bytes\r +Connection: close\r +\r + + + An Example Page + + + Hello World, this is a very simple HTML document. + +""" + + def __init__(self, sock): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self, sock) + self.inbuf = "" + self.ready = True + self.busy = False + self.respos = 0 + + def handle_close(self): + self.close() + + def readable(self): + return self.ready + + def writable(self): + return self.busy + + def handle_read(self): + self.inbuf += self.recv(8192) + if self.inbuf[-4:] == "\r\n\r\n": + self.busy = True + self.ready = False + self.inbuf = "" + elif self.inbuf == "": + pass + + def handle_write(self): + if self.busy and self.respos < len(HTTPRequestHandler.response): + written = 0 + written = self.send(HTTPRequestHandler.response[self.respos:65536]) + self.respos += written + elif self.busy: + self.busy = False + self.ready = True + self.close() + + +class HTTPSRequestHandler(HTTPRequestHandler, TLSHandshake): + def __init__(self, sock): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self, sock) +# self.tlsDone = False + TLSHandshake.__init__(self, sock=sock, certfile='/home/shurdeek/src/PyBitmessage/src/sslkeys/cert.pem', keyfile='/home/shurdeek/src/PyBitmessage/src/sslkeys/key.pem', server_side=True) + HTTPRequestHandler.__init__(self, sock) + + def handle_connect(self): + TLSHandshake.handle_connect(self) + + def handle_close(self): + if self.tlsDone: + HTTPRequestHandler.close(self) + else: + TLSHandshake.close(self) + + def readable(self): + if self.tlsDone: + return HTTPRequestHandler.readable(self) + else: + return TLSHandshake.readable(self) + + def handle_read(self): + if self.tlsDone: + HTTPRequestHandler.handle_read(self) + else: + TLSHandshake.handle_read(self) + + def writable(self): + if self.tlsDone: + return HTTPRequestHandler.writable(self) + else: + return TLSHandshake.writable(self) + + def handle_write(self): + if self.tlsDone: + HTTPRequestHandler.handle_write(self) + else: + TLSHandshake.handle_write(self) + + +class HTTPServer(asyncore.dispatcher): + port = 12345 + + def __init__(self): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(('127.0.0.1', HTTPServer.port)) + self.connections = 0 + self.listen(5) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair +# print 'Incoming connection from %s' % repr(addr) + self.connections += 1 +# if self.connections % 1000 == 0: +# print "Processed %i connections, active %i" % (self.connections, len(asyncore.socket_map)) + HTTPRequestHandler(sock) + + +class HTTPSServer(HTTPServer): + port = 12345 + + def __init__(self): + if not hasattr(self, '_map'): + HTTPServer.__init__(self) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair +# print 'Incoming connection from %s' % repr(addr) + self.connections += 1 +# if self.connections % 1000 == 0: +# print "Processed %i connections, active %i" % (self.connections, len(asyncore.socket_map)) + HTTPSRequestHandler(sock) + +if __name__ == "__main__": + client = HTTPSServer() + asyncore.loop() diff --git a/src/network/https.py b/src/network/https.py new file mode 100644 index 00000000..9744a6dc --- /dev/null +++ b/src/network/https.py @@ -0,0 +1,53 @@ +import asyncore + +from http import HTTPClient +from tls import TLSHandshake + +# self.sslSock = ssl.wrap_socket(self.sock, keyfile = os.path.join(shared.codePath(), 'sslkeys', 'key.pem'), certfile = os.path.join(shared.codePath(), 'sslkeys', 'cert.pem'), server_side = not self.initiatedConnection, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False, ciphers='AECDH-AES256-SHA') + + +class HTTPSClient(HTTPClient, TLSHandshake): + def __init__(self, host, path): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self) + self.tlsDone = False +# TLSHandshake.__init__(self, address=(host, 443), certfile='/home/shurdeek/src/PyBitmessage/sslsrc/keys/cert.pem', keyfile='/home/shurdeek/src/PyBitmessage/src/sslkeys/key.pem', server_side=False, ciphers='AECDH-AES256-SHA') + HTTPClient.__init__(self, host, path, connect=False) + TLSHandshake.__init__(self, address=(host, 443), server_side=False) + + def handle_connect(self): + TLSHandshake.handle_connect(self) + + def handle_close(self): + if self.tlsDone: + HTTPClient.close(self) + else: + TLSHandshake.close(self) + + def readable(self): + if self.tlsDone: + return HTTPClient.readable(self) + else: + return TLSHandshake.readable(self) + + def handle_read(self): + if self.tlsDone: + HTTPClient.handle_read(self) + else: + TLSHandshake.handle_read(self) + + def writable(self): + if self.tlsDone: + return HTTPClient.writable(self) + else: + return TLSHandshake.writable(self) + + def handle_write(self): + if self.tlsDone: + HTTPClient.handle_write(self) + else: + TLSHandshake.handle_write(self) + +if __name__ == "__main__": + client = HTTPSClient('anarchy.economicsofbitcoin.com', '/') + asyncore.loop() diff --git a/src/network/proxy.py b/src/network/proxy.py new file mode 100644 index 00000000..aab28c50 --- /dev/null +++ b/src/network/proxy.py @@ -0,0 +1,241 @@ +# SOCKS5 only + +import asyncore +import socket +import struct + + +class Proxy(asyncore.dispatcher): + # these are global, and if you change config during runtime, all active/new + # instances should change too + _proxy = ["", 1080] + _auth = None + _buf_len = 131072 + _remote_dns = True + + @property + def proxy(self): + return self.__class__._proxy + + @proxy.setter + def proxy(self, address): + if (not type(address) in (list,tuple)) or (len(address) < 2) or (type(address[0]) != type('')) or (type(address[1]) != int): + raise + self.__class__._proxy = address + + @property + def auth(self): + return self.__class__._auth + + @auth.setter + def auth(self, authTuple): + self.__class__._auth = authTuple + + def __init__(self, address=None): + if (not type(address) in (list,tuple)) or (len(address) < 2) or (type(address[0]) != type('')) or (type(address[1]) != int): + raise + asyncore.dispatcher.__init__(self, self.sock) + self.destination = address + self.read_buf = "" + self.write_buf = "" + self.stage = "init" + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.sslSocket.setblocking(0) + self.connect(self.proxy) + + def process(self): + try: + getattr(self, "state_" + str(self.stage))() + except AttributeError: + # missing stage + raise + + def set_state(self, state): + self.state = state + self.read_buf = "" + + def writable(self): + return len(self.write_buf) > 0 + + def readable(self): + return len(self.read_buf) < Proxy._buf_len + + def handle_read(self): + self.read_buf += self.recv(Proxy._buf_len) + self.process() + + def handle_write(self): + written = self.send(self.write_buf) + self.write_buf = self.write_buf[written:] + self.process() + + +class SOCKS5(Proxy): + def __init__(self, address=None, sock=None): + Proxy.__init__(self, address) + self.state = 0 + + def handle_connect(self): + self.process() + + def state_init(self): + if self._auth: + self.write_buf += struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02) + else: + self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) + self.set_state("auth_1") + + def state_auth_1(self): + if len(self.read_buf) < 2: + return + ret = struct.unpack('BB', self.read_buf) + self.read_buf = self.read_buf[2:] + if ret[0] != 5: + # general error + raise + elif ret[1] == 0: + # no auth required + self.set_state("auth_done") + elif ret[1] == 2: + # username/password + self.write_buf += struct.pack('BB', 1, len(self._auth[0])) + \ + self._auth[0] + struct.pack('B', len(self._auth[1])) + \ + self._auth[1] + self.set_state("auth_1") + else: + if ret[1] == 0xff: + # auth error + raise + else: + # other error + raise + + def state_auth_needed(self): + if len(self.read_buf) < 2: + return + ret = struct.unpack('BB', self.read_buf) + if ret[0] != 1: + # general error + raise + if ret[1] != 0: + # auth error + raise + # all ok + self.set_state = ("auth_done") + + +class SOCKS5Connection(SOCKS5): + def __init__(self, address): + SOCKS5.__init__(self, address) + + def state_auth_done(self): + # Now we can request the actual connection + self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) + # If the given destination address is an IP address, we'll + # use the IPv4 address request even if remote resolving was specified. + try: + ipaddr = socket.inet_aton(self.destination[0]) + self.write_buf += chr(0x01).encode() + ipaddr + except socket.error: + # Well it's not an IP number, so it's probably a DNS name. + if Proxy._remote_dns: + # Resolve remotely + ipaddr = None + self.write_buf += chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0] + else: + # Resolve locally + ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) + self.write_buf += chr(0x01).encode() + ipaddr + self.write_buf += struct.pack(">H", self.destination[1]) + self.set_state = ("pre_connect") + + def state_pre_connect(self): + if len(self.read_buf) < 4: + return + # Get the response + if self.read_buf[0:1] != chr(0x05).encode(): + # general error + self.close() + raise + elif self.read_buf[1:2] != chr(0x00).encode(): + # Connection failed + self.close() + if ord(self.read_buf[1:2])<=8: + # socks 5 erro + raise + #raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) + else: + raise + #raise Socks5Error((9, _socks5errors[9])) + # Get the bound address/port + elif self_read_buf[3:4] == chr(0x01).encode(): + self.set_state("proxy_addr_long") + elif resp[3:4] == chr(0x03).encode(): + self.set_state("proxy_addr_short") + else: + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + boundport = struct.unpack(">H", self.__recvall(2))[0] + self.__proxysockname = (boundaddr, boundport) + if ipaddr != None: + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + else: + self.__proxypeername = (destaddr, destport) + + def state_proxy_addr_long(self): + if len(self.read_buf) < 4: + return + self.boundaddr = self.read_buf[0:4] + self.set_state("proxy_port") + + def state_proxy_addr_short(self): + if len(self.read_buf) < 1: + return + self.boundaddr = self.read_buf[0:1] + self.set_state("proxy_port") + + def state_proxy_port(self): + if len(self.read_buf) < 2: + return + self.boundport = struct.unpack(">H", self.read_buf[0:2])[0] + self.__proxysockname = (self.boundaddr, self.boundport) + if ipaddr != None: + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + else: + self.__proxypeername = (destaddr, destport) + + +class SOCKS5Resolver(SOCKS5): + def __init__(self, destpair): + SOCKS5.__init__(self, destpair) + + def state_auth_done(self): + # Now we can request the actual connection + req = struct.pack('BBB', 0x05, 0xF0, 0x00) + req += chr(0x03).encode() + chr(len(host)).encode() + host + req = req + struct.pack(">H", 8444) + self.sendall(req) + # Get the response + ip = "" + resp = self.__recvall(4) + if resp[0:1] != chr(0x05).encode(): + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + elif resp[1:2] != chr(0x00).encode(): + # Connection failed + self.close() + if ord(resp[1:2])<=8: + raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) + else: + raise Socks5Error((9, _socks5errors[9])) + # Get the bound address/port + elif resp[3:4] == chr(0x01).encode(): + ip = socket.inet_ntoa(self.__recvall(4)) + elif resp[3:4] == chr(0x03).encode(): + resp = resp + self.recv(1) + ip = self.__recvall(ord(resp[4:5])) + else: + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + boundport = struct.unpack(">H", self.__recvall(2))[0] + return ip diff --git a/src/network/tls.py b/src/network/tls.py new file mode 100644 index 00000000..f690acc9 --- /dev/null +++ b/src/network/tls.py @@ -0,0 +1,89 @@ +""" +SSL/TLS negotiation. +""" + +import asyncore +import socket +import ssl + + +class TLSHandshake(asyncore.dispatcher): + """ + Negotiates a SSL/TLS connection before handing itself spawning a + dispatcher that can deal with the overlying protocol as soon as the + handshake has been completed. + + `handoff` is a function/method called when the handshake has completed. + `address` is a tuple consisting of hostname/address and port to connect to + if nothing is passed in `sock`, which can take an already-connected socket. + `certfile` can take a path to a certificate bundle, and `server_side` + indicates whether the socket is intended to be a server-side or client-side + socket. + """ + + def __init__(self, address=None, sock=None, + certfile=None, keyfile=None, server_side=False, ciphers=None, init_parent=True): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self, sock) + self.want_read = self.want_write = True + self.certfile = certfile + self.keyfile = keyfile + self.server_side = server_side + self.ciphers = ciphers + self.tlsDone = False + if sock is None: + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) +# logger.info('Connecting to %s%d', address[0], address[1]) + self.connect(address) + elif self.connected: + # Initiate the handshake for an already-connected socket. + self.handle_connect() + + def handle_connect(self): + # Once the connection has been established, it's safe to wrap the + # socket. + self.sslSocket = ssl.wrap_socket(self.socket, + server_side=self.server_side, + ssl_version=ssl.PROTOCOL_TLSv1, + certfile=self.certfile, + keyfile=self.keyfile, + ciphers=self.ciphers, + do_handshake_on_connect=False) + self.sslSocket.setblocking(0) + self.want_read = self.want_write = True +# if hasattr(self.socket, "context"): +# self.socket.context.set_ecdh_curve("secp256k1") + + def writable(self): + return self.want_write + + def readable(self): + return self.want_read + + def handle_read(self): + if not self.tlsDone: + self._handshake() + + def handle_write(self): + if not self.tlsDone: + self._handshake() + + def _handshake(self): + """ + Perform the handshake. + """ + try: + self.sslSocket.do_handshake() + except ssl.SSLError, err: + self.want_read = self.want_write = False + if err.args[0] == ssl.SSL_ERROR_WANT_READ: + self.want_read = True + elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + self.want_write = True + else: + raise + else: + # The handshake has completed, so remove this channel and... + self.del_channel() + self.set_socket(self.sslSocket) + self.tlsDone = True