Docstrings in network.bmproto from #1362

This commit is contained in:
Dmitri Bogomolov 2019-07-10 14:27:42 +03:00
parent bbab0010e6
commit 67d14f9e73
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
1 changed files with 59 additions and 14 deletions

View File

@ -27,18 +27,22 @@ from randomtrackingdict import RandomTrackingDict
class BMProtoError(ProxyError): class BMProtoError(ProxyError):
"""A Bitmessage Protocol Base Error"""
errorCodes = ("Protocol error") errorCodes = ("Protocol error")
class BMProtoInsufficientDataError(BMProtoError): class BMProtoInsufficientDataError(BMProtoError):
"""A Bitmessage Protocol Insufficient Data Error"""
errorCodes = ("Insufficient data") errorCodes = ("Insufficient data")
class BMProtoExcessiveDataError(BMProtoError): class BMProtoExcessiveDataError(BMProtoError):
"""A Bitmessage Protocol Excessive Data Error"""
errorCodes = ("Too much data") errorCodes = ("Too much data")
class BMProto(AdvancedDispatcher, ObjectTracker): class BMProto(AdvancedDispatcher, ObjectTracker):
"""A parser for the Bitmessage Protocol"""
# ~1.6 MB which is the maximum possible size of an inv message. # ~1.6 MB which is the maximum possible size of an inv message.
maxMessageSize = 1600100 maxMessageSize = 1600100
# 2**18 = 256kB is the maximum size of an object payload # 2**18 = 256kB is the maximum size of an object payload
@ -61,6 +65,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.pendingUpload = RandomTrackingDict() self.pendingUpload = RandomTrackingDict()
def bm_proto_reset(self): def bm_proto_reset(self):
"""Reset the bitmessage object parser"""
self.magic = None self.magic = None
self.command = None self.command = None
self.payloadLength = 0 self.payloadLength = 0
@ -72,6 +77,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.object = None self.object = None
def state_bm_header(self): def state_bm_header(self):
"""Process incoming header"""
self.magic, self.command, self.payloadLength, self.checksum = \ self.magic, self.command, self.payloadLength, self.checksum = \
protocol.Header.unpack(self.read_buf[:protocol.Header.size]) protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00') self.command = self.command.rstrip('\x00')
@ -92,6 +98,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def state_bm_command(self): def state_bm_command(self):
"""Process incoming command"""
self.payload = self.read_buf[:self.payloadLength] self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]: if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
logger.debug('Bad checksum, ignoring') logger.debug('Bad checksum, ignoring')
@ -145,16 +152,19 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def decode_payload_string(self, length): def decode_payload_string(self, length):
"""Read and return `length` bytes from payload"""
value = self.payload[self.payloadOffset:self.payloadOffset + length] value = self.payload[self.payloadOffset:self.payloadOffset + length]
self.payloadOffset += length self.payloadOffset += length
return value return value
def decode_payload_varint(self): def decode_payload_varint(self):
"""Decode a varint from the payload"""
value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:]) value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:])
self.payloadOffset += offset self.payloadOffset += offset
return value return value
def decode_payload_node(self): def decode_payload_node(self):
"""Decode node details from the payload"""
# protocol.checkIPAddress() # protocol.checkIPAddress()
services, host, port = self.decode_payload_content("Q16sH") services, host, port = self.decode_payload_content("Q16sH")
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
@ -172,18 +182,23 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return Node(services, host, port) return Node(services, host, port)
def decode_payload_content(self, pattern="v"): def decode_payload_content(self, pattern="v"):
# L = varint indicating the length of the next array """
# l = varint indicating the length of the next item Decode the payload depending on pattern:
# v = varint (or array)
# H = uint16 L = varint indicating the length of the next array
# I = uint32 l = varint indicating the length of the next item
# Q = uint64 v = varint (or array)
# i = net_addr (without time and stream number) H = uint16
# s = string I = uint32
# 0-9 = length of the next item Q = uint64
# , = end of array i = net_addr (without time and stream number)
s = string
0-9 = length of the next item
, = end of array
"""
def decode_simple(self, char="v"): def decode_simple(self, char="v"):
"""Decode the payload using one char pattern"""
if char == "v": if char == "v":
return self.decode_payload_varint() return self.decode_payload_varint()
if char == "i": if char == "i":
@ -290,6 +305,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
raise BMProtoInsufficientDataError() raise BMProtoInsufficientDataError()
def bm_command_error(self): def bm_command_error(self):
"""Decode an error message and log it"""
fatalStatus, banTime, inventoryVector, errorText = \ fatalStatus, banTime, inventoryVector, errorText = \
self.decode_payload_content("vvlsls") self.decode_payload_content("vvlsls")
logger.error( logger.error(
@ -298,6 +314,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def bm_command_getdata(self): def bm_command_getdata(self):
"""
Incoming request for object(s).
If we have them and some other conditions are fulfilled,
append them to the write queue.
"""
items = self.decode_payload_content("l32s") items = self.decode_payload_content("l32s")
# skip? # skip?
now = time.time() now = time.time()
@ -329,15 +350,15 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def bm_command_inv(self): def bm_command_inv(self):
"""Non-dandelion announce"""
return self._command_inv(False) return self._command_inv(False)
def bm_command_dinv(self): def bm_command_dinv(self):
""" """Dandelion stem announce"""
Dandelion stem announce
"""
return self._command_inv(True) return self._command_inv(True)
def bm_command_object(self): def bm_command_object(self):
"""Incoming object, process it"""
objectOffset = self.payloadOffset objectOffset = self.payloadOffset
nonce, expiresTime, objectType, version, streamNumber = \ nonce, expiresTime, objectType, version, streamNumber = \
self.decode_payload_content("QQIvv") self.decode_payload_content("QQIvv")
@ -400,6 +421,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return self.decode_payload_content("LQIQ16sH") return self.decode_payload_content("LQIQ16sH")
def bm_command_addr(self): def bm_command_addr(self):
"""Incoming addresses, process them"""
addresses = self._decode_addr() addresses = self._decode_addr()
for i in addresses: for i in addresses:
seenTime, stream, services, ip, port = i seenTime, stream, services, ip, port = i
@ -431,18 +453,33 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
def bm_command_portcheck(self): def bm_command_portcheck(self):
"""Incoming port check request, queue it."""
portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port)) portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port))
return True return True
def bm_command_ping(self): def bm_command_ping(self):
"""Incoming ping, respond to it."""
self.append_write_buf(protocol.CreatePacket('pong')) self.append_write_buf(protocol.CreatePacket('pong'))
return True return True
def bm_command_pong(self): def bm_command_pong(self):
"""
Incoming pong.
Ignore it. PyBitmessage pings connections after about 5 minutes
of inactivity, and leaves it to the TCP stack to handle actual
timeouts. So there is no need to do anything when a pong arrives.
"""
# nothing really # nothing really
return True return True
def bm_command_verack(self): def bm_command_verack(self):
"""
Incoming verack.
If already sent my own verack, handshake is complete (except
potentially waiting for buffers to flush), so we can continue
to the main connection phase. If not sent verack yet,
continue processing.
"""
self.verackReceived = True self.verackReceived = True
if not self.verackSent: if not self.verackSent:
return True return True
@ -452,6 +489,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return False return False
def bm_command_version(self): def bm_command_version(self):
"""
Incoming version.
Parse and log, remember important things, like streams, bitfields, etc.
"""
(self.remoteProtocolVersion, self.services, self.timestamp, (self.remoteProtocolVersion, self.services, self.timestamp,
self.sockNode, self.peerNode, self.nonce, self.userAgent, self.sockNode, self.peerNode, self.nonce, self.userAgent,
self.streams) = self.decode_payload_content("IQQiiQlsLv") self.streams) = self.decode_payload_content("IQQiiQlsLv")
@ -467,7 +508,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
logger.debug('user agent: %s', self.userAgent) logger.debug('user agent: %s', self.userAgent)
logger.debug('streams: [%s]', ','.join(map(str, self.streams))) logger.debug('streams: [%s]', ','.join(map(str, self.streams)))
if not self.peerValidityChecks(): if not self.peerValidityChecks():
# TODO ABORT # ABORT afterwards
return True return True
self.append_write_buf(protocol.CreatePacket('verack')) self.append_write_buf(protocol.CreatePacket('verack'))
self.verackSent = True self.verackSent = True
@ -490,6 +531,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return False return False
def peerValidityChecks(self): def peerValidityChecks(self):
"""Check the validity of the peer"""
if self.remoteProtocolVersion < 3: if self.remoteProtocolVersion < 3:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your is using an old protocol. Closing connection.", errorText="Your is using an old protocol. Closing connection.",
@ -569,6 +611,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def assembleAddr(peerList): def assembleAddr(peerList):
"""Build up a packed address"""
if isinstance(peerList, state.Peer): if isinstance(peerList, state.Peer):
peerList = (peerList) peerList = (peerList)
if not peerList: if not peerList:
@ -591,6 +634,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False): def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object"""
for connection in ( for connection in (
connectionpool.BMConnectionPool().inboundConnections.values() + connectionpool.BMConnectionPool().inboundConnections.values() +
connectionpool.BMConnectionPool().outboundConnections.values() connectionpool.BMConnectionPool().outboundConnections.values()
@ -611,6 +655,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
pass pass
def handle_close(self): def handle_close(self):
"""Handle close"""
self.set_state("close") self.set_state("close")
if not (self.accepting or self.connecting or self.connected): if not (self.accepting or self.connecting or self.connected):
# already disconnected # already disconnected