This repository has been archived on 2024-12-20. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2024-12-20/src/network/bmproto.py

732 lines
29 KiB
Python
Raw Normal View History

2019-08-30 12:42:39 +02:00
"""
src/network/bmproto.py
==================================
"""
# pylint: disable=attribute-defined-outside-init
import base64
import hashlib
import logging
import socket
import struct
import time
from binascii import hexlify
2018-07-17 13:28:56 +02:00
import addresses
2019-11-14 16:09:26 +01:00
from network import connectionpool
2018-07-17 13:28:56 +02:00
import knownnodes
import protocol
import state
from bmconfigparser import BMConfigParser
from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher
from network.constants import (
ADDRESS_ALIVE,
MAX_MESSAGE_SIZE,
MAX_OBJECT_COUNT,
MAX_OBJECT_PAYLOAD_SIZE,
MAX_TIME_OFFSET)
from network.dandelion import Dandelion
2018-07-17 13:28:56 +02:00
from network.bmobject import (
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
BMObjectExpiredError, BMObjectUnwantedStreamError,
BMObjectInvalidError, BMObjectAlreadyHaveError)
from network.proxy import ProxyError
from network.objectracker import missingObjects, ObjectTracker
2019-12-31 13:52:56 +01:00
from network.node import Node, Peer
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
from network.randomtrackingdict import RandomTrackingDict
global addr_count
addr_count = 0
global addr_verack
addr_verack = 0
global addr_version
addr_version = 0
# global addr_count
# addr_count = 0
count = 0
2019-12-23 12:18:37 +01:00
logger = logging.getLogger('default')
2019-12-31 13:52:56 +01:00
class BMProtoError(ProxyError):
"""A Bitmessage Protocol Base Error"""
errorCodes = ("Protocol error")
class BMProtoInsufficientDataError(BMProtoError):
"""A Bitmessage Protocol Insufficient Data Error"""
errorCodes = ("Insufficient data")
class BMProtoExcessiveDataError(BMProtoError):
"""A Bitmessage Protocol Excessive Data Error"""
errorCodes = ("Too much data")
class BMProto(AdvancedDispatcher, ObjectTracker):
"""A parser for the Bitmessage Protocol"""
2019-08-30 12:42:39 +02:00
# pylint: disable=too-many-instance-attributes, too-many-public-methods
timeOffsetWrongCount = 0
2019-08-30 12:42:39 +02:00
def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called
AdvancedDispatcher.__init__(self, sock)
self.isOutbound = False
# packet/connection from a local IP
self.local = False
self.pendingUpload = RandomTrackingDict()
# canonical identifier of network group
self.network_group = None
def bm_proto_reset(self):
"""Reset the bitmessage object parser"""
self.magic = None
self.command = None
self.payloadLength = 0
self.checksum = None
self.payload = None
self.invalid = False
2017-04-16 18:27:15 +02:00
self.payloadOffset = 0
self.expectBytes = protocol.Header.size
self.object = None
def state_bm_header(self):
"""Process incoming header"""
2018-07-17 13:28:56 +02:00
self.magic, self.command, self.payloadLength, self.checksum = \
protocol.Header.unpack(self.read_buf[:protocol.Header.size])
2019-12-31 13:52:56 +01:00
# its shoule be in string
2019-10-09 14:51:29 +02:00
self.command = self.command.rstrip('\x00'.encode('utf-8'))
2019-12-31 13:52:56 +01:00
# pylint: disable=global-statement
global count, addr_version, addr_count, addr_verack
count += 1
if self.command == 'verack'.encode():
2019-12-31 13:52:56 +01:00
addr_verack += 1
# print('the addr_verack count are -{}'.format(addr_verack))
if self.command == 'version'.encode():
2019-12-31 13:52:56 +01:00
addr_version += 1
# print('the addr_version count are -{}'.format(addr_version))
if self.command == 'addr'.encode():
2019-12-31 13:52:56 +01:00
addr_count += 1
# print('the addr_count count are -{}'.format(addr_count))
if self.magic != 0xE9BEB4D9:
# skip 1 byte in order to sync
2019-12-31 13:52:56 +01:00
# in the advancedispatched and length commend's
# escape the 1 length
self.set_state("bm_header", length=1)
self.bm_proto_reset()
2018-07-17 13:28:56 +02:00
logger.debug('Bad magic')
if self.socket.type == socket.SOCK_STREAM:
self.close_reason = "Bad magic"
self.set_state("close")
return False
if self.payloadLength > MAX_MESSAGE_SIZE:
self.invalid = True
2018-07-17 13:28:56 +02:00
self.set_state(
"bm_command",
length=protocol.Header.size, expectBytes=self.payloadLength)
return True
2018-07-17 13:28:56 +02:00
2019-12-31 13:52:56 +01:00
def state_bm_command(self): # pylint: disable=too-many-branches, too-many-statements
"""Process incoming command"""
self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
2018-07-17 13:28:56 +02:00
logger.debug('Bad checksum, ignoring')
self.invalid = True
2017-04-04 10:46:01 +02:00
retval = True
2018-07-17 13:28:56 +02:00
if not self.fullyEstablished and self.command not in (
2019-11-14 16:09:26 +01:00
"error".encode(), "version".encode(), "verack".encode()):
2018-07-17 13:28:56 +02:00
logger.error(
2019-11-14 16:09:26 +01:00
'Received command {} before connection was fully'
2019-12-31 13:52:56 +01:00
' established, ignoring'.format(self.command))
self.invalid = True
if not self.invalid:
try:
2019-11-14 16:09:26 +01:00
command = self.command.decode() if self.command else self.command
2018-07-17 13:28:56 +02:00
retval = getattr(
2019-12-31 13:52:56 +01:00
self, "bm_command_" + command)()
except AttributeError:
# unimplemented command
2018-07-17 13:28:56 +02:00
logger.debug('unimplemented command %s', self.command)
except BMProtoInsufficientDataError:
2018-07-17 13:28:56 +02:00
logger.debug('packet length too short, skipping')
except BMProtoExcessiveDataError:
2018-07-17 13:28:56 +02:00
logger.debug('too much data, skipping')
except BMObjectInsufficientPOWError:
2018-07-17 13:28:56 +02:00
logger.debug('insufficient PoW, skipping')
except BMObjectInvalidDataError:
2018-07-17 13:28:56 +02:00
logger.debug('object invalid data, skipping')
except BMObjectExpiredError:
2018-07-17 13:28:56 +02:00
logger.debug('object expired, skipping')
except BMObjectUnwantedStreamError:
2018-07-17 13:28:56 +02:00
logger.debug('object not in wanted stream, skipping')
except BMObjectInvalidError:
2018-07-17 13:28:56 +02:00
logger.debug('object invalid, skipping')
except BMObjectAlreadyHaveError:
2018-07-17 13:28:56 +02:00
logger.debug(
'%(host)s:%(port)i already got object, skipping',
2020-01-06 12:14:13 +01:00
self.destinaestion._asdict())
except struct.error:
2018-07-17 13:28:56 +02:00
logger.debug('decoding error, skipping')
2020-01-06 12:14:13 +01:00
except ValueError:
pass
elif self.socket.type == socket.SOCK_DGRAM:
# broken read, ignore
pass
else:
2019-11-14 16:09:26 +01:00
logger.debug('Closing due to invalid command {}'.format(self.command))
self.close_reason = ("Invalid command {}".format(self.command))
self.set_state("close")
return False
2017-04-04 10:46:01 +02:00
if retval:
# print('if retval is true and inside the if ')
self.set_state("bm_header", length=self.payloadLength)
2017-04-04 10:46:01 +02:00
self.bm_proto_reset()
# else assume the command requires a different state to follow
return True
2017-04-16 18:27:15 +02:00
def decode_payload_string(self, length):
"""Read and return `length` bytes from payload"""
2018-07-17 13:28:56 +02:00
value = self.payload[self.payloadOffset:self.payloadOffset + length]
2017-04-16 18:27:15 +02:00
self.payloadOffset += length
return value
def decode_payload_varint(self):
"""Decode a varint from the payload"""
2017-04-16 18:27:15 +02:00
value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:])
self.payloadOffset += offset
return value
def decode_payload_node(self):
"""Decode node details from the payload"""
2018-07-17 13:28:56 +02:00
# protocol.checkIPAddress()
services, host, port = self.decode_payload_content("Q16sH")
2020-01-06 12:14:13 +01:00
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'.encode('raw_unicode_escape'):
2019-11-14 16:09:26 +01:00
host = socket.inet_ntop(socket.AF_INET, host[12:16])
2020-01-06 12:14:13 +01:00
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43'.encode('raw_unicode_escape'):
# Onion, based on BMD/bitcoind
host = base64.b32encode(host[6:]).lower() + ".onion"
else:
2019-11-14 16:09:26 +01:00
host = socket.inet_ntop(socket.AF_INET6, host)
if host == "":
2018-07-17 13:28:56 +02:00
# This can happen on Windows systems which are not 64-bit
# compatible so let us drop the IPv6 address.
2019-11-14 16:09:26 +01:00
host = socket.inet_ntop(socket.AF_INET, host[12:16])
return Node(services, host, port)
2017-04-16 18:27:15 +02:00
2019-08-30 12:42:39 +02:00
def decode_payload_content(self, pattern="v"): # pylint: disable=too-many-branches, too-many-statements
"""
Decode the payload depending on pattern:
L = varint indicating the length of the next array
l = varint indicating the length of the next item
v = varint (or array)
H = uint16
I = uint32
Q = uint64
i = net_addr (without time and stream number)
s = string
0-9 = length of the next item
, = end of array
"""
2017-04-16 18:27:15 +02:00
2019-08-30 12:42:39 +02:00
def decode_simple(self, char="v"): # pylint: disable=inconsistent-return-statements
"""Decode the payload using one char pattern"""
if char == "v":
return self.decode_payload_varint()
if char == "i":
return self.decode_payload_node()
if char == "H":
self.payloadOffset += 2
2018-07-17 13:28:56 +02:00
return struct.unpack(">H", self.payload[
self.payloadOffset - 2:self.payloadOffset])[0]
if char == "I":
self.payloadOffset += 4
2018-07-17 13:28:56 +02:00
return struct.unpack(">I", self.payload[
self.payloadOffset - 4:self.payloadOffset])[0]
if char == "Q":
self.payloadOffset += 8
2018-07-17 13:28:56 +02:00
return struct.unpack(">Q", self.payload[
self.payloadOffset - 8:self.payloadOffset])[0]
2017-04-16 18:27:15 +02:00
size = None
isArray = False
# size
# iterator starting from size counting to 0
# isArray?
# subpattern
# position of parser in subpattern
# retval (array)
parserStack = [[1, 1, False, pattern, 0, []]]
while True:
i = parserStack[-1][3][parserStack[-1][4]]
2018-07-17 13:28:56 +02:00
if i in "0123456789" and (
2019-08-30 12:42:39 +02:00
size is None or parserStack[-1][3][parserStack[-1][4] - 1]
2018-07-17 13:28:56 +02:00
not in "lL"):
try:
size = size * 10 + int(i)
except TypeError:
size = int(i)
isArray = False
elif i in "Ll" and size is None:
size = self.decode_payload_varint()
2018-07-17 13:28:56 +02:00
isArray = i == "L"
elif size is not None:
if isArray:
2018-07-17 13:28:56 +02:00
parserStack.append([
size, size, isArray,
parserStack[-1][3][parserStack[-1][4]:], 0, []
])
parserStack[-2][4] = len(parserStack[-2][3])
else:
for j in range(parserStack[-1][4], len(parserStack[-1][3])):
if parserStack[-1][3][j] not in "lL0123456789":
break
2019-08-30 12:42:39 +02:00
# pylint: disable=undefined-loop-variable
2018-07-17 13:28:56 +02:00
parserStack.append([
size, size, isArray,
parserStack[-1][3][parserStack[-1][4]:j + 1], 0, []
])
2017-07-11 10:29:29 +02:00
parserStack[-2][4] += len(parserStack[-1][3]) - 1
size = None
continue
elif i == "s":
2018-07-17 13:28:56 +02:00
# if parserStack[-2][2]:
# parserStack[-1][5].append(self.payload[
# self.payloadOffset:self.payloadOffset + parserStack[-1][0]])
# else:
parserStack[-1][5] = self.payload[
self.payloadOffset:self.payloadOffset + parserStack[-1][0]]
self.payloadOffset += parserStack[-1][0]
parserStack[-1][1] = 0
parserStack[-1][2] = True
2018-07-17 13:28:56 +02:00
# del parserStack[-1]
size = None
elif i in "viHIQ":
2018-07-17 13:28:56 +02:00
parserStack[-1][5].append(decode_simple(
self, parserStack[-1][3][parserStack[-1][4]]))
size = None
2017-04-16 18:27:15 +02:00
else:
size = None
for depth in range(len(parserStack) - 1, -1, -1):
parserStack[depth][4] += 1
if parserStack[depth][4] >= len(parserStack[depth][3]):
parserStack[depth][1] -= 1
parserStack[depth][4] = 0
if depth > 0:
if parserStack[depth][2]:
2018-07-17 13:28:56 +02:00
parserStack[depth - 1][5].append(
parserStack[depth][5])
else:
2018-07-17 13:28:56 +02:00
parserStack[depth - 1][5].extend(
parserStack[depth][5])
parserStack[depth][5] = []
if parserStack[depth][1] <= 0:
if depth == 0:
2018-07-17 13:28:56 +02:00
# we're done, at depth 0 counter is at 0
# and pattern is done parsing
return parserStack[depth][5]
del parserStack[-1]
continue
break
break
if self.payloadOffset > self.payloadLength:
2018-07-17 13:28:56 +02:00
logger.debug(
'Insufficient data %i/%i',
self.payloadOffset, self.payloadLength)
raise BMProtoInsufficientDataError()
2017-04-16 18:27:15 +02:00
2017-04-04 10:46:01 +02:00
def bm_command_error(self):
"""Decode an error message and log it"""
2019-12-31 13:52:56 +01:00
err_values = self.decode_payload_content("vvlsls")
fatalStatus = err_values[0]
# banTime = err_values[1]
# inventoryVector = err_values[2]
errorText = err_values[3]
2018-07-17 13:28:56 +02:00
logger.error(
'%s:%i error: %i, %s', self.destination.host,
self.destination.port, fatalStatus, errorText)
return True
2017-04-16 18:27:15 +02:00
2017-04-04 10:46:01 +02:00
def bm_command_getdata(self):
"""
Incoming request for object(s).
If we have them and some other conditions are fulfilled,
append them to the write queue.
"""
2019-12-31 13:52:56 +01:00
# 32 an array bit long strings
items = self.decode_payload_content("l32s")
# skip?
now = time.time()
if now < self.skipUntil:
return True
2018-12-20 20:33:27 +01:00
for i in items:
self.pendingUpload[str(i)] = now
return True
def _command_inv(self, dandelion=False):
items = self.decode_payload_content("l32s")
if len(items) > MAX_OBJECT_COUNT:
2018-07-17 13:28:56 +02:00
logger.error(
'Too many items in %sinv message!', 'd' if dandelion else '')
raise BMProtoExcessiveDataError()
# ignore dinv if dandelion turned off
if dandelion and not state.dandelion:
return True
2020-01-06 12:14:13 +01:00
for i in map(bytes, items):
if i in Inventory() and not Dandelion().hasHash(i):
continue
if dandelion and not Dandelion().hasHash(i):
Dandelion().addHash(i, self)
self.handleReceivedInventory(i)
return True
2017-04-16 18:27:15 +02:00
def bm_command_inv(self):
"""Non-dandelion announce"""
return self._command_inv(False)
def bm_command_dinv(self):
"""Dandelion stem announce"""
return self._command_inv(True)
2017-04-04 10:46:01 +02:00
def bm_command_object(self):
"""Incoming object, process it"""
objectOffset = self.payloadOffset
2018-07-17 13:28:56 +02:00
nonce, expiresTime, objectType, version, streamNumber = \
self.decode_payload_content("QQIvv")
self.object = BMObject(
nonce, expiresTime, objectType, version, streamNumber,
self.payload, self.payloadOffset)
if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE:
2018-07-17 13:28:56 +02:00
logger.info(
'The payload length of this object is too large (%d bytes).'
' Ignoring it.', len(self.payload) - self.payloadOffset)
raise BMProtoExcessiveDataError()
try:
self.object.checkProofOfWorkSufficient()
self.object.checkEOLSanity()
self.object.checkAlreadyHave()
2018-07-17 13:28:56 +02:00
except (BMObjectExpiredError, BMObjectAlreadyHaveError,
BMObjectInsufficientPOWError):
BMProto.stopDownloadingObject(self.object.inventoryHash)
2018-07-17 13:28:56 +02:00
raise
try:
self.object.checkStream()
2018-07-17 13:28:56 +02:00
except BMObjectUnwantedStreamError:
acceptmismatch = BMConfigParser().get(
"inventory", "acceptmismatch")
BMProto.stopDownloadingObject(
self.object.inventoryHash, acceptmismatch)
if not acceptmismatch:
raise
try:
self.object.checkObjectByType()
2018-07-17 13:28:56 +02:00
objectProcessorQueue.put((
2020-01-06 12:14:13 +01:00
self.object.objectType, memoryview(self.object.data)))
2018-07-17 13:28:56 +02:00
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
else:
try:
del missingObjects[self.object.inventoryHash]
except KeyError:
pass
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
2020-01-06 12:14:13 +01:00
[self.object.inventoryHash] = (
2018-07-17 13:28:56 +02:00
self.object.objectType, self.object.streamNumber,
2020-01-06 12:14:13 +01:00
memoryview(self.payload[objectOffset:]), self.object.expiresTime,
memoryview(self.object.tag)
2018-07-17 13:28:56 +02:00
)
2020-01-06 12:14:13 +01:00
Inventory()[self.object.inventoryHash]
2018-07-17 13:28:56 +02:00
self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash)
invQueue.put((
self.object.streamNumber, self.object.inventoryHash,
self.destination))
return True
2017-04-16 18:27:15 +02:00
def _decode_addr(self):
return self.decode_payload_content("LQIQ16sH")
2017-04-16 18:27:15 +02:00
def bm_command_addr(self):
"""Incoming addresses, process them"""
2019-08-30 12:42:39 +02:00
addresses = self._decode_addr() # pylint: disable=redefined-outer-name
for i in addresses:
2019-12-31 13:52:56 +01:00
seenTime, stream, _, ip, port = i
decodedIP = protocol.checkIPAddress(bytes(ip))
if stream not in state.streamsInWhichIAmParticipating:
continue
if (
2019-08-30 12:42:39 +02:00
decodedIP and time.time() - seenTime > 0 and
seenTime > time.time() - ADDRESS_ALIVE and
2019-08-30 12:42:39 +02:00
port > 0
):
peer = Peer(decodedIP, port)
2017-10-19 08:52:44 +02:00
try:
if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
continue
except KeyError:
pass
if len(knownnodes.knownNodes[stream]) < int(BMConfigParser().safeGet("knownnodes", "maxnodes")):
with knownnodes.knownNodesLock:
try:
knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime
except (TypeError, KeyError):
knownnodes.knownNodes[stream][peer] = {
"lastseen": seenTime,
"rating": 0,
"self": False,
}
# since we don't track peers outside of knownnodes,
# only spread if in knownnodes to prevent flood
addrQueue.put((stream, peer, seenTime,
self.destination))
return True
2017-04-16 18:27:15 +02:00
def bm_command_portcheck(self):
"""Incoming port check request, queue it."""
portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True
2017-04-04 10:46:01 +02:00
def bm_command_ping(self):
"""Incoming ping, respond to it."""
self.append_write_buf(protocol.CreatePacket('pong'))
return True
2017-04-16 18:27:15 +02:00
2019-08-30 12:42:39 +02:00
def bm_command_pong(self): # pylint: disable=no-self-use
"""
Incoming pong.
Ignore it. PyBitmessage pings connections after about 5 minutes
of inactivity, and leaves it to the TCP stack to handle actual
timeouts. So there is no need to do anything when a pong arrives.
"""
2017-04-16 18:27:15 +02:00
# nothing really
return True
2017-04-04 10:46:01 +02:00
def bm_command_verack(self):
"""
Incoming verack.
If already sent my own verack, handshake is complete (except
potentially waiting for buffers to flush), so we can continue
to the main connection phase. If not sent verack yet,
continue processing.
"""
self.verackReceived = True
2018-07-17 13:28:56 +02:00
if not self.verackSent:
return True
self.set_state(
"tls_init" if self.isSSL else "connection_fully_established",
length=self.payloadLength, expectBytes=0)
return False
2019-12-31 13:52:56 +01:00
def bm_command_version(self):
# print('inside the bmproto ')
"""
Incoming version.
Parse and log, remember important things, like streams, bitfields, etc.
"""
2018-07-17 13:28:56 +02:00
(self.remoteProtocolVersion, self.services, self.timestamp,
self.sockNode, self.peerNode, self.nonce, self.userAgent,
self.streams) = self.decode_payload_content("IQQiiQlsLv")
self.nonce = struct.pack('>Q', self.nonce)
2017-04-16 18:27:15 +02:00
self.timeOffset = self.timestamp - int(time.time())
2018-07-17 13:28:56 +02:00
logger.debug('remoteProtocolVersion: %i', self.remoteProtocolVersion)
logger.debug('services: 0x%08X', self.services)
logger.debug('time offset: %i', self.timeOffset)
2018-07-17 13:28:56 +02:00
logger.debug('my external IP: %s', self.sockNode.host)
logger.debug(
'remote node incoming address: %s:%i',
self.destination.host, self.peerNode.port)
logger.debug('user agent: %s', self.userAgent)
logger.debug('streams: [%s]', ','.join(map(str, self.streams)))
2017-04-04 10:46:01 +02:00
if not self.peerValidityChecks():
# ABORT afterwards
2017-04-04 10:46:01 +02:00
return True
self.append_write_buf(protocol.CreatePacket('verack'))
2017-04-04 10:46:01 +02:00
self.verackSent = True
if not self.isOutbound:
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams, True,
nodeid=self.nodeid))
logger.debug(
'%(host)s:%(port)i sending version',
self.destination._asdict())
2019-12-31 13:52:56 +01:00
if self.services & protocol.NODE_SSL == protocol.NODE_SSL:
# self.isSSL = True
pass
2018-07-17 13:28:56 +02:00
if not self.verackReceived:
return True
# self.set_state(
# "tls_init" if self.isSSL else "connection_fully_established",
# length=self.payloadLength, expectBytes=0)
2018-07-17 13:28:56 +02:00
self.set_state(
"connection_fully_established",
2018-07-17 13:28:56 +02:00
length=self.payloadLength, expectBytes=0)
return False
2017-04-04 10:46:01 +02:00
2019-08-30 12:42:39 +02:00
def peerValidityChecks(self): # pylint: disable=too-many-return-statements
"""Check the validity of the peer"""
2017-04-04 10:46:01 +02:00
if self.remoteProtocolVersion < 3:
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your is using an old protocol. Closing connection.",
fatal=2))
logger.debug(
'Closing connection to old protocol version %s, node: %s',
self.remoteProtocolVersion, self.destination)
2017-04-04 10:46:01 +02:00
return False
if self.timeOffset > MAX_TIME_OFFSET:
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the future compared to mine."
" Closing connection.", fatal=2))
logger.info(
"%s's time is too far in the future (%s seconds)."
" Closing connection to it.", self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1
2017-04-04 10:46:01 +02:00
return False
elif self.timeOffset < -MAX_TIME_OFFSET:
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the past compared to mine."
" Closing connection.", fatal=2))
logger.info(
"%s's time is too far in the past (timeOffset %s seconds)."
" Closing connection to it.", self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1
2017-04-04 10:46:01 +02:00
return False
else:
BMProto.timeOffsetWrongCount = 0
2017-06-24 12:13:35 +02:00
if not self.streams:
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="We don't have shared stream interests."
" Closing connection.", fatal=2))
logger.debug(
'Closed connection to %s because there is no overlapping interest'
' in streams.', self.destination)
2017-04-04 10:46:01 +02:00
return False
2018-07-17 13:28:56 +02:00
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
try:
if not protocol.checkSocksIP(self.destination.host):
2018-07-17 13:28:56 +02:00
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Too many connections from your IP."
2019-12-31 13:52:56 +01:00
" Closing connection.", fatal=2))
2018-07-17 13:28:56 +02:00
logger.debug(
2019-11-14 16:09:26 +01:00
'Closed connection to {} because we are already connected'
' to that IP.'.format(self.destination))
return False
except:
pass
if not self.isOutbound:
2018-07-17 13:28:56 +02:00
# incoming from a peer we're connected to as outbound,
# or server full report the same error to counter deanonymisation
if (
Peer(self.destination.host, self.peerNode.port)
in connectionpool.BMConnectionPool().inboundConnections
or len(connectionpool.BMConnectionPool().inboundConnections)
+ len(connectionpool.BMConnectionPool().outboundConnections)
> BMConfigParser().safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ BMConfigParser().safeGetInt(
'bitmessagesettings', 'maxbootstrapconnections')
2018-07-17 13:28:56 +02:00
):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Server full, please try again later.", fatal=2))
logger.debug(
'Closed connection to %s due to server full'
' or duplicate inbound/outbound.', self.destination)
return False
2018-07-17 13:28:56 +02:00
if connectionpool.BMConnectionPool().isAlreadyConnected(
self.nonce):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="I'm connected to myself. Closing connection.",
fatal=2))
logger.debug(
"Closed connection to %s because I'm connected to myself.",
self.destination)
return False
return True
@staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object"""
for connection in connectionpool.BMConnectionPool().connections():
try:
del connection.objectsNewToMe[hashId]
except KeyError:
pass
if not forwardAnyway:
try:
with connection.objectsNewToThemLock:
del connection.objectsNewToThem[hashId]
except KeyError:
pass
try:
del missingObjects[hashId]
except KeyError:
pass
def handle_close(self):
"""Handle close"""
self.set_state("close")
if not (self.accepting or self.connecting or self.connected):
# already disconnected
return
try:
2018-07-17 13:28:56 +02:00
logger.debug(
'%s:%i: closing, %s', self.destination.host,
self.destination.port, self.close_reason)
except AttributeError:
try:
2018-07-17 13:28:56 +02:00
logger.debug(
'%(host)s:%(port)i: closing', self.destination._asdict())
except AttributeError:
2018-07-17 13:28:56 +02:00
logger.debug('Disconnected socket closing')
AdvancedDispatcher.handle_close(self)
class BMStringParser(BMProto):
"""
A special case of BMProto used by objectProcessor to send ACK
"""
def __init__(self):
super(BMStringParser, self).__init__()
self.destination = Peer('127.0.0.1', 8444)
self.payload = None
ObjectTracker.__init__(self)
def send_data(self, data):
"""Send object given by the data string"""
# This class is introduced specially for ACK sending, please
# change log strings if you are going to use it for something else
self.bm_proto_reset()
self.payload = data
try:
self.bm_command_object()
except BMObjectAlreadyHaveError:
pass # maybe the same msg received on different nodes
except BMObjectExpiredError:
logger.debug(
'Sending ACK failure (expired): %s', hexlify(data))
except Exception as e:
logger.debug(
'Exception of type %s while sending ACK',
type(e), exc_info=True)