PyBitmessage-2021-04-27/src/network/bmproto.py

658 lines
26 KiB
Python
Raw Normal View History

import base64
import hashlib
import socket
import struct
import time
from binascii import hexlify
2018-07-17 11:28:56 +00:00
import addresses
import connectionpool
import knownnodes
import protocol
import state
from bmconfigparser import BMConfigParser
from debug import logger
from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher
from network.dandelion import Dandelion
2018-07-17 11:28:56 +00:00
from network.bmobject import (
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
BMObjectExpiredError, BMObjectUnwantedStreamError,
BMObjectInvalidError, BMObjectAlreadyHaveError)
2017-04-16 16:27:15 +00:00
from network.node import Node
from network.proxy import ProxyError
2018-07-17 11:28:56 +00:00
from objectracker import missingObjects, ObjectTracker
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
2018-07-17 11:28:56 +00:00
from randomtrackingdict import RandomTrackingDict
class BMProtoError(ProxyError):
errorCodes = ("Protocol error")
class BMProtoInsufficientDataError(BMProtoError):
errorCodes = ("Insufficient data")
class BMProtoExcessiveDataError(BMProtoError):
errorCodes = ("Too much data")
class BMProto(AdvancedDispatcher, ObjectTracker):
# ~1.6 MB which is the maximum possible size of an inv message.
maxMessageSize = 1600100
# 2**18 = 256kB is the maximum size of an object payload
maxObjectPayloadSize = 2**18
2017-04-04 08:46:01 +00:00
# protocol specification says max 1000 addresses in one addr command
maxAddrCount = 1000
# protocol specification says max 50000 objects in one inv command
maxObjectCount = 50000
# address is online if online less than this many seconds ago
addressAlive = 10800
# maximum time offset
maxTimeOffset = 3600
timeOffsetWrongCount = 0
def __init__(self, address=None, sock=None):
AdvancedDispatcher.__init__(self, sock)
self.isOutbound = False
# packet/connection from a local IP
self.local = False
self.pendingUpload = RandomTrackingDict()
def bm_proto_reset(self):
self.magic = None
self.command = None
self.payloadLength = 0
self.checksum = None
self.payload = None
self.invalid = False
2017-04-16 16:27:15 +00:00
self.payloadOffset = 0
self.expectBytes = protocol.Header.size
self.object = None
def state_bm_header(self):
2018-07-17 11:28:56 +00:00
self.magic, self.command, self.payloadLength, self.checksum = \
protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00')
if self.magic != 0xE9BEB4D9:
# skip 1 byte in order to sync
self.set_state("bm_header", length=1)
self.bm_proto_reset()
2018-07-17 11:28:56 +00:00
logger.debug('Bad magic')
if self.socket.type == socket.SOCK_STREAM:
self.close_reason = "Bad magic"
self.set_state("close")
return False
if self.payloadLength > BMProto.maxMessageSize:
self.invalid = True
2018-07-17 11:28:56 +00:00
self.set_state(
"bm_command",
length=protocol.Header.size, expectBytes=self.payloadLength)
return True
2018-07-17 11:28:56 +00:00
def state_bm_command(self):
self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
2018-07-17 11:28:56 +00:00
logger.debug('Bad checksum, ignoring')
self.invalid = True
2017-04-04 08:46:01 +00:00
retval = True
2018-07-17 11:28:56 +00:00
if not self.fullyEstablished and self.command not in (
"error", "version", "verack"):
logger.error(
'Received command %s before connection was fully'
' established, ignoring', self.command)
self.invalid = True
if not self.invalid:
try:
2018-07-17 11:28:56 +00:00
retval = getattr(
self, "bm_command_" + str(self.command).lower())()
except AttributeError:
# unimplemented command
2018-07-17 11:28:56 +00:00
logger.debug('unimplemented command %s', self.command)
except BMProtoInsufficientDataError:
2018-07-17 11:28:56 +00:00
logger.debug('packet length too short, skipping')
except BMProtoExcessiveDataError:
2018-07-17 11:28:56 +00:00
logger.debug('too much data, skipping')
except BMObjectInsufficientPOWError:
2018-07-17 11:28:56 +00:00
logger.debug('insufficient PoW, skipping')
except BMObjectInvalidDataError:
2018-07-17 11:28:56 +00:00
logger.debug('object invalid data, skipping')
except BMObjectExpiredError:
2018-07-17 11:28:56 +00:00
logger.debug('object expired, skipping')
except BMObjectUnwantedStreamError:
2018-07-17 11:28:56 +00:00
logger.debug('object not in wanted stream, skipping')
except BMObjectInvalidError:
2018-07-17 11:28:56 +00:00
logger.debug('object invalid, skipping')
except BMObjectAlreadyHaveError:
2018-07-17 11:28:56 +00:00
logger.debug(
'%(host)s:%(port)i already got object, skipping',
self.destination._asdict())
except struct.error:
2018-07-17 11:28:56 +00:00
logger.debug('decoding error, skipping')
elif self.socket.type == socket.SOCK_DGRAM:
# broken read, ignore
pass
else:
2018-07-17 11:28:56 +00:00
logger.debug('Closing due to invalid command %s', self.command)
self.close_reason = "Invalid command %s" % self.command
self.set_state("close")
return False
2017-04-04 08:46:01 +00:00
if retval:
self.set_state("bm_header", length=self.payloadLength)
2017-04-04 08:46:01 +00:00
self.bm_proto_reset()
# else assume the command requires a different state to follow
return True
2017-04-16 16:27:15 +00:00
def decode_payload_string(self, length):
2018-07-17 11:28:56 +00:00
value = self.payload[self.payloadOffset:self.payloadOffset + length]
2017-04-16 16:27:15 +00:00
self.payloadOffset += length
return value
def decode_payload_varint(self):
value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:])
self.payloadOffset += offset
return value
def decode_payload_node(self):
2018-07-17 11:28:56 +00:00
# protocol.checkIPAddress()
services, host, port = self.decode_payload_content("Q16sH")
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
host = socket.inet_ntop(socket.AF_INET, str(host[12:16]))
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
# Onion, based on BMD/bitcoind
host = base64.b32encode(host[6:]).lower() + ".onion"
else:
host = socket.inet_ntop(socket.AF_INET6, str(host))
if host == "":
2018-07-17 11:28:56 +00:00
# This can happen on Windows systems which are not 64-bit
# compatible so let us drop the IPv6 address.
host = socket.inet_ntop(socket.AF_INET, str(host[12:16]))
return Node(services, host, port)
2017-04-16 16:27:15 +00:00
2018-07-17 11:28:56 +00:00
def decode_payload_content(self, pattern="v"):
# L = varint indicating the length of the next array
# l = varint indicating the length of the next item
2017-04-16 16:27:15 +00:00
# v = varint (or array)
# H = uint16
# I = uint32
# Q = uint64
# i = net_addr (without time and stream number)
# s = string
# 0-9 = length of the next item
# , = end of array
def decode_simple(self, char="v"):
if char == "v":
return self.decode_payload_varint()
if char == "i":
return self.decode_payload_node()
if char == "H":
self.payloadOffset += 2
2018-07-17 11:28:56 +00:00
return struct.unpack(">H", self.payload[
self.payloadOffset - 2:self.payloadOffset])[0]
if char == "I":
self.payloadOffset += 4
2018-07-17 11:28:56 +00:00
return struct.unpack(">I", self.payload[
self.payloadOffset - 4:self.payloadOffset])[0]
if char == "Q":
self.payloadOffset += 8
2018-07-17 11:28:56 +00:00
return struct.unpack(">Q", self.payload[
self.payloadOffset - 8:self.payloadOffset])[0]
2017-04-16 16:27:15 +00:00
size = None
isArray = False
# size
# iterator starting from size counting to 0
# isArray?
# subpattern
# position of parser in subpattern
# retval (array)
parserStack = [[1, 1, False, pattern, 0, []]]
while True:
i = parserStack[-1][3][parserStack[-1][4]]
2018-07-17 11:28:56 +00:00
if i in "0123456789" and (
size is None or parserStack[-1][3][parserStack[-1][4] - 1]
not in "lL"):
try:
size = size * 10 + int(i)
except TypeError:
size = int(i)
isArray = False
elif i in "Ll" and size is None:
size = self.decode_payload_varint()
2018-07-17 11:28:56 +00:00
isArray = i == "L"
elif size is not None:
if isArray:
2018-07-17 11:28:56 +00:00
parserStack.append([
size, size, isArray,
parserStack[-1][3][parserStack[-1][4]:], 0, []
])
parserStack[-2][4] = len(parserStack[-2][3])
else:
for j in range(parserStack[-1][4], len(parserStack[-1][3])):
if parserStack[-1][3][j] not in "lL0123456789":
break
2018-07-17 11:28:56 +00:00
parserStack.append([
size, size, isArray,
parserStack[-1][3][parserStack[-1][4]:j + 1], 0, []
])
2017-07-11 08:29:29 +00:00
parserStack[-2][4] += len(parserStack[-1][3]) - 1
size = None
continue
elif i == "s":
2018-07-17 11:28:56 +00:00
# if parserStack[-2][2]:
# parserStack[-1][5].append(self.payload[
# self.payloadOffset:self.payloadOffset + parserStack[-1][0]])
# else:
parserStack[-1][5] = self.payload[
self.payloadOffset:self.payloadOffset + parserStack[-1][0]]
self.payloadOffset += parserStack[-1][0]
parserStack[-1][1] = 0
parserStack[-1][2] = True
2018-07-17 11:28:56 +00:00
# del parserStack[-1]
size = None
elif i in "viHIQ":
2018-07-17 11:28:56 +00:00
parserStack[-1][5].append(decode_simple(
self, parserStack[-1][3][parserStack[-1][4]]))
size = None
2017-04-16 16:27:15 +00:00
else:
size = None
for depth in range(len(parserStack) - 1, -1, -1):
parserStack[depth][4] += 1
if parserStack[depth][4] >= len(parserStack[depth][3]):
parserStack[depth][1] -= 1
parserStack[depth][4] = 0
if depth > 0:
if parserStack[depth][2]:
2018-07-17 11:28:56 +00:00
parserStack[depth - 1][5].append(
parserStack[depth][5])
else:
2018-07-17 11:28:56 +00:00
parserStack[depth - 1][5].extend(
parserStack[depth][5])
parserStack[depth][5] = []
if parserStack[depth][1] <= 0:
if depth == 0:
2018-07-17 11:28:56 +00:00
# we're done, at depth 0 counter is at 0
# and pattern is done parsing
return parserStack[depth][5]
del parserStack[-1]
continue
break
break
if self.payloadOffset > self.payloadLength:
2018-07-17 11:28:56 +00:00
logger.debug(
'Insufficient data %i/%i',
self.payloadOffset, self.payloadLength)
raise BMProtoInsufficientDataError()
2017-04-16 16:27:15 +00:00
2017-04-04 08:46:01 +00:00
def bm_command_error(self):
2018-07-17 11:28:56 +00:00
fatalStatus, banTime, inventoryVector, errorText = \
self.decode_payload_content("vvlsls")
logger.error(
'%s:%i error: %i, %s', self.destination.host,
self.destination.port, fatalStatus, errorText)
return True
2017-04-16 16:27:15 +00:00
2017-04-04 08:46:01 +00:00
def bm_command_getdata(self):
items = self.decode_payload_content("l32s")
# skip?
now = time.time()
if now < self.skipUntil:
return True
2018-12-20 19:33:27 +00:00
for i in items:
self.pendingUpload[str(i)] = now
return True
def _command_inv(self, dandelion=False):
items = self.decode_payload_content("l32s")
if len(items) > BMProto.maxObjectCount:
2018-07-17 11:28:56 +00:00
logger.error(
'Too many items in %sinv message!', 'd' if dandelion else '')
raise BMProtoExcessiveDataError()
# ignore dinv if dandelion turned off
if dandelion and not state.dandelion:
return True
for i in map(str, items):
if i in Inventory() and not Dandelion().hasHash(i):
continue
if dandelion and not Dandelion().hasHash(i):
Dandelion().addHash(i, self)
self.handleReceivedInventory(i)
return True
2017-04-16 16:27:15 +00:00
def bm_command_inv(self):
return self._command_inv(False)
def bm_command_dinv(self):
"""
Dandelion stem announce
"""
return self._command_inv(True)
2017-04-04 08:46:01 +00:00
def bm_command_object(self):
objectOffset = self.payloadOffset
2018-07-17 11:28:56 +00:00
nonce, expiresTime, objectType, version, streamNumber = \
self.decode_payload_content("QQIvv")
self.object = BMObject(
nonce, expiresTime, objectType, version, streamNumber,
self.payload, self.payloadOffset)
if len(self.payload) - self.payloadOffset > BMProto.maxObjectPayloadSize:
2018-07-17 11:28:56 +00:00
logger.info(
'The payload length of this object is too large (%d bytes).'
' Ignoring it.', len(self.payload) - self.payloadOffset)
raise BMProtoExcessiveDataError()
try:
self.object.checkProofOfWorkSufficient()
self.object.checkEOLSanity()
self.object.checkAlreadyHave()
2018-07-17 11:28:56 +00:00
except (BMObjectExpiredError, BMObjectAlreadyHaveError,
BMObjectInsufficientPOWError):
BMProto.stopDownloadingObject(self.object.inventoryHash)
2018-07-17 11:28:56 +00:00
raise
try:
self.object.checkStream()
2018-07-17 11:28:56 +00:00
except BMObjectUnwantedStreamError:
acceptmismatch = BMConfigParser().get(
"inventory", "acceptmismatch")
BMProto.stopDownloadingObject(
self.object.inventoryHash, acceptmismatch)
if not acceptmismatch:
raise
try:
self.object.checkObjectByType()
2018-07-17 11:28:56 +00:00
objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data)))
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
else:
try:
del missingObjects[self.object.inventoryHash]
except KeyError:
pass
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
Inventory()[self.object.inventoryHash] = (
2018-07-17 11:28:56 +00:00
self.object.objectType, self.object.streamNumber,
buffer(self.payload[objectOffset:]), self.object.expiresTime,
buffer(self.object.tag)
)
self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash)
invQueue.put((
self.object.streamNumber, self.object.inventoryHash,
self.destination))
return True
2017-04-16 16:27:15 +00:00
def _decode_addr(self):
return self.decode_payload_content("LQIQ16sH")
2017-04-16 16:27:15 +00:00
def bm_command_addr(self):
addresses = self._decode_addr()
for i in addresses:
seenTime, stream, services, ip, port = i
decodedIP = protocol.checkIPAddress(str(ip))
if stream not in state.streamsInWhichIAmParticipating:
continue
if (
decodedIP and time.time() - seenTime > 0 and
seenTime > time.time() - BMProto.addressAlive and
port > 0
):
peer = state.Peer(decodedIP, port)
2017-10-19 06:52:44 +00:00
try:
if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
continue
except KeyError:
pass
if len(knownnodes.knownNodes[stream]) < BMConfigParser().safeGetInt("knownnodes", "maxnodes"):
with knownnodes.knownNodesLock:
try:
knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime
except (TypeError, KeyError):
knownnodes.knownNodes[stream][peer] = {
"lastseen": seenTime,
"rating": 0,
"self": False,
}
addrQueue.put((stream, peer, self.destination))
return True
2017-04-16 16:27:15 +00:00
def bm_command_portcheck(self):
portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port))
return True
2017-04-04 08:46:01 +00:00
def bm_command_ping(self):
self.append_write_buf(protocol.CreatePacket('pong'))
return True
2017-04-16 16:27:15 +00:00
2017-04-04 08:46:01 +00:00
def bm_command_pong(self):
2017-04-16 16:27:15 +00:00
# nothing really
return True
2017-04-04 08:46:01 +00:00
def bm_command_verack(self):
self.verackReceived = True
2018-07-17 11:28:56 +00:00
if not self.verackSent:
return True
self.set_state(
"tls_init" if self.isSSL else "connection_fully_established",
length=self.payloadLength, expectBytes=0)
return False
def bm_command_version(self):
2018-07-17 11:28:56 +00:00
(self.remoteProtocolVersion, self.services, self.timestamp,
self.sockNode, self.peerNode, self.nonce, self.userAgent,
self.streams) = self.decode_payload_content("IQQiiQlsLv")
self.nonce = struct.pack('>Q', self.nonce)
2017-04-16 16:27:15 +00:00
self.timeOffset = self.timestamp - int(time.time())
2018-07-17 11:28:56 +00:00
logger.debug('remoteProtocolVersion: %i', self.remoteProtocolVersion)
logger.debug('services: 0x%08X', self.services)
logger.debug('time offset: %i', self.timestamp - int(time.time()))
logger.debug('my external IP: %s', self.sockNode.host)
logger.debug(
'remote node incoming address: %s:%i',
self.destination.host, self.peerNode.port)
logger.debug('user agent: %s', self.userAgent)
logger.debug('streams: [%s]', ','.join(map(str, self.streams)))
2017-04-04 08:46:01 +00:00
if not self.peerValidityChecks():
# TODO ABORT
return True
self.append_write_buf(protocol.CreatePacket('verack'))
2017-04-04 08:46:01 +00:00
self.verackSent = True
if not self.isOutbound:
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams, True,
nodeid=self.nodeid))
logger.debug(
'%(host)s:%(port)i sending version',
self.destination._asdict())
2017-04-04 08:46:01 +00:00
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
protocol.haveSSL(not self.isOutbound)):
self.isSSL = True
2018-07-17 11:28:56 +00:00
if not self.verackReceived:
return True
self.set_state(
"tls_init" if self.isSSL else "connection_fully_established",
length=self.payloadLength, expectBytes=0)
return False
2017-04-04 08:46:01 +00:00
def peerValidityChecks(self):
if self.remoteProtocolVersion < 3:
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your is using an old protocol. Closing connection.",
fatal=2))
logger.debug(
'Closing connection to old protocol version %s, node: %s',
self.remoteProtocolVersion, self.destination)
2017-04-04 08:46:01 +00:00
return False
if self.timeOffset > BMProto.maxTimeOffset:
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the future compared to mine."
" Closing connection.", fatal=2))
logger.info(
"%s's time is too far in the future (%s seconds)."
" Closing connection to it.", self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1
2017-04-04 08:46:01 +00:00
return False
elif self.timeOffset < -BMProto.maxTimeOffset:
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the past compared to mine."
" Closing connection.", fatal=2))
logger.info(
"%s's time is too far in the past (timeOffset %s seconds)."
" Closing connection to it.", self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1
2017-04-04 08:46:01 +00:00
return False
else:
BMProto.timeOffsetWrongCount = 0
2017-06-24 10:13:35 +00:00
if not self.streams:
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="We don't have shared stream interests."
" Closing connection.", fatal=2))
logger.debug(
'Closed connection to %s because there is no overlapping interest'
' in streams.', self.destination)
2017-04-04 08:46:01 +00:00
return False
2018-07-17 11:28:56 +00:00
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
try:
if not protocol.checkSocksIP(self.destination.host):
2018-07-17 11:28:56 +00:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Too many connections from your IP."
" Closing connection.", fatal=2))
logger.debug(
'Closed connection to %s because we are already connected'
' to that IP.', self.destination)
return False
except:
pass
if not self.isOutbound:
2018-07-17 11:28:56 +00:00
# incoming from a peer we're connected to as outbound,
# or server full report the same error to counter deanonymisation
if (
state.Peer(self.destination.host, self.peerNode.port) in
connectionpool.BMConnectionPool().inboundConnections or
len(connectionpool.BMConnectionPool().inboundConnections) +
len(connectionpool.BMConnectionPool().outboundConnections) >
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") +
BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections")
):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Server full, please try again later.", fatal=2))
logger.debug(
'Closed connection to %s due to server full'
' or duplicate inbound/outbound.', self.destination)
return False
2018-07-17 11:28:56 +00:00
if connectionpool.BMConnectionPool().isAlreadyConnected(
self.nonce):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="I'm connected to myself. Closing connection.",
fatal=2))
logger.debug(
"Closed connection to %s because I'm connected to myself.",
self.destination)
return False
return True
@staticmethod
def assembleAddr(peerList):
2017-06-24 10:13:35 +00:00
if isinstance(peerList, state.Peer):
peerList = (peerList)
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), BMProto.maxAddrCount):
2018-07-17 11:28:56 +00:00
payload = addresses.encodeVarint(
len(peerList[i:i + BMProto.maxAddrCount]))
for address in peerList[i:i + BMProto.maxAddrCount]:
stream, peer, timestamp = address
payload += struct.pack(
'>Q', timestamp) # 64-bit time
payload += struct.pack('>I', stream)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(peer.host)
payload += struct.pack('>H', peer.port) # remote port
retval += protocol.CreatePacket('addr', payload)
return retval
2017-04-04 08:46:01 +00:00
@staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False):
2018-07-17 11:28:56 +00:00
for connection in (
connectionpool.BMConnectionPool().inboundConnections.values() +
connectionpool.BMConnectionPool().outboundConnections.values()
):
try:
del connection.objectsNewToMe[hashId]