# pylint: disable=too-many-ancestors
"""
src/network/tcp.py
==================

TCP connections (direct and SOCKS-proxied) and the listening server for the
Bitmessage protocol.
"""
|
|
|
|
import math
|
|
import random
|
|
import socket
|
|
import time
|
|
|
|
import addresses
|
|
import helper_random
|
|
import knownnodes
|
|
import network.asyncore_pollchoose as asyncore
|
|
import network.connectionpool
|
|
import protocol
|
|
import shared
|
|
import state
|
|
from bmconfigparser import BMConfigParser
|
|
from debug import logger
|
|
from helper_random import randomBytes
|
|
from inventory import Inventory
|
|
from network.advanceddispatcher import AdvancedDispatcher
|
|
from network.bmproto import BMProto
|
|
from network.dandelion import Dandelion
|
|
from network.objectracker import ObjectTracker
|
|
from network.socks4a import Socks4aConnection
|
|
from network.socks5 import Socks5Connection
|
|
from network.tls import TLSDispatcher
|
|
from queues import UISignalQueue, invQueue, receiveDataQueue
|
|
|
|
|
|
class TCPConnection(BMProto, TLSDispatcher):  # pylint: disable=too-many-instance-attributes
    """
    A single Bitmessage peer connection over TCP.

    Depending on the constructor arguments the instance is an inbound
    connection (only ``sock`` given), an outbound connection over an existing
    proxy socket (``address`` and ``sock`` given) or a direct outbound
    connection (only ``address`` given).

    .. todo:: Look to understand and/or fix the non-parent-init-called
    """

    def __init__(self, address=None, sock=None):
        """
        Initialise the connection state and, for direct outbound connections,
        create the socket and start connecting.

        :param address: remote peer, an object with ``host`` and ``port``
            attributes; ``None`` for inbound connections
        :param sock: existing socket (accepted inbound socket or proxy
            socket); ``None`` for direct outbound connections
        """
        BMProto.__init__(self, address=address, sock=sock)
        self.verackReceived = False
        self.verackSent = False
        self.streams = [0]
        self.fullyEstablished = False
        self.connectedAt = 0
        self.skipUntil = 0
        if address is None and sock is not None:
            # inbound: the peer address is taken from the accepted socket
            peername = sock.getpeername()
            self.destination = state.Peer(peername[0], peername[1])
            self.isOutbound = False
            TLSDispatcher.__init__(self, sock, server_side=True)
            self.connectedAt = time.time()
            logger.debug("Received connection from %s:%i", self.destination.host, self.destination.port)
            self.nodeid = randomBytes(8)
        elif address is not None and sock is not None:
            # outbound through a proxy socket
            # NOTE(review): self.destination is not assigned in this branch;
            # presumably a base/proxy class has set it already -- confirm
            TLSDispatcher.__init__(self, sock, server_side=False)
            self.isOutbound = True
            logger.debug("Outbound proxy connection to %s:%i", self.destination.host, self.destination.port)
        else:
            # direct outbound: a colon in the host selects an IPv6 socket
            self.destination = address
            self.isOutbound = True
            if ":" in address.host:
                self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
            else:
                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            TLSDispatcher.__init__(self, sock, server_side=False)
            self.connect(self.destination)
            logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port)
        encodedAddr = protocol.encodeHost(self.destination.host)
        # both checks are evaluated eagerly, as a list, on purpose: the
        # result is always a plain bool
        self.local = all([
            protocol.checkIPAddress(encodedAddr, True),
            not protocol.checkSocksIP(self.destination.host)
        ])
        ObjectTracker.__init__(self)  # pylint: disable=non-parent-init-called
        self.bm_proto_reset()
        self.set_state("bm_header", expectBytes=protocol.Header.size)

    def antiIntersectionDelay(self, initial=False):
        """
        This is a defense against the so called intersection attacks.

        It is called when you notice peer is requesting non-existing objects, or right after the connection is
        established. It will estimate how long an object will take to propagate across the network, and skip processing
        "getdata" requests until then. This means an attacker only has one shot per IP to perform the attack.

        :param initial: when True the delay is counted from ``connectedAt``
            (connection establishment), otherwise from the current time
        """
        # take the stream with maximum amount of nodes; an empty node table
        # counts as 0 nodes instead of making max() raise ValueError
        nodeCounts = [len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes]
        max_known_nodes = max(nodeCounts) if nodeCounts else 0
        # estimated time for a small object to propagate across the whole network
        # +2 is to avoid problems with log(0) and log(1)
        # 20 is avg connected nodes count
        # 0.2 is avg message transmission time
        delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (0.2 + invQueue.queueCount / 2.0)
        if delay > 0:
            if initial:
                self.skipUntil = self.connectedAt + delay
                if self.skipUntil > time.time():
                    logger.debug("Initial skipping processing getdata for %.2fs", self.skipUntil - time.time())
            else:
                logger.debug("Skipping processing getdata due to missing object for %.2fs", delay)
                self.skipUntil = time.time() + delay

    def state_connection_fully_established(self):
        """
        State after the bitmessage protocol handshake is completed (version/verack exchange, and if both side support
        TLS, the TLS handshake as well).

        :return: always True
        """
        self.set_connection_fully_established()
        self.set_state("bm_header")
        self.bm_proto_reset()
        return True

    def set_connection_fully_established(self):
        """Initiate inventory synchronisation."""
        if not self.isOutbound and not self.local:
            # a non-local peer managed to connect to us
            shared.clientHasReceivedIncomingConnections = True
            UISignalQueue.put(('setStatusIcon', 'green'))
        UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, True, self.destination)))
        self.antiIntersectionDelay(True)
        self.fullyEstablished = True
        if self.isOutbound:
            knownnodes.increaseRating(self.destination)
            Dandelion().maybeAddStem(self)
        self.sendAddr()
        self.sendBigInv()

    def sendAddr(self):
        """
        Send a partial list of known addresses to peer.

        For every stream of this connection up to ``maxaddrperstreamsend``
        (default 500) recently seen nodes are sampled at random, plus up to
        half that amount for each of the two child streams (``stream * 2``
        and ``stream * 2 + 1``) unless the peer handles that child stream
        itself. Streams missing from the known nodes table are skipped.
        """
        # We are going to share a maximum number of 1000 addrs (per overlapping
        # stream) with our peer. 500 from overlapping streams, 250 from the
        # left child stream, and 250 from the right child stream.
        maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500)
        # only nodes seen more recently than this are advertised
        oldestAllowed = int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers

        addrs = {}
        for stream in self.streams:
            candidates = (
                (stream, maxAddrCount, False),
                (stream * 2, maxAddrCount // 2, True),
                (stream * 2 + 1, maxAddrCount // 2, True),
            )
            with knownnodes.knownNodesLock:
                for substream, limit, isChild in candidates:
                    # sent 250 only if the remote isn't interested in it
                    # (a child stream the peer subscribes to is handled as
                    # a full stream by its own iteration of the outer loop)
                    if isChild and substream in self.streams:
                        continue
                    try:
                        nodes = knownnodes.knownNodes[substream]
                    except KeyError:
                        continue
                    if not nodes:
                        continue
                    # a real list, so that it can be sampled from
                    recent = [
                        (peer, params) for peer, params in nodes.items()
                        if params["lastseen"] > oldestAllowed
                    ]
                    addrs[substream] = helper_random.randomsample(recent, min(len(recent), limit))

        templist = []
        for substream, sample in addrs.items():
            for peer, params in sample:
                templist.append((substream, peer, params["lastseen"]))
        if templist:
            self.append_write_buf(BMProto.assembleAddr(templist))

    def sendBigInv(self):
        """Send hashes of all inventory objects, chunked as the protocol has a per-command limit."""
        def sendChunk(chunk):
            """Send one chunk of inv entries in one command"""
            if not chunk:
                return
            logger.debug('Sending huge inv message with %i objects to just this one peer', len(chunk))
            self.append_write_buf(protocol.CreatePacket(
                'inv', addresses.encodeVarint(len(chunk)) + b''.join(chunk)))

        # Select all hashes for objects in this stream.
        bigInvHashes = set()
        dandelion = Dandelion()
        for stream in self.streams:
            # may lock for a long time, but I think it's better than thousands of small locks
            with self.objectsNewToThemLock:
                for objHash in Inventory().unexpired_hashes_by_stream(stream):
                    # don't advertise stem objects on bigInv
                    if dandelion.hasHash(objHash):
                        continue
                    bigInvHashes.add(objHash)

        # Remove -1 below when sufficient time has passed for users to
        # upgrade to versions of PyBitmessage that accept inv with 50,000
        # items
        chunkLimit = BMProto.maxObjectCount - 1

        # Now let us start appending all of these hashes together. They will be
        # sent out in a big inv message to our new peer.
        chunk = []
        for objHash in bigInvHashes:
            chunk.append(objHash)
            if len(chunk) >= chunkLimit:
                sendChunk(chunk)
                chunk = []

        # flush
        sendChunk(chunk)

    def handle_connect(self):
        """Callback for TCP connection being established."""
        try:
            AdvancedDispatcher.handle_connect(self)
        except socket.error as e:
            if e.errno in asyncore._DISCONNECTED:  # pylint: disable=protected-access
                logger.debug("%s:%i: Connection failed: %s", self.destination.host, self.destination.port, str(e))
                return
        # connected: open the Bitmessage handshake with our version message
        self.nodeid = randomBytes(8)
        self.append_write_buf(
            protocol.assembleVersionMessage(
                self.destination.host,
                self.destination.port,
                network.connectionpool.BMConnectionPool().streams,
                False,
                nodeid=self.nodeid))
        self.connectedAt = time.time()
        receiveDataQueue.put(self.destination)

    def handle_read(self):
        """Callback for reading from a socket"""
        TLSDispatcher.handle_read(self)
        if self.isOutbound and self.fullyEstablished:
            # refresh the "lastseen" time of the peer in every stream where
            # it is a known node
            for s in self.streams:
                try:
                    with knownnodes.knownNodesLock:
                        knownnodes.knownNodes[s][self.destination]["lastseen"] = time.time()
                except KeyError:
                    pass
        receiveDataQueue.put(self.destination)

    def handle_write(self):
        """Callback for writing to a socket"""
        TLSDispatcher.handle_write(self)

    def handle_close(self):
        """Callback for connection being closed."""
        if self.isOutbound and not self.fullyEstablished:
            # outbound attempt that never completed the handshake
            knownnodes.decreaseRating(self.destination)
        if self.fullyEstablished:
            UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, False, self.destination)))
            if self.isOutbound:
                Dandelion().maybeRemoveStem(self)
        BMProto.handle_close(self)
|
|
|
|
|
|
class Socks5BMConnection(Socks5Connection, TCPConnection):
    """Bitmessage TCP connection tunnelled through a SOCKS5 proxy"""

    def __init__(self, address):
        """
        Initialise the SOCKS5 part first, then the Bitmessage connection on
        top of the proxy socket, and start in the "init" state.

        :param address: the remote peer to reach through the proxy
        """
        Socks5Connection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        State when SOCKS5 connection succeeds, we need to send a Bitmessage handshake to peer.

        :return: always True
        """
        Socks5Connection.state_proxy_handshake_done(self)
        self.nodeid = randomBytes(8)
        peer = self.destination
        versionMessage = protocol.assembleVersionMessage(
            peer.host,
            peer.port,
            network.connectionpool.BMConnectionPool().streams,
            False,
            nodeid=self.nodeid)
        self.append_write_buf(versionMessage)
        # from here on the peer talks the Bitmessage protocol
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True
|
|
|
|
|
|
class Socks4aBMConnection(Socks4aConnection, TCPConnection):
    """Bitmessage TCP connection tunnelled through a SOCKS4a proxy"""

    def __init__(self, address):
        """
        Initialise the SOCKS4a part first, then the Bitmessage connection on
        top of the proxy socket, and start in the "init" state.

        :param address: the remote peer to reach through the proxy
        """
        Socks4aConnection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        State when SOCKS4a connection succeeds, we need to send a Bitmessage handshake to peer.

        :return: always True
        """
        Socks4aConnection.state_proxy_handshake_done(self)
        self.nodeid = randomBytes(8)
        peer = self.destination
        versionMessage = protocol.assembleVersionMessage(
            peer.host,
            peer.port,
            network.connectionpool.BMConnectionPool().streams,
            False,
            nodeid=self.nodeid)
        self.append_write_buf(versionMessage)
        # from here on the peer talks the Bitmessage protocol
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True
|
|
|
|
|
|
class TCPServer(AdvancedDispatcher):
    """TCP connection server for Bitmessage protocol"""

    def __init__(self, host='127.0.0.1', port=8444):  # pylint: disable=redefined-outer-name
        """
        Create the listening socket, bind it and start listening.

        The requested port is tried first; each failed bind is followed by
        an attempt on a random port from 32767-65535, 50 attempts in total.
        A port that differs from the requested one is written to the
        configuration. When no attempt succeeds the server is left unbound:
        ``is_bound()`` returns False and ``listen()`` is not called.

        :param host: local address to bind to
        :param port: preferred local port
        """
        if not hasattr(self, '_map'):
            AdvancedDispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        bound = False
        for attempt in range(50):
            if attempt > 0:
                port = random.randint(32767, 65535)
            try:
                self.bind((host, port))
            except socket.error as e:
                # any bind error leads to a retry on another port; errors
                # other than "address in use" are at least made visible
                if e.errno not in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):
                    logger.debug("Binding to %s:%s failed: %s", host, port, e)
                continue
            if attempt > 0:
                # remember the port that actually worked
                BMConfigParser().set("bitmessagesettings", "port", str(port))
                BMConfigParser().save()
            bound = True
            break
        self.destination = state.Peer(host, port)
        self.bound = bound
        if not bound:
            # never listen on a socket that could not be bound
            logger.error("Could not bind a listening socket on %s after 50 attempts", host)
            return
        self.listen(5)

    def is_bound(self):
        """Is the socket bound?"""
        # the attribute is missing if __init__ did not get far enough
        return getattr(self, "bound", False)

    def handle_accept(self):
        """
        Incoming connection callback

        Records the local address of the accepted socket in
        ``state.ownAddresses``, drops the connection when the pool holds
        more connections than ``maxtotalconnections`` +
        ``maxbootstrapconnections`` + 10, and otherwise hands the socket to
        a new :class:`TCPConnection` added to the connection pool.
        """
        pair = self.accept()
        if pair is None:
            return
        sock, _ = pair
        sockname = sock.getsockname()
        state.ownAddresses[state.Peer(sockname[0], sockname[1])] = True

        pool = network.connectionpool.BMConnectionPool()
        config = BMConfigParser()
        # 10 is a sort of buffer, in between it will go through the version handshake
        # and return an error to the peer
        connectionLimit = config.safeGetInt("bitmessagesettings", "maxtotalconnections") + \
            config.safeGetInt("bitmessagesettings", "maxbootstrapconnections") + 10
        if len(pool.inboundConnections) + len(pool.outboundConnections) > connectionLimit:
            logger.warning("Server full, dropping connection")
            sock.close()
            return
        try:
            pool.addConnection(TCPConnection(sock=sock))
        except socket.error as e:
            # best effort: the connection is ignored, but the reason is kept
            # visible in the debug log
            logger.debug("Could not set up incoming connection: %s", e)
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: connect to a local node and run the event loop
    # until no sockets are left, printing the connection state each round.

    # initial fill

    # TCPConnection reads address.host, so a Peer is needed, not a plain tuple
    for peer in (state.Peer("127.0.0.1", 8448),):
        direct = TCPConnection(peer)
        while asyncore.socket_map:
            # single-argument parenthesised form prints the same text under
            # Python 2 and Python 3
            print("loop, state = %s" % (direct.state))
            asyncore.loop(timeout=10, count=1)
|