diff --git a/src/network/bmproto.py b/src/network/bmproto.py index b35f3997..c8efe91e 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -27,18 +27,22 @@ from randomtrackingdict import RandomTrackingDict class BMProtoError(ProxyError): + """A Bitmessage Protocol Base Error""" errorCodes = ("Protocol error") class BMProtoInsufficientDataError(BMProtoError): + """A Bitmessage Protocol Insufficient Data Error""" errorCodes = ("Insufficient data") class BMProtoExcessiveDataError(BMProtoError): + """A Bitmessage Protocol Excessive Data Error""" errorCodes = ("Too much data") class BMProto(AdvancedDispatcher, ObjectTracker): + """A parser for the Bitmessage Protocol""" # ~1.6 MB which is the maximum possible size of an inv message. maxMessageSize = 1600100 # 2**18 = 256kB is the maximum size of an object payload @@ -61,6 +65,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.pendingUpload = RandomTrackingDict() def bm_proto_reset(self): + """Reset the bitmessage object parser""" self.magic = None self.command = None self.payloadLength = 0 @@ -72,6 +77,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.object = None def state_bm_header(self): + """Process incoming header""" self.magic, self.command, self.payloadLength, self.checksum = \ protocol.Header.unpack(self.read_buf[:protocol.Header.size]) self.command = self.command.rstrip('\x00') @@ -92,6 +98,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def state_bm_command(self): + """Process incoming command""" self.payload = self.read_buf[:self.payloadLength] if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: logger.debug('Bad checksum, ignoring') @@ -145,16 +152,19 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def decode_payload_string(self, length): + """Read and return `length` bytes from payload""" value = self.payload[self.payloadOffset:self.payloadOffset + length] self.payloadOffset += length return value def decode_payload_varint(self): + """Decode a varint from the payload""" value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:]) self.payloadOffset += offset return value def decode_payload_node(self): + """Decode node details from the payload""" # protocol.checkIPAddress() services, host, port = self.decode_payload_content("Q16sH") if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': @@ -172,18 +182,23 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return Node(services, host, port) def decode_payload_content(self, pattern="v"): - # L = varint indicating the length of the next array - # l = varint indicating the length of the next item - # v = varint (or array) - # H = uint16 - # I = uint32 - # Q = uint64 - # i = net_addr (without time and stream number) - # s = string - # 0-9 = length of the next item - # , = end of array + """ + Decode the payload depending on pattern: + + L = varint indicating the length of the next array + l = varint indicating the length of the next item + v = varint (or array) + H = uint16 + I = uint32 + Q = uint64 + i = net_addr (without time and stream number) + s = string + 0-9 = length of the next item + , = end of array + """ def decode_simple(self, char="v"): + """Decode the payload using one char pattern""" if char == "v": return self.decode_payload_varint() if char == "i": @@ -290,6 +305,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): raise BMProtoInsufficientDataError() def bm_command_error(self): + """Decode an error message and log it""" fatalStatus, banTime, inventoryVector, errorText = \ self.decode_payload_content("vvlsls") logger.error( @@ -298,6 +314,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def bm_command_getdata(self): + """ + Incoming request for object(s). + If we have them and some other conditions are fulfilled, + append them to the write queue. + """ items = self.decode_payload_content("l32s") # skip? now = time.time() @@ -329,15 +350,15 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def bm_command_inv(self): + """Non-dandelion announce""" return self._command_inv(False) def bm_command_dinv(self): - """ - Dandelion stem announce - """ + """Dandelion stem announce""" return self._command_inv(True) def bm_command_object(self): + """Incoming object, process it""" objectOffset = self.payloadOffset nonce, expiresTime, objectType, version, streamNumber = \ self.decode_payload_content("QQIvv") @@ -400,6 +421,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return self.decode_payload_content("LQIQ16sH") def bm_command_addr(self): + """Incoming addresses, process them""" addresses = self._decode_addr() for i in addresses: seenTime, stream, services, ip, port = i @@ -431,18 +453,33 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True def bm_command_portcheck(self): + """Incoming port check request, queue it.""" portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port)) return True def bm_command_ping(self): + """Incoming ping, respond to it.""" self.append_write_buf(protocol.CreatePacket('pong')) return True def bm_command_pong(self): + """ + Incoming pong. + Ignore it. PyBitmessage pings connections after about 5 minutes + of inactivity, and leaves it to the TCP stack to handle actual + timeouts. So there is no need to do anything when a pong arrives. + """ # nothing really return True def bm_command_verack(self): + """ + Incoming verack. + If already sent my own verack, handshake is complete (except + potentially waiting for buffers to flush), so we can continue + to the main connection phase. If not sent verack yet, + continue processing. + """ self.verackReceived = True if not self.verackSent: return True @@ -452,6 +489,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return False def bm_command_version(self): + """ + Incoming version. + Parse and log, remember important things, like streams, bitfields, etc. + """ (self.remoteProtocolVersion, self.services, self.timestamp, self.sockNode, self.peerNode, self.nonce, self.userAgent, self.streams) = self.decode_payload_content("IQQiiQlsLv") @@ -467,7 +508,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): logger.debug('user agent: %s', self.userAgent) logger.debug('streams: [%s]', ','.join(map(str, self.streams))) if not self.peerValidityChecks(): - # TODO ABORT + # ABORT afterwards return True self.append_write_buf(protocol.CreatePacket('verack')) self.verackSent = True @@ -490,6 +531,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return False def peerValidityChecks(self): + """Check the validity of the peer""" if self.remoteProtocolVersion < 3: self.append_write_buf(protocol.assembleErrorMessage( errorText="Your is using an old protocol. Closing connection.", @@ -569,6 +611,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def assembleAddr(peerList): + """Build up a packed address""" if isinstance(peerList, state.Peer): peerList = (peerList) if not peerList: @@ -591,6 +634,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): + """Stop downloading an object""" for connection in ( connectionpool.BMConnectionPool().inboundConnections.values() + connectionpool.BMConnectionPool().outboundConnections.values() @@ -611,6 +655,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): pass def handle_close(self): + """Handle close""" self.set_state("close") if not (self.accepting or self.connecting or self.connected): # already disconnected