From 998935be5f91b02279eaa343960298a68f415ce2 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 10 Mar 2017 23:11:57 +0100 Subject: [PATCH] New network subsystem, WIP - finished proxy design - socks4a and socks5 implemented - authentication not tested - resolver for both socks4a and socks5 - http client example using the proxy --- src/network/advanceddispatcher.py | 23 +- src/network/asyncore_pollchoose.py | 723 +++++++++++++++++++++++++++++ src/network/http-old.py | 49 ++ src/network/http.py | 111 +++-- src/network/proxy.py | 173 +------ src/network/socks4a.py | 104 +++++ src/network/socks5.py | 170 +++++++ 7 files changed, 1146 insertions(+), 207 deletions(-) create mode 100644 src/network/asyncore_pollchoose.py create mode 100644 src/network/http-old.py create mode 100644 src/network/socks4a.py create mode 100644 src/network/socks5.py diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 8258412a..9ec7f496 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,12 +1,13 @@ -import asyncore +import asyncore_pollchoose as asyncore class AdvancedDispatcher(asyncore.dispatcher): _buf_len = 131072 - def __init__(self, sock): - asyncore.dispatcher.__init__(self, sock) - self.read_buf = "" - self.write_buf = "" + def __init__(self): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self) + self.read_buf = b"" + self.write_buf = b"" self.state = "init" def slice_read_buf(self, length=0): @@ -22,7 +23,7 @@ class AdvancedDispatcher(asyncore.dispatcher): return True def process(self): - if len(self.read_buf) == 0: + if self.state != "init" and len(self.read_buf) == 0: return while True: try: @@ -37,10 +38,10 @@ class AdvancedDispatcher(asyncore.dispatcher): self.state = state def writable(self): - return len(self.write_buf) > 0 + return self.connecting or len(self.write_buf) > 0 def readable(self): - return len(self.read_buf) < AdvancedDispatcher._buf_len + return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len def handle_read(self): self.read_buf += self.recv(AdvancedDispatcher._buf_len) @@ -49,4 +50,8 @@ class AdvancedDispatcher(asyncore.dispatcher): def handle_write(self): written = self.send(self.write_buf) self.slice_write_buf(written) -# self.process() + + def handle_connect(self): + self.process() + + diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py new file mode 100644 index 00000000..19ec9f42 --- /dev/null +++ b/src/network/asyncore_pollchoose.py @@ -0,0 +1,723 @@ +# -*- Mode: Python -*- +# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp +# Author: Sam Rushing + +# ====================================================================== +# Copyright 1996 by Sam Rushing +# +# All Rights Reserved +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose and without fee is hereby +# granted, provided that the above copyright notice appear in all +# copies and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of Sam +# Rushing not be used in advertising or publicity pertaining to +# distribution of the software without specific, written prior +# permission. +# +# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN +# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR +# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# ====================================================================== + +"""Basic infrastructure for asynchronous socket service clients and servers. + +There are only two ways to have a program on a single processor do "more +than one thing at a time". Multi-threaded programming is the simplest and +most popular way to do it, but there is another very different technique, +that lets you have nearly all the advantages of multi-threading, without +actually using multiple threads. it's really only practical if your program +is largely I/O bound. If your program is CPU bound, then pre-emptive +scheduled threads are probably what you really need. Network servers are +rarely CPU-bound, however. + +If your operating system supports the select() system call in its I/O +library (and nearly all do), then you can use it to juggle multiple +communication channels at once; doing other work while your I/O is taking +place in the "background." Although this strategy can seem strange and +complex, especially at first, it is in many ways easier to understand and +control than multi-threaded programming. The module documented here solves +many of the difficult problems for you, making the task of building +sophisticated high-performance network servers and clients a snap. +""" + +import select +import socket +import sys +import time +import warnings + +import os +from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ + ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ + errorcode + +_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, + EBADF)) + +try: + socket_map +except NameError: + socket_map = {} + +def _strerror(err): + try: + return os.strerror(err) + except (ValueError, OverflowError, NameError): + if err in errorcode: + return errorcode[err] + return "Unknown error %s" %err + +class ExitNow(Exception): + pass + +_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) + +def read(obj): + try: + obj.handle_read_event() + except _reraised_exceptions: + raise + except: + obj.handle_error() + +def write(obj): + try: + obj.handle_write_event() + except _reraised_exceptions: + raise + except: + obj.handle_error() + +def _exception(obj): + try: + obj.handle_expt_event() + except _reraised_exceptions: + raise + except: + obj.handle_error() + +def readwrite(obj, flags): + try: + if flags & select.POLLIN: + obj.handle_read_event() + if flags & select.POLLOUT: + obj.handle_write_event() + if flags & select.POLLPRI: + obj.handle_expt_event() + if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL): + obj.handle_close() + except socket.error as e: + if e.args[0] not in _DISCONNECTED: + obj.handle_error() + else: + obj.handle_close() + except _reraised_exceptions: + raise + except: + obj.handle_error() + +def select_poller(timeout=0.0, map=None): + """A poller which uses select(), available on most platforms.""" + if map is None: + map = socket_map + if map: + r = []; w = []; e = [] + for fd, obj in list(map.items()): + is_r = obj.readable() + is_w = obj.writable() + if is_r: + r.append(fd) + # accepting sockets should not be writable + if is_w and not obj.accepting: + w.append(fd) + if is_r or is_w: + e.append(fd) + if [] == r == w == e: + time.sleep(timeout) + return + + try: + r, w, e = select.select(r, w, e, timeout) + except KeyboardInterrupt: + return + + for fd in r: + obj = map.get(fd) + if obj is None: + continue + read(obj) + + for fd in w: + obj = map.get(fd) + if obj is None: + continue + write(obj) + + for fd in e: + obj = map.get(fd) + if obj is None: + continue + _exception(obj) + +def poll_poller(timeout=0.0, map=None): + """A poller which uses poll(), available on most UNIXen.""" + if map is None: + map = socket_map + if timeout is not None: + # timeout is in milliseconds + timeout = int(timeout*1000) + pollster = select.poll() + if map: + for fd, obj in list(map.items()): + flags = 0 + if obj.readable(): + flags |= select.POLLIN | select.POLLPRI + # accepting sockets should not be writable + if obj.writable() and not obj.accepting: + flags |= select.POLLOUT + if flags: + pollster.register(fd, flags) + try: + r = pollster.poll(timeout) + except KeyboardInterrupt: + r = [] + for fd, flags in r: + obj = map.get(fd) + if obj is None: + continue + readwrite(obj, flags) + +# Aliases for backward compatibility +poll = select_poller +poll2 = poll3 = poll_poller + +def epoll_poller(timeout=0.0, map=None): + """A poller which uses epoll(), supported on Linux 2.5.44 and newer.""" + if map is None: + map = socket_map + pollster = select.epoll() + if map: + for fd, obj in map.items(): + flags = 0 + if obj.readable(): + flags |= select.POLLIN | select.POLLPRI + if obj.writable(): + flags |= select.POLLOUT + if flags: + # Only check for exceptions if object was either readable + # or writable. + flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL + pollster.register(fd, flags) + try: + r = pollster.poll(timeout) + except select.error, err: + if err.args[0] != EINTR: + raise + r = [] + for fd, flags in r: + obj = map.get(fd) + if obj is None: + continue + readwrite(obj, flags) + +def kqueue_poller(timeout=0.0, map=None): + """A poller which uses kqueue(), BSD specific.""" + if map is None: + map = socket_map + if map: + kqueue = select.kqueue() + flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE + selectables = 0 + for fd, obj in map.items(): + filter = 0 + if obj.readable(): + filter |= select.KQ_FILTER_READ + if obj.writable(): + filter |= select.KQ_FILTER_WRITE + if filter: + ev = select.kevent(fd, filter=filter, flags=flags) + kqueue.control([ev], 0) + selectables += 1 + + events = kqueue.control(None, selectables, timeout) + for event in events: + fd = event.ident + obj = map.get(fd) + if obj is None: + continue + if event.filter == select.KQ_FILTER_READ: + read(obj) + if event.filter == select.KQ_FILTER_WRITE: + write(obj) + kqueue.close() + + +def loop(timeout=30.0, use_poll=False, map=None, count=None, + poller=select_poller): + if map is None: + map = socket_map + # code which grants backward compatibility with "use_poll" + # argument which should no longer be used in favor of + # "poller" + if use_poll and hasattr(select, 'poll'): + poller = poll_poller + else: + poller = select_poller + + if count is None: + while map: + poller(timeout, map) + else: + while map and count > 0: + poller(timeout, map) + count = count - 1 + +class dispatcher: + + debug = False + connected = False + accepting = False + connecting = False + closing = False + addr = None + ignore_log_types = frozenset(['warning']) + + def __init__(self, sock=None, map=None): + if map is None: + self._map = socket_map + else: + self._map = map + + self._fileno = None + + if sock: + # Set to nonblocking just to make sure for cases where we + # get a socket from a blocking source. + sock.setblocking(0) + self.set_socket(sock, map) + self.connected = True + # The constructor no longer requires that the socket + # passed be connected. + try: + self.addr = sock.getpeername() + except socket.error as err: + if err.args[0] in (ENOTCONN, EINVAL): + # To handle the case where we got an unconnected + # socket. + self.connected = False + else: + # The socket is broken in some unknown way, alert + # the user and remove it from the map (to prevent + # polling of broken sockets). + self.del_channel(map) + raise + else: + self.socket = None + + def __repr__(self): + status = [self.__class__.__module__+"."+self.__class__.__name__] + if self.accepting and self.addr: + status.append('listening') + elif self.connected: + status.append('connected') + if self.addr is not None: + try: + status.append('%s:%d' % self.addr) + except TypeError: + status.append(repr(self.addr)) + return '<%s at %#x>' % (' '.join(status), id(self)) + + __str__ = __repr__ + + def add_channel(self, map=None): + #self.log_info('adding channel %s' % self) + if map is None: + map = self._map + map[self._fileno] = self + + def del_channel(self, map=None): + fd = self._fileno + if map is None: + map = self._map + if fd in map: + #self.log_info('closing channel %d:%s' % (fd, self)) + del map[fd] + self._fileno = None + + def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM): + self.family_and_type = family, type + sock = socket.socket(family, type) + sock.setblocking(0) + self.set_socket(sock) + + def set_socket(self, sock, map=None): + self.socket = sock +## self.__dict__['socket'] = sock + self._fileno = sock.fileno() + self.add_channel(map) + + def set_reuse_addr(self): + # try to re-use a server port if possible + try: + self.socket.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR, + self.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR) | 1 + ) + except socket.error: + pass + + # ================================================== + # predicates for select() + # these are used as filters for the lists of sockets + # to pass to select(). + # ================================================== + + def readable(self): + return True + + def writable(self): + return True + + # ================================================== + # socket object methods. + # ================================================== + + def listen(self, num): + self.accepting = True + if os.name == 'nt' and num > 5: + num = 5 + return self.socket.listen(num) + + def bind(self, addr): + self.addr = addr + return self.socket.bind(addr) + + def connect(self, address): + self.connected = False + self.connecting = True + err = self.socket.connect_ex(address) + if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \ + or err == EINVAL and os.name in ('nt', 'ce'): + self.addr = address + return + if err in (0, EISCONN): + self.addr = address + self.handle_connect_event() + else: + raise socket.error(err, errorcode[err]) + + def accept(self): + # XXX can return either an address pair or None + try: + conn, addr = self.socket.accept() + except TypeError: + return None + except socket.error as why: + if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN): + return None + else: + raise + else: + return conn, addr + + def send(self, data): + try: + result = self.socket.send(data) + return result + except socket.error as why: + if why.args[0] == EWOULDBLOCK: + return 0 + elif why.args[0] in _DISCONNECTED: + self.handle_close() + return 0 + else: + raise + + def recv(self, buffer_size): + try: + data = self.socket.recv(buffer_size) + if not data: + # a closed connection is indicated by signaling + # a read condition, and having recv() return 0. + self.handle_close() + return b'' + else: + return data + except socket.error as why: + # winsock sometimes raises ENOTCONN + if why.args[0] in _DISCONNECTED: + self.handle_close() + return b'' + else: + raise + + def close(self): + self.connected = False + self.accepting = False + self.connecting = False + self.del_channel() + try: + self.socket.close() + except socket.error as why: + if why.args[0] not in (ENOTCONN, EBADF): + raise + + # cheap inheritance, used to pass all other attribute + # references to the underlying socket object. + def __getattr__(self, attr): + try: + retattr = getattr(self.socket, attr) + except AttributeError: + raise AttributeError("%s instance has no attribute '%s'" + %(self.__class__.__name__, attr)) + else: + msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \ + "instead" % {'me' : self.__class__.__name__, 'attr' : attr} + warnings.warn(msg, DeprecationWarning, stacklevel=2) + return retattr + + # log and log_info may be overridden to provide more sophisticated + # logging and warning methods. In general, log is for 'hit' logging + # and 'log_info' is for informational, warning and error logging. + + def log(self, message): + sys.stderr.write('log: %s\n' % str(message)) + + def log_info(self, message, type='info'): + if type not in self.ignore_log_types: + print('%s: %s' % (type, message)) + + def handle_read_event(self): + if self.accepting: + # accepting sockets are never connected, they "spawn" new + # sockets that are connected + self.handle_accept() + elif not self.connected: + if self.connecting: + self.handle_connect_event() + self.handle_read() + else: + self.handle_read() + + def handle_connect_event(self): + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + raise socket.error(err, _strerror(err)) + self.handle_connect() + self.connected = True + self.connecting = False + + def handle_write_event(self): + if self.accepting: + # Accepting sockets shouldn't get a write event. + # We will pretend it didn't happen. + return + + if not self.connected: + if self.connecting: + self.handle_connect_event() + self.handle_write() + + def handle_expt_event(self): + # handle_expt_event() is called if there might be an error on the + # socket, or if there is OOB data + # check for the error condition first + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + # we can get here when select.select() says that there is an + # exceptional condition on the socket + # since there is an error, we'll go ahead and close the socket + # like we would in a subclassed handle_read() that received no + # data + self.handle_close() + else: + self.handle_expt() + + def handle_error(self): + nil, t, v, tbinfo = compact_traceback() + + # sometimes a user repr method will crash. + try: + self_repr = repr(self) + except: + self_repr = '<__repr__(self) failed for object at %0x>' % id(self) + + self.log_info( + 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( + self_repr, + t, + v, + tbinfo + ), + 'error' + ) + self.handle_close() + + def handle_expt(self): + self.log_info('unhandled incoming priority event', 'warning') + + def handle_read(self): + self.log_info('unhandled read event', 'warning') + + def handle_write(self): + self.log_info('unhandled write event', 'warning') + + def handle_connect(self): + self.log_info('unhandled connect event', 'warning') + + def handle_accept(self): + pair = self.accept() + if pair is not None: + self.handle_accepted(*pair) + + def handle_accepted(self, sock, addr): + sock.close() + self.log_info('unhandled accepted event', 'warning') + + def handle_close(self): + self.log_info('unhandled close event', 'warning') + self.close() + +# --------------------------------------------------------------------------- +# adds simple buffered output capability, useful for simple clients. +# [for more sophisticated usage use asynchat.async_chat] +# --------------------------------------------------------------------------- + +class dispatcher_with_send(dispatcher): + + def __init__(self, sock=None, map=None): + dispatcher.__init__(self, sock, map) + self.out_buffer = b'' + + def initiate_send(self): + num_sent = 0 + num_sent = dispatcher.send(self, self.out_buffer[:512]) + self.out_buffer = self.out_buffer[num_sent:] + + def handle_write(self): + self.initiate_send() + + def writable(self): + return (not self.connected) or len(self.out_buffer) + + def send(self, data): + if self.debug: + self.log_info('sending %s' % repr(data)) + self.out_buffer = self.out_buffer + data + self.initiate_send() + +# --------------------------------------------------------------------------- +# used for debugging. +# --------------------------------------------------------------------------- + +def compact_traceback(): + t, v, tb = sys.exc_info() + tbinfo = [] + if not tb: # Must have a traceback + raise AssertionError("traceback does not exist") + while tb: + tbinfo.append(( + tb.tb_frame.f_code.co_filename, + tb.tb_frame.f_code.co_name, + str(tb.tb_lineno) + )) + tb = tb.tb_next + + # just to be safe + del tb + + file, function, line = tbinfo[-1] + info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) + return (file, function, line), t, v, info + +def close_all(map=None, ignore_all=False): + if map is None: + map = socket_map + for x in list(map.values()): + try: + x.close() + except OSError as x: + if x.args[0] == EBADF: + pass + elif not ignore_all: + raise + except _reraised_exceptions: + raise + except: + if not ignore_all: + raise + map.clear() + +# Asynchronous File I/O: +# +# After a little research (reading man pages on various unixen, and +# digging through the linux kernel), I've determined that select() +# isn't meant for doing asynchronous file i/o. +# Heartening, though - reading linux/mm/filemap.c shows that linux +# supports asynchronous read-ahead. So _MOST_ of the time, the data +# will be sitting in memory for us already when we go to read it. +# +# What other OS's (besides NT) support async file i/o? [VMS?] +# +# Regardless, this is useful for pipes, and stdin/stdout... + +if os.name == 'posix': + import fcntl + + class file_wrapper: + # Here we override just enough to make a file + # look like a socket for the purposes of asyncore. + # The passed fd is automatically os.dup()'d + + def __init__(self, fd): + self.fd = os.dup(fd) + + def recv(self, *args): + return os.read(self.fd, *args) + + def send(self, *args): + return os.write(self.fd, *args) + + def getsockopt(self, level, optname, buflen=None): + if (level == socket.SOL_SOCKET and + optname == socket.SO_ERROR and + not buflen): + return 0 + raise NotImplementedError("Only asyncore specific behaviour " + "implemented.") + + read = recv + write = send + + def close(self): + os.close(self.fd) + + def fileno(self): + return self.fd + + class file_dispatcher(dispatcher): + + def __init__(self, fd, map=None): + dispatcher.__init__(self, None, map) + self.connected = True + try: + fd = fd.fileno() + except AttributeError: + pass + self.set_file(fd) + # set it to non-blocking mode + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + + def set_file(self, fd): + self.socket = file_wrapper(fd) + self._fileno = self.socket.fileno() + self.add_channel() diff --git a/src/network/http-old.py b/src/network/http-old.py new file mode 100644 index 00000000..56d24915 --- /dev/null +++ b/src/network/http-old.py @@ -0,0 +1,49 @@ +import asyncore +import socket +import time + +requestCount = 0 +parallel = 50 +duration = 60 + + +class HTTPClient(asyncore.dispatcher): + port = 12345 + + def __init__(self, host, path, connect=True): + if not hasattr(self, '_map'): + asyncore.dispatcher.__init__(self) + if connect: + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((host, HTTPClient.port)) + self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path + + def handle_close(self): + global requestCount + requestCount += 1 + self.close() + + def handle_read(self): +# print self.recv(8192) + self.recv(8192) + + def writable(self): + return (len(self.buffer) > 0) + + def handle_write(self): + sent = self.send(self.buffer) + self.buffer = self.buffer[sent:] + +if __name__ == "__main__": + # initial fill + for i in range(parallel): + HTTPClient('127.0.0.1', '/') + start = time.time() + while (time.time() - start < duration): + if (len(asyncore.socket_map) < parallel): + for i in range(parallel - len(asyncore.socket_map)): + HTTPClient('127.0.0.1', '/') + print "Active connections: %i" % (len(asyncore.socket_map)) + asyncore.loop(count=len(asyncore.socket_map)/2) + if requestCount % 100 == 0: + print "Processed %i total messages" % (requestCount) diff --git a/src/network/http.py b/src/network/http.py index 56d24915..184213b0 100644 --- a/src/network/http.py +++ b/src/network/http.py @@ -1,49 +1,86 @@ -import asyncore import socket -import time -requestCount = 0 -parallel = 50 -duration = 60 +from advanceddispatcher import AdvancedDispatcher +import asyncore_pollchoose as asyncore +from proxy import Proxy, ProxyError, GeneralProxyError +from socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error +from socks4a import Socks4aConnection, Socks4aResolver, Socks4aError + +class HttpError(ProxyError): pass -class HTTPClient(asyncore.dispatcher): - port = 12345 +class HttpConnection(AdvancedDispatcher): + def __init__(self, host, path="/"): + AdvancedDispatcher.__init__(self) + self.path = path + self.destination = (host, 80) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.destination) + print "connecting in background to %s:%i" % (self.destination[0], self.destination[1]) - def __init__(self, host, path, connect=True): - if not hasattr(self, '_map'): - asyncore.dispatcher.__init__(self) - if connect: - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect((host, HTTPClient.port)) - self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path + def state_init(self): + self.write_buf += "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.destination[0]) + print "\"%s\"" % (self.write_buf) + self.set_state("http_request_sent", 0) + return False - def handle_close(self): - global requestCount - requestCount += 1 - self.close() + def state_http_request_sent(self): + if len(self.read_buf) > 0: + print self.read_buf + self.read_buf = b"" + if not self.connected: + self.set_state("close", 0) + return False - def handle_read(self): -# print self.recv(8192) - self.recv(8192) - def writable(self): - return (len(self.buffer) > 0) +class Socks5HttpConnection(Socks5Connection, HttpConnection): + def __init__(self, host, path="/"): + self.path = path + Socks5Connection.__init__(self, address=(host, 80)) + + def state_socks_handshake_done(self): + HttpConnection.state_init(self) + return False + + +class Socks4aHttpConnection(Socks4aConnection, HttpConnection): + def __init__(self, host, path="/"): + Socks4aConnection.__init__(self, address=(host, 80)) + self.path = path + + def state_socks_handshake_done(self): + HttpConnection.state_init(self) + return False - def handle_write(self): - sent = self.send(self.buffer) - self.buffer = self.buffer[sent:] if __name__ == "__main__": # initial fill - for i in range(parallel): - HTTPClient('127.0.0.1', '/') - start = time.time() - while (time.time() - start < duration): - if (len(asyncore.socket_map) < parallel): - for i in range(parallel - len(asyncore.socket_map)): - HTTPClient('127.0.0.1', '/') - print "Active connections: %i" % (len(asyncore.socket_map)) - asyncore.loop(count=len(asyncore.socket_map)/2) - if requestCount % 100 == 0: - print "Processed %i total messages" % (requestCount) + + for host in ("bootstrap8080.bitmessage.org", "bootstrap8444.bitmessage.org"): + proxy = Socks5Resolver(host=host) + while len(asyncore.socket_map) > 0: + print "loop %s, len %i" % (proxy.state, len(asyncore.socket_map)) + asyncore.loop(timeout=1, count=1) + proxy.resolved() + + proxy = Socks4aResolver(host=host) + while len(asyncore.socket_map) > 0: + print "loop %s, len %i" % (proxy.state, len(asyncore.socket_map)) + asyncore.loop(timeout=1, count=1) + proxy.resolved() + + for host in ("bitmessage.org",): + direct = HttpConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (direct.state) + asyncore.loop(timeout=1, count=1) + + proxy = Socks5HttpConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=1, count=1) + + proxy = Socks4aHttpConnection(host) + while len(asyncore.socket_map) > 0: +# print "loop, state = %s" % (proxy.state) + asyncore.loop(timeout=1, count=1) diff --git a/src/network/proxy.py b/src/network/proxy.py index d9830431..e3b5acee 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -1,15 +1,15 @@ -# SOCKS5 only - -import asyncore import socket -import struct from advanceddispatcher import AdvancedDispatcher +import asyncore_pollchoose as asyncore + +class ProxyError(Exception): pass +class GeneralProxyError(ProxyError): pass class Proxy(AdvancedDispatcher): # these are global, and if you change config during runtime, all active/new # instances should change too - _proxy = ["", 1080] + _proxy = ("127.0.0.1", 9050) _auth = None _remote_dns = True @@ -19,8 +19,8 @@ class Proxy(AdvancedDispatcher): @proxy.setter def proxy(self, address): - if (not type(address) in (list,tuple)) or (len(address) < 2) or (type(address[0]) != type('')) or (type(address[1]) != int): - raise + if type(address) != tuple or (len(address) < 2) or (type(str(address[0])) != type('')) or (type(address[1]) != int): + raise ValueError self.__class__._proxy = address @property @@ -31,160 +31,11 @@ class Proxy(AdvancedDispatcher): def auth(self, authTuple): self.__class__._auth = authTuple - def __init__(self, address=None): - if (not type(address) in (list,tuple)) or (len(address) < 2) or (type(address[0]) != type('')) or (type(address[1]) != int): - raise - AdvancedDispatcher.__init__(self, self.sock) + def __init__(self, address): + if type(address) != tuple or (len(address) < 2) or (type(str(address[0])) != type('')) or (type(address[1]) != int): + raise ValueError + AdvancedDispatcher.__init__(self) self.destination = address self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.sslSocket.setblocking(0) self.connect(self.proxy) - - -class SOCKS5(Proxy): - def __init__(self, address=None, sock=None): - Proxy.__init__(self, address) - self.state = "init" - - def handle_connect(self): - self.process() - - def state_init(self): - if self._auth: - self.write_buf += struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02) - else: - self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) - self.set_state("auth_1", 0) - - def state_auth_1(self): - if not self.read_buf_sufficient(2): - return False - ret = struct.unpack('BB', self.read_buf) - self.read_buf = self.read_buf[2:] - if ret[0] != 5: - # general error - raise - elif ret[1] == 0: - # no auth required - self.set_state("auth_done", 2) - elif ret[1] == 2: - # username/password - self.write_buf += struct.pack('BB', 1, len(self._auth[0])) + \ - self._auth[0] + struct.pack('B', len(self._auth[1])) + \ - self._auth[1] - self.set_state("auth_1", 2) - else: - if ret[1] == 0xff: - # auth error - raise - else: - # other error - raise - - def state_auth_needed(self): - if not self.read_buf_sufficient(2): - return False - ret = struct.unpack('BB', self.read_buf) - if ret[0] != 1: - # general error - raise - if ret[1] != 0: - # auth error - raise - # all ok - self.set_state = ("auth_done", 2) - - def state_pre_connect(self): - if not self.read_buf_sufficient(4): - return False - # Get the response - if self.read_buf[0:1] != chr(0x05).encode(): - # general error - self.close() - raise - elif self.read_buf[1:2] != chr(0x00).encode(): - # Connection failed - self.close() - if ord(self.read_buf[1:2])<=8: - # socks 5 erro - raise - #raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) - else: - raise - #raise Socks5Error((9, _socks5errors[9])) - # Get the bound address/port - elif self.read_buf[3:4] == chr(0x01).encode(): - self.set_state("proxy_addr_1", 4) - elif self.read_buf[3:4] == chr(0x03).encode(): - self.set_state("proxy_addr_2_1", 4) - else: - self.close() - raise GeneralProxyError((1,_generalerrors[1])) - - def state_proxy_addr_1(self): - if not self.read_buf_sufficient(4): - return False - self.boundaddr = self.read_buf[0:4] - self.set_state("proxy_port", 4) - - def state_proxy_addr_2_1(self): - if not self.read_buf_sufficient(1): - return False - self.address_length = ord(self.read_buf[0:1]) - self.set_state("proxy_addr_2_2", 1) - - def state_proxy_addr_2_2(self): - if not self.read_buf_sufficient(self.address_length): - return False - self.boundaddr = read_buf - self.set_state("proxy_port", self.address_length) - - def state_proxy_port(self): - if not self.read_buf_sufficient(2): - return False - self.boundport = struct.unpack(">H", self.read_buf[0:2])[0] - self.__proxysockname = (self.boundaddr, self.boundport) - if self.ipaddr != None: - self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) - else: - self.__proxypeername = (self.destination[1], destport) - - -class SOCKS5Connection(SOCKS5): - def __init__(self, address): - SOCKS5.__init__(self, address) - - def state_auth_done(self): - # Now we can request the actual connection - self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) - # If the given destination address is an IP address, we'll - # use the IPv4 address request even if remote resolving was specified. - try: - self.ipaddr = socket.inet_aton(self.destination[0]) - self.write_buf += chr(0x01).encode() + ipaddr - except socket.error: - # Well it's not an IP number, so it's probably a DNS name. - if Proxy._remote_dns: - # Resolve remotely - self.ipaddr = None - self.write_buf += chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0] - else: - # Resolve locally - self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) - self.write_buf += chr(0x01).encode() + ipaddr - self.write_buf += struct.pack(">H", self.destination[1]) - self.set_state = ("pre_connect", 0) - - -class SOCKS5Resolver(SOCKS5): - def __init__(self, host): - self.host = host - self.port = 8444 - SOCKS5.__init__(self, [self.host, self.port]) - - def state_auth_done(self): - # Now we can request the actual connection - self.write_buf += struct.pack('BBB', 0x05, 0xF0, 0x00) - self.write_buf += chr(0x03).encode() + chr(len(self.host)).encode() + self.host - self.write_buf += struct.pack(">H", self.port) - self.state = "pre_connect" + print "connecting in background to %s:%i" % (self.proxy[0], self.proxy[1]) diff --git a/src/network/socks4a.py b/src/network/socks4a.py new file mode 100644 index 00000000..091e09a5 --- /dev/null +++ b/src/network/socks4a.py @@ -0,0 +1,104 @@ +import socket +import struct + +from advanceddispatcher import AdvancedDispatcher +import asyncore_pollchoose as asyncore +from proxy import Proxy, ProxyError, GeneralProxyError + +class Socks4aError(ProxyError): pass + + +class Socks4a(Proxy): + def __init__(self, address=None): + Proxy.__init__(self, address) + self.ipaddr = None + self.destport = address[1] + + def state_init(self): + self.set_state("auth_done", 0) + + def state_pre_connect(self): + if not self.read_buf_sufficient(8): + return False + # Get the response + if self.read_buf[0:1] != chr(0x00).encode(): + # bad data + self.close() + raise Socks4aError + elif self.read_buf[1:2] != chr(0x5A).encode(): + # Connection failed + self.close() + if ord(self.read_buf[1:2]) in (91, 92, 93): + # socks 4 erro + raise Socks4aError + #raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])-90])) + else: + raise Socks4aError + #raise Socks4aError((94, _socks4aerrors[4])) + # Get the bound address/port + self.boundport = struct.unpack(">H", self.read_buf[2:4])[0] + self.boundaddr = self.read_buf[4:] + self.__proxysockname = (self.boundaddr, self.boundport) + if self.ipaddr != None: + self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) + else: + self.__proxypeername = (self.destination[0], self.destport) + self.set_state("socks_handshake_done", 8) + + def proxy_sock_name(self): + return socket.inet_ntoa(self.__proxysockname[0]) + + def state_socks_handshake_done(self): + return False + + +class Socks4aConnection(Socks4a): + def __init__(self, address): + Socks4a.__init__(self, address=address) + + def state_auth_done(self): + # Now we can request the actual connection + rmtrslv = False + self.write_buf += struct.pack('>BBH', 0x04, 0x01, self.destination[1]) + # If the given destination address is an IP address, we'll + # use the IPv4 address request even if remote resolving was specified. + try: + self.ipaddr = socket.inet_aton(self.destination[0]) + self.write_buf += ipaddr + except socket.error: + # Well it's not an IP number, so it's probably a DNS name. + if Proxy._remote_dns: + # Resolve remotely + rmtrslv = True + self.ipaddr = None + self.write_buf += struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) + else: + # Resolve locally + self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) + self.write_buf += self.ipaddr + if self._auth: + self.write_buf += self._auth[0] + self.write_buf += chr(0x00).encode() + if rmtrslv: + self.write_buf += self.destination[0] + chr(0x00).encode() + self.set_state("pre_connect", 0) + + +class Socks4aResolver(Socks4a): + def __init__(self, host): + self.host = host + self.port = 8444 + Socks4a.__init__(self, address=(self.host, self.port)) + + def state_auth_done(self): + # Now we can request the actual connection + self.write_buf += struct.pack('>BBH', 0x04, 0xF0, self.destination[1]) + self.write_buf += struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) + if self._auth: + self.write_buf += self._auth[0] + self.write_buf += chr(0x00).encode() + self.write_buf += self.host + chr(0x00).encode() + self.set_state("pre_connect", 0) + + def resolved(self): + print "Resolved %s as %s" % (self.host, self.proxy_sock_name()) diff --git a/src/network/socks5.py b/src/network/socks5.py new file mode 100644 index 00000000..5f3164ca --- /dev/null +++ b/src/network/socks5.py @@ -0,0 +1,170 @@ +import socket +import struct + +from advanceddispatcher import AdvancedDispatcher +import asyncore_pollchoose as asyncore +from proxy import Proxy, ProxyError, GeneralProxyError + +class Socks5AuthError(ProxyError): pass +class Socks5Error(ProxyError): pass + + +class Socks5(Proxy): + def __init__(self, address=None): + Proxy.__init__(self, address) + self.ipaddr = None + self.destport = address[1] + + def handle_connect(self): + self.process() + + def state_init(self): + if self._auth: + self.write_buf += struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02) + else: + self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) + self.set_state("auth_1", 0) + + def state_auth_1(self): + if not self.read_buf_sufficient(2): + return False + ret = struct.unpack('BB', self.read_buf) + self.read_buf = self.read_buf[2:] + if ret[0] != 5: + # general error + raise GeneralProxyError + elif ret[1] == 0: + # no auth required + self.set_state("auth_done", 2) + elif ret[1] == 2: + # username/password + self.write_buf += struct.pack('BB', 1, len(self._auth[0])) + \ + self._auth[0] + struct.pack('B', len(self._auth[1])) + \ + self._auth[1] + self.set_state("auth_1", 2) + else: + if ret[1] == 0xff: + # auth error + raise Socks5AuthError + else: + # other error + raise Socks5Error + + def state_auth_needed(self): + if not self.read_buf_sufficient(2): + return False + ret = struct.unpack('BB', self.read_buf) + if ret[0] != 1: + # general error + raise Socks5Error + if ret[1] != 0: + # auth error + raise Socks5AuthError + # all ok + self.set_state = ("auth_done", 2) + + def state_pre_connect(self): + if not self.read_buf_sufficient(4): + return False + # Get the response + if self.read_buf[0:1] != chr(0x05).encode(): + # general error + self.close() + raise Socks5Error + elif self.read_buf[1:2] != chr(0x00).encode(): + # Connection failed + self.close() + if ord(self.read_buf[1:2])<=8: + # socks 5 erro + raise Socks5Error + #raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) + else: + raise Socks5Error + #raise Socks5Error((9, _socks5errors[9])) + # Get the bound address/port + elif self.read_buf[3:4] == chr(0x01).encode(): + self.set_state("proxy_addr_1", 4) + elif self.read_buf[3:4] == chr(0x03).encode(): + self.set_state("proxy_addr_2_1", 4) + else: + self.close() + #raise GeneralProxyError((1,_generalerrors[1])) + raise GeneralProxyError + + def state_proxy_addr_1(self): + if not self.read_buf_sufficient(4): + return False + self.boundaddr = self.read_buf[0:4] + self.set_state("proxy_port", 4) + + def state_proxy_addr_2_1(self): + if not self.read_buf_sufficient(1): + return False + self.address_length = ord(self.read_buf[0:1]) + self.set_state("proxy_addr_2_2", 1) + + def state_proxy_addr_2_2(self): + if not self.read_buf_sufficient(self.address_length): + return False + self.boundaddr = read_buf + self.set_state("proxy_port", self.address_length) + + def state_proxy_port(self): + if not self.read_buf_sufficient(2): + return False + self.boundport = struct.unpack(">H", self.read_buf[0:2])[0] + self.__proxysockname = (self.boundaddr, self.boundport) + if self.ipaddr != None: + self.__proxypeername = (socket.inet_ntoa(self.ipaddr), self.destination[1]) + else: + self.__proxypeername = (self.destination[0], self.destport) + self.set_state("socks_handshake_done", 2) + + def proxy_sock_name(self): + return socket.inet_ntoa(self.__proxysockname[0]) + + def state_socks_handshake_done(self): + return False + + +class Socks5Connection(Socks5): + def __init__(self, address): + Socks5.__init__(self, address=address) + + def state_auth_done(self): + # Now we can request the actual connection + self.write_buf += struct.pack('BBB', 0x05, 0x01, 0x00) + # If the given destination address is an IP address, we'll + # use the IPv4 address request even if remote resolving was specified. + try: + self.ipaddr = socket.inet_aton(self.destination[0]) + self.write_buf += chr(0x01).encode() + self.ipaddr + except socket.error: + # Well it's not an IP number, so it's probably a DNS name. + if Proxy._remote_dns: + # Resolve remotely + self.ipaddr = None + self.write_buf += chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0] + else: + # Resolve locally + self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0])) + self.write_buf += chr(0x01).encode() + self.ipaddr + self.write_buf += struct.pack(">H", self.destination[1]) + self.set_state("pre_connect", 0) + + +class Socks5Resolver(Socks5): + def __init__(self, host): + self.host = host + self.port = 8444 + Socks5.__init__(self, address=(self.host, self.port)) + + def state_auth_done(self): + # Now we can request the actual connection + self.write_buf += struct.pack('BBB', 0x05, 0xF0, 0x00) + self.write_buf += chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host) + self.write_buf += struct.pack(">H", self.port) + self.set_state("pre_connect", 0) + + def resolved(self): + print "Resolved %s as %s" % (self.host, self.proxy_sock_name())