This repository has been archived on 2025-01-16. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2025-01-16/src/network/connectionpool.py

451 lines
18 KiB
Python
Raw Normal View History

2019-08-31 11:06:53 +02:00
"""
`BMConnectionPool` class definition
2019-08-31 11:06:53 +02:00
"""
import errno
import logging
2018-07-17 13:28:56 +02:00
import re
import socket
import sys
import time
import network.asyncore_pollchoose as asyncore
2018-07-17 13:28:56 +02:00
import helper_random
import knownnodes
import protocol
import state
2018-07-17 13:28:56 +02:00
from bmconfigparser import BMConfigParser
from network.connectionchooser import chooseConnection
from network.proxy import Proxy
2019-12-23 12:18:37 +01:00
from network.tcp import (
2019-12-31 13:52:56 +01:00
TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection, bootstrap)
from network.udp import UDPSocket
2019-12-31 13:52:56 +01:00
from singleton import Singleton
from .node import Peer
logger = logging.getLogger('default')
@Singleton
class BMConnectionPool(object):
    """Registry of all currently existing network connections"""
    # pylint: disable=too-many-instance-attributes

    trustedPeer = None
    """
    When the `trustedpeer` option is present in keys.dat this holds a
    `Peer`; outbound connections are then made to that peer instead of
    to addresses advertised by other peers.

    Intended use: the user keeps a Bitmessage daemon running permanently
    on a server they trust and occasionally starts a second client on a
    local machine to check for messages. Pointing the local client at the
    trusted server lets it sync with the network much faster without
    compromising security.
    """
def __init__(self):
    """
    Apply the configured bandwidth limits, create the empty connection
    registries and parse the optional `trustedpeer` setting.

    Exits the process via `sys.exit` when `trustedpeer` is set but is
    not of the form ``<hostname>:<portnumber>``.
    """
    download_rate = BMConfigParser().safeGetInt(
        "bitmessagesettings", "maxdownloadrate")
    upload_rate = BMConfigParser().safeGetInt(
        "bitmessagesettings", "maxuploadrate")
    asyncore.set_rates(download_rate, upload_rate)

    # connection registries, all empty at start
    self.outboundConnections = {}
    self.inboundConnections = {}
    self.listeningSockets = {}
    self.udpSockets = {}
    self.streams = []

    # bookkeeping used by loop()
    self._lastSpawned = 0
    self._spawnWait = 2
    self._bootstrapped = False

    configured_peer = BMConfigParser().safeGet(
        'bitmessagesettings', 'trustedpeer')
    if not configured_peer:
        return
    try:
        # a missing/extra ':' or a non-numeric port raises ValueError
        host, port = configured_peer.split(':')
        self.trustedPeer = Peer(host, int(port))
    except ValueError:
        sys.exit(
            'Bad trustedpeer config setting! It should be set as'
            ' trustedpeer=<hostname>:<portnumber>'
        )
def connections(self):
    """
    Shortcut for combined list of connections from
    `inboundConnections` and `outboundConnections` dicts

    :returns: a new list holding every inbound connection followed by
        every outbound connection; mutating it does not affect the pool
    """
    return (
        list(self.inboundConnections.values())
        + list(self.outboundConnections.values())
    )
def establishedConnections(self):
    """
    Return the subset of `connections()` whose `fullyEstablished`
    attribute is truthy, in the same order.
    """
    ready = []
    for conn in self.connections():
        if conn.fullyEstablished:
            ready.append(conn)
    return ready
def connectToStream(self, streamNumber):
    """
    Register a bitmessage stream: the number is appended to
    `self.streams` (duplicates are kept).
    """
    self.streams.extend((streamNumber,))
def getConnectionByAddr(self, addr):
    """
    Return an (existing) connection object based on a `Peer` object
    (IP and port)

    The registries are searched in this order, first hit wins:

    1. `inboundConnections` keyed by *addr*
    2. `inboundConnections` keyed by ``addr.host``
    3. `outboundConnections` keyed by *addr*
    4. `udpSockets` keyed by ``addr.host``

    :param addr: the key to look for; objects without a ``host``
        attribute simply skip the host-keyed lookups
    :raises KeyError: when nothing matches; the exception carries *addr*
    """
    lookups = (
        (self.inboundConnections, False),
        (self.inboundConnections, True),
        (self.outboundConnections, False),
        (self.udpSockets, True),
    )
    for registry, by_host in lookups:
        if by_host:
            try:
                return registry[addr.host]
            except (KeyError, AttributeError):
                # not registered under the host, or addr has no host
                continue
        try:
            return registry[addr]
        except KeyError:
            continue
    # include the key so that the failure can be diagnosed
    raise KeyError(addr)
def isAlreadyConnected(self, nodeid):
    """
    Tell whether any connection returned by `connections()` has a
    `nodeid` attribute equal to *nodeid*.

    Connections that have no `nodeid` attribute are skipped.
    """
    absent = object()
    for conn in self.connections():
        peer_id = getattr(conn, 'nodeid', absent)
        if peer_id is not absent and nodeid == peer_id:
            return True
    return False
def addConnection(self, connection):
    """
    Store *connection* in the matching registry.

    - `UDPSocket` instances are ignored.
    - Outbound connections are keyed by `connection.destination`.
    - Inbound connections are keyed by the destination's host, unless
      that host is already a key, in which case the full destination
      is used as the key.
    """
    if isinstance(connection, UDPSocket):
        return
    if connection.isOutbound:
        self.outboundConnections[connection.destination] = connection
        return
    peer = connection.destination
    key = peer if peer.host in self.inboundConnections else peer.host
    self.inboundConnections[key] = connection
def removeConnection(self, connection):
    """
    Remove a connection from our internal dict and close it

    The registry is chosen by the connection's type/direction:
    `udpSockets` (keyed by ``listening.host``), `listeningSockets`
    (keyed by a `Peer` built from the destination), `outboundConnections`
    (keyed by destination) or `inboundConnections` (keyed by destination,
    falling back to the destination's host).

    A connection that is not registered is tolerated in every branch, so
    `connection.handle_close()` is always called.
    """
    if isinstance(connection, UDPSocket):
        self.udpSockets.pop(connection.listening.host, None)
    elif isinstance(connection, TCPServer):
        self.listeningSockets.pop(
            Peer(connection.destination.host, connection.destination.port),
            None)
    elif connection.isOutbound:
        self.outboundConnections.pop(connection.destination, None)
    else:
        peer = connection.destination
        # inbound connections are registered either under the full
        # destination or under its host only
        if peer in self.inboundConnections:
            del self.inboundConnections[peer]
        else:
            self.inboundConnections.pop(peer.host, None)
    connection.handle_close()
2019-08-31 11:06:53 +02:00
@staticmethod
def getListeningIP():
    """
    What IP are we supposed to be listening on?

    :returns: the `onionbindip` setting when `onionhostname` ends with
        ``.onion``, otherwise ``'127.0.0.1'``; in both cases overridden
        by the ``network/bind`` setting when `sockslisten` is enabled or
        `socksproxytype` is ``"none"``
    """
    # default to '' so that a missing option cannot break .endswith()
    if BMConfigParser().safeGet(
            "bitmessagesettings", "onionhostname", "").endswith(".onion"):
        host = BMConfigParser().safeGet(
            "bitmessagesettings", "onionbindip")
    else:
        host = '127.0.0.1'
    if (
        BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten")
        or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")
        == "none"
    ):
        # python doesn't like bind + INADDR_ANY?
        # host = socket.INADDR_ANY
        host = BMConfigParser().get("network", "bind")
    return host
2017-08-09 23:30:22 +02:00
def startListening(self, bind=None):
    """
    Create a `TCPServer` on the configured port and register it in
    `listeningSockets` under its `destination`.

    :param bind: host to bind to; when None the result of
        `getListeningIP()` is used
    """
    if bind is None:
        # NOTE(review): the previous revision remarked here that
        # getListeningIP() "returns blank host" -- confirm
        bind = self.getListeningIP()
    port = int(BMConfigParser().safeGet("bitmessagesettings", "port"))
    # correct port even if it changed
    ls = TCPServer(host=bind, port=port)
    # goes to the log instead of stdout
    logger.debug('Opened listening socket on %s:%i', bind, port)
    self.listeningSockets[ls.destination] = ls
def startUDPSocket(self, bind=None):
    """
    Create a `UDPSocket` and register it in `udpSockets` under its
    ``listening.host``.

    :param bind: None -- use the host from `getListeningIP()` with
        ``announcing=True``; False -- create the socket without a host
        and with ``announcing=False``; anything else -- use it as the
        host with ``announcing=True``
    """
    if bind is False:
        sock = UDPSocket(announcing=False)
    else:
        listen_host = self.getListeningIP() if bind is None else bind
        sock = UDPSocket(host=listen_host, announcing=True)
    self.udpSockets[sock.listening.host] = sock
def startBootstrappers(self):
    """
    Add one bootstrap connection to the pool.

    The connection class follows the `socksproxytype` setting; with
    SOCKS5 the target is randomly either the hard-coded onion host or a
    clearnet bootstrap host. Does nothing for an unrecognised proxy type.
    """
    proxy_kind = BMConfigParser().safeGet(
        'bitmessagesettings', 'socksproxytype')
    # A plugins may be added here
    target = None
    if proxy_kind == 'SOCKS5':
        base_class = Socks5BMConnection
        target = helper_random.randomchoice([
            'quzwelsuziwqgpt2.onion', None
        ])
    elif proxy_kind == 'SOCKS4a':
        base_class = Socks4aBMConnection  # FIXME: I cannot test
    elif not proxy_kind or proxy_kind == 'none':
        base_class = TCPConnection
    else:
        # This should never happen because socksproxytype setting
        # is handled in bitmessagemain before starting the connectionpool
        return

    make_bootstrapper = bootstrap(base_class)
    if target:
        port = 8444
    else:
        # the clearnet bootstrap host name embeds the chosen port
        port = helper_random.randomchoice([8080, 8444])
        target = ('bootstrap{}.bitmessage.org'.format(port))
    self.addConnection(make_bootstrapper(target, port))
def loop(self):  # pylint: disable=too-many-branches,too-many-statements
    """
    Main Connectionpool's loop

    A single pass does the following, in order:

    1. Reads the `dontconnect`, `sendoutgoingconnections` and SOCKS
       settings to decide whether to spawn outbound connections and
       whether to accept inbound ones.
    2. When spawning: starts a bootstrapper if
       `knownnodes.knownNodesActual` is false, configures `Proxy` once,
       and opens new outbound connections while fewer than
       `maxoutboundconnections` are established.
       When not spawning: calls `handle_close()` on every connection.
    3. Opens listening sockets when accepting and none exist, or flags
       the listening and UDP sockets as stopped when not accepting.
    4. Runs `asyncore.loop`.
    5. Queues a ping on fully established connections whose `lastTx`
       is older than 300 s and marks other connections whose `lastTx`
       is older than 20 s for closing.
    6. Removes every connection/socket that is neither accepting,
       connecting nor connected, or whose state is ``"close"``.
    """
    # pylint: disable=too-many-locals
    # defaults to empty loop if outbound connections are maxed
    spawnConnections = False
    acceptConnections = True
    if BMConfigParser().safeGetBoolean(
            'bitmessagesettings', 'dontconnect'):
        acceptConnections = False
    # safeGetBoolean rather than bool(safeGet(...)): any non-empty
    # string, including 'False', is truthy
    elif BMConfigParser().safeGetBoolean(
            'bitmessagesettings', 'sendoutgoingconnections'):
        spawnConnections = True
    socksproxytype = BMConfigParser().safeGet(
        'bitmessagesettings', 'socksproxytype', '')
    onionsocksproxytype = BMConfigParser().safeGet(
        'bitmessagesettings', 'onionsocksproxytype', '')
    if (
        socksproxytype[:5] == 'SOCKS'
        and not BMConfigParser().safeGetBoolean(
            'bitmessagesettings', 'sockslisten')
        and '.onion' not in BMConfigParser().safeGet(
            'bitmessagesettings', 'onionhostname', '')
    ):
        acceptConnections = False

    # pylint: disable=too-many-nested-blocks
    if spawnConnections:
        if not knownnodes.knownNodesActual:
            self.startBootstrappers()
            knownnodes.knownNodesActual = True
        if not self._bootstrapped:
            self._bootstrapped = True
            Proxy.proxy = (
                BMConfigParser().safeGet(
                    'bitmessagesettings', 'sockshostname'),
                int(BMConfigParser().safeGet(
                    'bitmessagesettings', 'socksport'))
            )
            # TODO AUTH
            # TODO reset based on GUI settings changes
            try:
                if not onionsocksproxytype.startswith("SOCKS"):
                    raise ValueError
                Proxy.onion_proxy = (
                    BMConfigParser().safeGet(
                        'network', 'onionsockshostname', None),
                    BMConfigParser().safeGet(
                        'network', 'onionsocksport', None)
                )
            except ValueError:
                Proxy.onion_proxy = None
        established = sum(
            1 for c in self.outboundConnections.values()
            if (c.connected and c.fullyEstablished))
        pending = len(self.outboundConnections) - established
        if established < int(BMConfigParser().safeGet(
                'bitmessagesettings', 'maxoutboundconnections')):
            for _ in range(
                    state.maximumNumberOfHalfOpenConnections - pending):
                try:
                    chosen = self.trustedPeer or chooseConnection(
                        helper_random.randomchoice(self.streams))
                except ValueError:
                    continue
                if chosen in self.outboundConnections:
                    continue
                if chosen.host in self.inboundConnections:
                    continue
                # don't connect to self
                if chosen in state.ownAddresses:
                    continue
                # don't connect to the hosts from the same
                # network group, defense against sibyl attacks
                host_network_group = protocol.network_group(
                    chosen.host)
                same_group = False
                for j in self.outboundConnections.values():
                    if host_network_group == j.network_group:
                        same_group = True
                        if chosen.host == j.destination.host:
                            knownnodes.decreaseRating(chosen)
                        break
                if same_group:
                    continue

                try:
                    if chosen.host.endswith(".onion") and Proxy.onion_proxy:
                        if onionsocksproxytype == "SOCKS5":
                            self.addConnection(Socks5BMConnection(chosen))
                        elif onionsocksproxytype == "SOCKS4a":
                            self.addConnection(Socks4aBMConnection(chosen))
                    elif socksproxytype == "SOCKS5":
                        self.addConnection(Socks5BMConnection(chosen))
                    elif socksproxytype == "SOCKS4a":
                        self.addConnection(Socks4aBMConnection(chosen))
                    else:
                        self.addConnection(TCPConnection(chosen))
                except socket.error as e:
                    if e.errno == errno.ENETUNREACH:
                        continue

                self._lastSpawned = time.time()
    else:
        for i in self.connections():
            # FIXME: rating will be increased after next connection
            i.handle_close()

    if acceptConnections:
        if not self.listeningSockets:
            if BMConfigParser().safeGet('network', 'bind') == '':
                self.startListening()
            else:
                for bind in re.sub(
                        r'[^\w.]+', ' ',
                        BMConfigParser().safeGet('network', 'bind')
                ).split():
                    self.startListening(bind)
            logger.info('Listening for incoming connections.')
        # NOTE(review): this branch is hard-disabled, so UDP sockets are
        # never started from here; presumably the condition used to test
        # `self.udpSockets` -- confirm before re-enabling
        if False:
            if BMConfigParser().safeGet('network', 'bind') == '':
                self.startUDPSocket()
            else:
                for bind in re.sub(
                        r'[^\w.]+', ' ',
                        BMConfigParser().safeGet('network', 'bind')
                ).split():
                    self.startUDPSocket(bind)
                self.startUDPSocket(False)
            logger.info('Starting UDP socket(s).')
    else:
        # flag the sockets as dead; they are reaped further below
        if self.listeningSockets:
            for i in self.listeningSockets.values():
                i.close_reason = "Stopping listening"
                i.accepting = i.connecting = i.connected = False
            logger.info('Stopped listening for incoming connections.')
        if self.udpSockets:
            for i in self.udpSockets.values():
                i.close_reason = "Stopping UDP socket"
                i.accepting = i.connecting = i.connected = False
            logger.info('Stopped udp sockets.')

    loopTime = float(self._spawnWait)
    if self._lastSpawned < time.time() - self._spawnWait:
        loopTime = 2.0
    asyncore.loop(timeout=loopTime, count=1000)

    reaper = []
    for i in self.connections():
        # 20 s of silence is allowed before the handshake completes,
        # 300 s afterwards
        minTx = time.time() - 20
        if i.fullyEstablished:
            minTx -= 300 - 20
        if i.lastTx < minTx:
            if i.fullyEstablished:
                i.append_write_buf(protocol.CreatePacket('ping'))
            else:
                i.close_reason = "Timeout (%is)" % (
                    time.time() - i.lastTx)
                i.set_state("close")
    for i in (
            self.connections()
            + list(self.listeningSockets.values())
            + list(self.udpSockets.values())
    ):
        if not (i.accepting or i.connecting or i.connected):
            reaper.append(i)
        else:
            try:
                if i.state == "close":
                    reaper.append(i)
            except AttributeError:
                # objects without a `state` attribute are left alone
                pass
    for i in reaper:
        self.removeConnection(i)