From 46c9ea940324dcd56cd59b7d6257bfbcd11930b7 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 20 Mar 2017 18:32:26 +0100 Subject: [PATCH] Async network updates (WIP) - cleaner command handling - separating into header and command handling - incoming connection handler - bugfixes and more debug information --- src/bmproto.py | 150 +++++++++++++++++------------ src/network/advanceddispatcher.py | 5 +- src/network/asyncore_pollchoose.py | 2 + src/protocol.py | 2 +- 4 files changed, 93 insertions(+), 66 deletions(-) diff --git a/src/bmproto.py b/src/bmproto.py index 8bd90aff..0a147d3e 100644 --- a/src/bmproto.py +++ b/src/bmproto.py @@ -15,91 +15,94 @@ class BMProtoError(ProxyError): pass class BMConnection(AdvancedDispatcher): - def __init__(self, address): - AdvancedDispatcher.__init__(self) - self.destination = address + # ~1.6 MB which is the maximum possible size of an inv message. + maxMessageSize = 1600100 + + def __init__(self, address=None, sock=None): + AdvancedDispatcher.__init__(self, sock) + self.verackReceived = False + self.verackSent = False + if address is None and sock is not None: + self.destination = self.addr() + self.isOutbound = False + print "received connection in background from %s:%i" % (self.destination[0], self.destination[1]) + else: + self.destination = address + self.isOutbound = True + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.destination) + print "connecting in background to %s:%i" % (self.destination[0], self.destination[1]) + + def bm_proto_reset(self): + self.magic = None + self.command = None + self.payloadLength = None + self.checksum = None self.payload = None - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.destination) - print "connecting in background to %s:%i" % (self.destination[0], self.destination[1]) - - def bm_proto_len_sufficient(self): - if len(self.read_buf) < protocol.Header.size: - print "Length below header size" - return False - if not self.payload: - self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) - self.command = self.command.rstrip('\x00') - if self.payloadLength > 1600100: # ~1.6 MB which is the maximum possible size of an inv message. - return False - if len(self.read_buf) < self.payloadLength + protocol.Header.size: - print "Length below announced object length" - return False - self.payload = self.read_buf[protocol.Header.size:self.payloadLength + protocol.Header.size] - return True - - def bm_check_command(self): - if self.magic != 0xE9BEB4D9: - self.set_state("crap", protocol.Header.size) - print "Bad magic" - self.payload = None - return False - if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: - self.set_state("crap", protocol.Header.size) - self.payload = None - print "Bad checksum" - return False - print "received %s (%ib)" % (self.command, self.payloadLength) - return True + self.invalid = False def state_init(self): + self.bm_proto_reset() self.write_buf += protocol.assembleVersionMessage(self.destination[0], self.destination[1], (1,), False) if True: print "Sending version (%ib)" % len(self.write_buf) - self.set_state("bm_reccommand", 0) + self.set_state("bm_header", 0) return False - def state_bm_reccommand(self): - if not self.bm_proto_len_sufficient(): + def state_bm_header(self): + if len(self.read_buf) < protocol.Header.size: + print "Length below header size" return False - if not self.bm_check_command(): + self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) + self.command = self.command.rstrip('\x00') + if self.magic != 0xE9BEB4D9: + # skip 1 byte in order to sync + self.bm_proto_reset() + self.set_state("bm_header", 1) + print "Bad magic" + if self.payloadLength > BMConnection.maxMessageSize: + self.invalid = True + self.set_state("bm_command", protocol.Header.size) + return True + + def state_bm_command(self): + if len(self.read_buf) < self.payloadLength: + print "Length below announced object length" return False - if self.command == "version": - self.bm_recversion() - elif self.command == "verack": - self.bm_recverack() + print "received %s (%ib)" % (self.command, self.payloadLength) + self.payload = self.read_buf[:self.payloadLength] + if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: + print "Bad checksum, ignoring" + self.invalid = True + if not self.invalid: + try: + getattr(self, "bm_command_" + str(self.command))() + except AttributeError: + # unimplemented command + print "unimplemented command %s" % (self.command) else: - print "unimplemented command %s" % (self.command) - self.set_state("bm_reccommand", protocol.Header.size + self.payloadLength) - self.payload = None - print "buffer length = %i" % (len(self.read_buf)) - return False + print "Skipping command %s due to invalid data" % (self.command) + self.set_state("bm_header", self.payloadLength) + self.bm_proto_reset() + return True - def bm_recverack(self): + def bm_command_verack(self): self.verackReceived = True - return False + return True - def bm_recversion(self): - self.remoteProtocolVersion, self.services, timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:82]) + def bm_command_version(self): + self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size]) print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion) print "services: %08X" % (self.services) - print "time offset: %i" % (timestamp - int(time.time())) + print "time offset: %i" % (self.timestamp - int(time.time())) print "my external IP: %s" % (socket.inet_ntoa(self.myExternalIP)) print "remote node incoming port: %i" % (self.remoteNodeIncomingPort) - useragentLength, lengthofUseragentVarint = addresses.decodeVarint(self.payload[80:84]) + useragentLength, lengthOfUseragentVarint = addresses.decodeVarint(self.payload[80:84]) readPosition = 80 + lengthOfUseragentVarint self.userAgent = self.payload[readPosition:readPosition + useragentLength] readPosition += useragentLength print "user agent: %s" % (self.userAgent) - return False - - def state_http_request_sent(self): - if len(self.read_buf) > 0: - print self.read_buf - self.read_buf = b"" - if not self.connected: - self.set_state("close", 0) - return False + return True class Socks5BMConnection(Socks5Connection, BMConnection): @@ -120,6 +123,27 @@ class Socks4aBMConnection(Socks4aConnection, BMConnection): return False +class BMServer(AdvancedDispatcher): + port = 8444 + + def __init__(self, port=None): + if not hasattr(self, '_map'): + AdvancedDispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + if port is None: + port = BMServer.port + self.bind(('127.0.0.1', port)) + self.connections = 0 + self.listen(5) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + BMConnection(sock=sock) + + if __name__ == "__main__": # initial fill diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index c672f9f9..df6e58ef 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -3,9 +3,9 @@ import asyncore_pollchoose as asyncore class AdvancedDispatcher(asyncore.dispatcher): _buf_len = 2097152 # 2MB - def __init__(self): + def __init__(self, sock=None): if not hasattr(self, '_map'): - asyncore.dispatcher.__init__(self) + asyncore.dispatcher.__init__(self, sock) self.read_buf = b"" self.write_buf = b"" self.state = "init" @@ -27,6 +27,7 @@ class AdvancedDispatcher(asyncore.dispatcher): return while True: try: + print "Trying to handle state \"%s\"" % (self.state) if getattr(self, "state_" + str(self.state))() is False: break except AttributeError: diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 9f231e0e..7fa19f4a 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -287,6 +287,8 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, elif hasattr(select, 'select'): poller = select_poller + print "Poll loop using %s" % (poller.__name__) + if count is None: while map: poller(timeout, map) diff --git a/src/protocol.py b/src/protocol.py index a889ab93..9698f917 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -34,7 +34,7 @@ eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( #New code should use CreatePacket instead of Header.pack Header = Struct('!L12sL4s') -VersionPacket = Struct('>LqQ20sI36sH') +VersionPacket = Struct('>LqQ20s4s36sH') # Bitfield