Async network updates (WIP)

- cleaner command handling
- separating into header and command handling
- incoming connection handler
- bugfixes and more debug information
This commit is contained in:
Peter Šurda 2017-03-20 18:32:26 +01:00
parent 913b401dd0
commit 46c9ea9403
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 93 additions and 66 deletions

View File

@ -15,91 +15,94 @@ class BMProtoError(ProxyError): pass
class BMConnection(AdvancedDispatcher): class BMConnection(AdvancedDispatcher):
def __init__(self, address): # ~1.6 MB which is the maximum possible size of an inv message.
AdvancedDispatcher.__init__(self) maxMessageSize = 1600100
def __init__(self, address=None, sock=None):
AdvancedDispatcher.__init__(self, sock)
self.verackReceived = False
self.verackSent = False
if address is None and sock is not None:
self.destination = self.addr()
self.isOutbound = False
print "received connection in background from %s:%i" % (self.destination[0], self.destination[1])
else:
self.destination = address self.destination = address
self.payload = None self.isOutbound = True
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(self.destination) self.connect(self.destination)
print "connecting in background to %s:%i" % (self.destination[0], self.destination[1]) print "connecting in background to %s:%i" % (self.destination[0], self.destination[1])
def bm_proto_len_sufficient(self): def bm_proto_reset(self):
if len(self.read_buf) < protocol.Header.size: self.magic = None
print "Length below header size" self.command = None
return False self.payloadLength = None
if not self.payload: self.checksum = None
self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00')
if self.payloadLength > 1600100: # ~1.6 MB which is the maximum possible size of an inv message.
return False
if len(self.read_buf) < self.payloadLength + protocol.Header.size:
print "Length below announced object length"
return False
self.payload = self.read_buf[protocol.Header.size:self.payloadLength + protocol.Header.size]
return True
def bm_check_command(self):
if self.magic != 0xE9BEB4D9:
self.set_state("crap", protocol.Header.size)
print "Bad magic"
self.payload = None self.payload = None
return False self.invalid = False
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
self.set_state("crap", protocol.Header.size)
self.payload = None
print "Bad checksum"
return False
print "received %s (%ib)" % (self.command, self.payloadLength)
return True
def state_init(self): def state_init(self):
self.bm_proto_reset()
self.write_buf += protocol.assembleVersionMessage(self.destination[0], self.destination[1], (1,), False) self.write_buf += protocol.assembleVersionMessage(self.destination[0], self.destination[1], (1,), False)
if True: if True:
print "Sending version (%ib)" % len(self.write_buf) print "Sending version (%ib)" % len(self.write_buf)
self.set_state("bm_reccommand", 0) self.set_state("bm_header", 0)
return False return False
def state_bm_reccommand(self): def state_bm_header(self):
if not self.bm_proto_len_sufficient(): if len(self.read_buf) < protocol.Header.size:
print "Length below header size"
return False return False
if not self.bm_check_command(): self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00')
if self.magic != 0xE9BEB4D9:
# skip 1 byte in order to sync
self.bm_proto_reset()
self.set_state("bm_header", 1)
print "Bad magic"
if self.payloadLength > BMConnection.maxMessageSize:
self.invalid = True
self.set_state("bm_command", protocol.Header.size)
return True
def state_bm_command(self):
if len(self.read_buf) < self.payloadLength:
print "Length below announced object length"
return False return False
if self.command == "version": print "received %s (%ib)" % (self.command, self.payloadLength)
self.bm_recversion() self.payload = self.read_buf[:self.payloadLength]
elif self.command == "verack": if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
self.bm_recverack() print "Bad checksum, ignoring"
else: self.invalid = True
if not self.invalid:
try:
getattr(self, "bm_command_" + str(self.command))()
except AttributeError:
# unimplemented command
print "unimplemented command %s" % (self.command) print "unimplemented command %s" % (self.command)
self.set_state("bm_reccommand", protocol.Header.size + self.payloadLength) else:
self.payload = None print "Skipping command %s due to invalid data" % (self.command)
print "buffer length = %i" % (len(self.read_buf)) self.set_state("bm_header", self.payloadLength)
return False self.bm_proto_reset()
return True
def bm_recverack(self): def bm_command_verack(self):
self.verackReceived = True self.verackReceived = True
return False return True
def bm_recversion(self): def bm_command_version(self):
self.remoteProtocolVersion, self.services, timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:82]) self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size])
print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion) print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion)
print "services: %08X" % (self.services) print "services: %08X" % (self.services)
print "time offset: %i" % (timestamp - int(time.time())) print "time offset: %i" % (self.timestamp - int(time.time()))
print "my external IP: %s" % (socket.inet_ntoa(self.myExternalIP)) print "my external IP: %s" % (socket.inet_ntoa(self.myExternalIP))
print "remote node incoming port: %i" % (self.remoteNodeIncomingPort) print "remote node incoming port: %i" % (self.remoteNodeIncomingPort)
useragentLength, lengthofUseragentVarint = addresses.decodeVarint(self.payload[80:84]) useragentLength, lengthOfUseragentVarint = addresses.decodeVarint(self.payload[80:84])
readPosition = 80 + lengthOfUseragentVarint readPosition = 80 + lengthOfUseragentVarint
self.userAgent = self.payload[readPosition:readPosition + useragentLength] self.userAgent = self.payload[readPosition:readPosition + useragentLength]
readPosition += useragentLength readPosition += useragentLength
print "user agent: %s" % (self.userAgent) print "user agent: %s" % (self.userAgent)
return False return True
def state_http_request_sent(self):
if len(self.read_buf) > 0:
print self.read_buf
self.read_buf = b""
if not self.connected:
self.set_state("close", 0)
return False
class Socks5BMConnection(Socks5Connection, BMConnection): class Socks5BMConnection(Socks5Connection, BMConnection):
@ -120,6 +123,27 @@ class Socks4aBMConnection(Socks4aConnection, BMConnection):
return False return False
class BMServer(AdvancedDispatcher):
port = 8444
def __init__(self, port=None):
if not hasattr(self, '_map'):
AdvancedDispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
if port is None:
port = BMServer.port
self.bind(('127.0.0.1', port))
self.connections = 0
self.listen(5)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
BMConnection(sock=sock)
if __name__ == "__main__": if __name__ == "__main__":
# initial fill # initial fill

View File

@ -3,9 +3,9 @@ import asyncore_pollchoose as asyncore
class AdvancedDispatcher(asyncore.dispatcher): class AdvancedDispatcher(asyncore.dispatcher):
_buf_len = 2097152 # 2MB _buf_len = 2097152 # 2MB
def __init__(self): def __init__(self, sock=None):
if not hasattr(self, '_map'): if not hasattr(self, '_map'):
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self, sock)
self.read_buf = b"" self.read_buf = b""
self.write_buf = b"" self.write_buf = b""
self.state = "init" self.state = "init"
@ -27,6 +27,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
return return
while True: while True:
try: try:
print "Trying to handle state \"%s\"" % (self.state)
if getattr(self, "state_" + str(self.state))() is False: if getattr(self, "state_" + str(self.state))() is False:
break break
except AttributeError: except AttributeError:

View File

@ -287,6 +287,8 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
elif hasattr(select, 'select'): elif hasattr(select, 'select'):
poller = select_poller poller = select_poller
print "Poll loop using %s" % (poller.__name__)
if count is None: if count is None:
while map: while map:
poller(timeout, map) poller(timeout, map)

View File

@ -34,7 +34,7 @@ eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
#New code should use CreatePacket instead of Header.pack #New code should use CreatePacket instead of Header.pack
Header = Struct('!L12sL4s') Header = Struct('!L12sL4s')
VersionPacket = Struct('>LqQ20sI36sH') VersionPacket = Struct('>LqQ20s4s36sH')
# Bitfield # Bitfield