#2 from Bitmessage/v0.6

m
This commit is contained in:
bug Lady 2019-11-14 02:07:50 +01:00 committed by GitHub
commit 03625c1237
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 377 additions and 347 deletions

View File

@ -49,6 +49,8 @@ extensions = [
'm2r', 'm2r',
] ]
default_role = 'obj'
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates'] templates_path = ['_templates']
@ -199,7 +201,6 @@ epub_exclude_files = ['search.html']
autodoc_mock_imports = [ autodoc_mock_imports = [
'debug', 'debug',
'pybitmessage.bitmessagekivy', 'pybitmessage.bitmessagekivy',
'pybitmessage.bitmessagemain',
'pybitmessage.bitmessageqt.addressvalidator', 'pybitmessage.bitmessageqt.addressvalidator',
'pybitmessage.helper_startup', 'pybitmessage.helper_startup',
'pybitmessage.network.httpd', 'pybitmessage.network.httpd',
@ -219,17 +220,17 @@ autodoc_mock_imports = [
'qrcode', 'qrcode',
'stem', 'stem',
] ]
autodoc_member_order = 'bysource'
# Apidoc settings # Apidoc settings
apidoc_module_dir = '../pybitmessage' apidoc_module_dir = '../pybitmessage'
apidoc_output_dir = 'autodoc' apidoc_output_dir = 'autodoc'
apidoc_excluded_paths = [ apidoc_excluded_paths = [
'bitmessagekivy', 'bitmessagemain.py', 'build_osx.py', 'bitmessagekivy', 'build_osx.py',
'bitmessageqt/addressvalidator.py', 'bitmessageqt/migrationwizard.py', 'bitmessageqt/addressvalidator.py', 'bitmessageqt/migrationwizard.py',
'bitmessageqt/newaddresswizard.py', 'bitmessageqt/newaddresswizard.py', 'helper_startup.py',
'class_objectProcessor.py', 'defaults.py', 'helper_startup.py',
'kivymd', 'main.py', 'navigationdrawer', 'network/http*', 'kivymd', 'main.py', 'navigationdrawer', 'network/http*',
'pybitmessage', 'queues.py', 'tests', 'version.py' 'pybitmessage', 'tests', 'version.py'
] ]
apidoc_module_first = True apidoc_module_first = True
apidoc_separate_modules = True apidoc_separate_modules = True

View File

@ -54,11 +54,20 @@ def decodeBase58(string, alphabet=ALPHABET):
return num return num
class varintEncodeError(Exception):
"""Exception class for encoding varint"""
pass
class varintDecodeError(Exception):
"""Exception class for decoding varint data"""
pass
def encodeVarint(integer): def encodeVarint(integer):
"""Convert integer into varint bytes""" """Convert integer into varint bytes"""
if integer < 0: if integer < 0:
logger.error('varint cannot be < 0') raise varintEncodeError('varint cannot be < 0')
raise SystemExit
if integer < 253: if integer < 253:
return pack('>B', integer) return pack('>B', integer)
if integer >= 253 and integer < 65536: if integer >= 253 and integer < 65536:
@ -68,13 +77,7 @@ def encodeVarint(integer):
if integer >= 4294967296 and integer < 18446744073709551616: if integer >= 4294967296 and integer < 18446744073709551616:
return pack('>B', 255) + pack('>Q', integer) return pack('>B', 255) + pack('>Q', integer)
if integer >= 18446744073709551616: if integer >= 18446744073709551616:
logger.error('varint cannot be >= 18446744073709551616') raise varintEncodeError('varint cannot be >= 18446744073709551616')
raise SystemExit
class varintDecodeError(Exception):
"""Exception class for decoding varint data"""
pass
def decodeVarint(data): def decodeVarint(data):

View File

@ -7,8 +7,6 @@
# Right now, PyBitmessage only support connecting to stream 1. It doesn't # Right now, PyBitmessage only support connecting to stream 1. It doesn't
# yet contain logic to expand into further streams. # yet contain logic to expand into further streams.
# The software version variable is now held in shared.py
import os import os
import sys import sys
@ -31,40 +29,27 @@ import time
import traceback import traceback
from struct import pack from struct import pack
import defaults
import shared
import state
import shutdown
from bmconfigparser import BMConfigParser
from debug import logger # this should go before any threads
from helper_startup import ( from helper_startup import (
isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections
) )
from singleinstance import singleinstance
import defaults
import shared
import knownnodes
import state
import shutdown
from debug import logger # this should go before any threads
# Classes
from class_sqlThread import sqlThread
from class_singleCleaner import singleCleaner
from class_objectProcessor import objectProcessor
from class_singleWorker import singleWorker
from class_addressGenerator import addressGenerator
from bmconfigparser import BMConfigParser
from inventory import Inventory from inventory import Inventory
from knownnodes import readKnownNodes
from network.connectionpool import BMConnectionPool # Network objects and threads
from network.dandelion import Dandelion from network import (
from network.networkthread import BMNetworkThread BMConnectionPool, Dandelion,
from network.receivequeuethread import ReceiveQueueThread AddrThread, AnnounceThread, BMNetworkThread, InvThread, ReceiveQueueThread,
from network.announcethread import AnnounceThread DownloadThread, UploadThread)
from network.invthread import InvThread from singleinstance import singleinstance
from network.addrthread import AddrThread # Synchronous threads
from network.downloadthread import DownloadThread from threads import (
from network.uploadthread import UploadThread set_thread_name,
addressGenerator, objectProcessor, singleCleaner, singleWorker, sqlThread)
# Helper Functions
import helper_threading
def connectToStream(streamNumber): def connectToStream(streamNumber):
@ -84,14 +69,6 @@ def connectToStream(streamNumber):
except: except:
pass pass
with knownnodes.knownNodesLock:
if streamNumber not in knownnodes.knownNodes:
knownnodes.knownNodes[streamNumber] = {}
if streamNumber * 2 not in knownnodes.knownNodes:
knownnodes.knownNodes[streamNumber * 2] = {}
if streamNumber * 2 + 1 not in knownnodes.knownNodes:
knownnodes.knownNodes[streamNumber * 2 + 1] = {}
BMConnectionPool().connectToStream(streamNumber) BMConnectionPool().connectToStream(streamNumber)
@ -275,7 +252,7 @@ class Main:
self.setSignalHandler() self.setSignalHandler()
helper_threading.set_thread_name("PyBitmessage") set_thread_name("PyBitmessage")
state.dandelion = config.safeGetInt('network', 'dandelion') state.dandelion = config.safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them, # dandelion requires outbound connections, without them,
@ -291,7 +268,7 @@ class Main:
defaults.networkDefaultPayloadLengthExtraBytes = int( defaults.networkDefaultPayloadLengthExtraBytes = int(
defaults.networkDefaultPayloadLengthExtraBytes / 100) defaults.networkDefaultPayloadLengthExtraBytes / 100)
knownnodes.readKnownNodes() readKnownNodes()
# Not needed if objproc is disabled # Not needed if objproc is disabled
if state.enableObjProc: if state.enableObjProc:

View File

@ -14,7 +14,7 @@ import network.stats
import shared import shared
import widgets import widgets
from inventory import Inventory from inventory import Inventory
from network.connectionpool import BMConnectionPool from network import BMConnectionPool
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
from tr import _translate from tr import _translate
from uisignaler import UISignaler from uisignaler import UISignaler

View File

@ -14,7 +14,7 @@ import highlevelcrypto
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from addresses import decodeAddress, encodeAddress, encodeVarint from addresses import decodeAddress, encodeAddress, encodeVarint
from fallback import RIPEMD160Hash from fallback import RIPEMD160Hash
from network.threads import StoppableThread from network import StoppableThread
class addressGenerator(StoppableThread): class addressGenerator(StoppableThread):
@ -29,6 +29,10 @@ class addressGenerator(StoppableThread):
super(addressGenerator, self).stopThread() super(addressGenerator, self).stopThread()
def run(self): def run(self):
"""
Process the requests for addresses generation
from `.queues.addressGeneratorQueue`
"""
while state.shutdown == 0: while state.shutdown == 0:
queueValue = queues.addressGeneratorQueue.get() queueValue = queues.addressGeneratorQueue.get()
nonceTrialsPerByte = 0 nonceTrialsPerByte = 0

View File

@ -21,6 +21,7 @@ import helper_sent
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
from helper_ackPayload import genAckPayload from helper_ackPayload import genAckPayload
from network import bmproto from network import bmproto
from network.node import Peer
import protocol import protocol
import queues import queues
import state import state
@ -57,6 +58,7 @@ class objectProcessor(threading.Thread):
self.successfullyDecryptMessageTimings = [] self.successfullyDecryptMessageTimings = []
def run(self): def run(self):
"""Process the objects from `.queues.objectProcessorQueue`"""
while True: while True:
objectType, data = queues.objectProcessorQueue.get() objectType, data = queues.objectProcessorQueue.get()
@ -160,7 +162,7 @@ class objectProcessor(threading.Thread):
if not host: if not host:
return return
peer = state.Peer(host, port) peer = Peer(host, port)
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
knownnodes.addKnownNode( knownnodes.addKnownNode(
stream, peer, is_self=state.ownAddresses.get(peer)) stream, peer, is_self=state.ownAddresses.get(peer))
@ -1051,7 +1053,8 @@ class objectProcessor(threading.Thread):
# for it. # for it.
elif addressVersion >= 4: elif addressVersion >= 4:
tag = hashlib.sha512(hashlib.sha512( tag = hashlib.sha512(hashlib.sha512(
encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe encodeVarint(addressVersion) + encodeVarint(streamNumber)
+ ripe
).digest()).digest()[32:] ).digest()).digest()[32:]
if tag in state.neededPubkeys: if tag in state.neededPubkeys:
del state.neededPubkeys[tag] del state.neededPubkeys[tag]
@ -1059,9 +1062,8 @@ class objectProcessor(threading.Thread):
def sendMessages(self, address): def sendMessages(self, address):
""" """
This function is called by the possibleNewPubkey function when This method is called by the `possibleNewPubkey` when it sees
that function sees that we now have the necessary pubkey that we now have the necessary pubkey to send one or more messages.
to send one or more messages.
""" """
logger.info('We have been awaiting the arrival of this pubkey.') logger.info('We have been awaiting the arrival of this pubkey.')
sqlExecute( sqlExecute(

View File

@ -1,24 +0,0 @@
import Queue
import threading
import time
class ObjectProcessorQueue(Queue.Queue):
maxSize = 32000000
def __init__(self):
Queue.Queue.__init__(self)
self.sizeLock = threading.Lock()
self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
def put(self, item, block = True, timeout = None):
while self.curSize >= self.maxSize:
time.sleep(1)
with self.sizeLock:
self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout)
def get(self, block = True, timeout = None):
item = Queue.Queue.get(self, block, timeout)
with self.sizeLock:
self.curSize -= len(item[1])
return item

View File

@ -1,5 +1,5 @@
""" """
The singleCleaner class is a timer-driven thread that cleans data structures The `singleCleaner` class is a timer-driven thread that cleans data structures
to free memory, resends messages when a remote node doesn't respond, and to free memory, resends messages when a remote node doesn't respond, and
sends pong messages to keep connections alive if the network isn't busy. sends pong messages to keep connections alive if the network isn't busy.
@ -31,8 +31,7 @@ import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import sqlQuery, sqlExecute from helper_sql import sqlQuery, sqlExecute
from inventory import Inventory from inventory import Inventory
from network.connectionpool import BMConnectionPool from network import BMConnectionPool, StoppableThread
from network.threads import StoppableThread
class singleCleaner(StoppableThread): class singleCleaner(StoppableThread):
@ -46,12 +45,12 @@ class singleCleaner(StoppableThread):
try: try:
shared.maximumLengthOfTimeToBotherResendingMessages = ( shared.maximumLengthOfTimeToBotherResendingMessages = (
float(BMConfigParser().get( float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxdays')) * 'bitmessagesettings', 'stopresendingafterxdays'))
24 * 60 * 60 * 24 * 60 * 60
) + ( ) + (
float(BMConfigParser().get( float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxmonths')) * 'bitmessagesettings', 'stopresendingafterxmonths'))
(60 * 60 * 24 * 365) / 12) * (60 * 60 * 24 * 365) / 12)
except: except:
# Either the user hasn't set stopresendingafterxdays and # Either the user hasn't set stopresendingafterxdays and
# stopresendingafterxmonths yet or the options are missing # stopresendingafterxmonths yet or the options are missing
@ -93,8 +92,8 @@ class singleCleaner(StoppableThread):
"SELECT toaddress, ackdata, status FROM sent" "SELECT toaddress, ackdata, status FROM sent"
" WHERE ((status='awaitingpubkey' OR status='msgsent')" " WHERE ((status='awaitingpubkey' OR status='msgsent')"
" AND folder='sent' AND sleeptill<? AND senttime>?)", " AND folder='sent' AND sleeptill<? AND senttime>?)",
int(time.time()), int(time.time()) - int(time.time()), int(time.time())
shared.maximumLengthOfTimeToBotherResendingMessages - shared.maximumLengthOfTimeToBotherResendingMessages
) )
for row in queryreturn: for row in queryreturn:
if len(row) < 2: if len(row) < 2:
@ -140,9 +139,7 @@ class singleCleaner(StoppableThread):
# thread.downloadQueue.clear() # thread.downloadQueue.clear()
# inv/object tracking # inv/object tracking
for connection in \ for connection in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
connection.clean() connection.clean()
# discovery tracking # discovery tracking

View File

@ -28,7 +28,7 @@ from addresses import calculateInventoryHash, decodeAddress, decodeVarint, encod
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import sqlExecute, sqlQuery from helper_sql import sqlExecute, sqlQuery
from inventory import Inventory from inventory import Inventory
from network.threads import StoppableThread from network import StoppableThread
def sizeof_fmt(num, suffix='h/s'): def sizeof_fmt(num, suffix='h/s'):

View File

@ -1,3 +1,7 @@
"""
sqlThread is defined here
"""
import threading import threading
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
import sqlite3 import sqlite3
@ -19,11 +23,13 @@ import tr
class sqlThread(threading.Thread): class sqlThread(threading.Thread):
"""A thread for all SQL operations"""
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="SQL") threading.Thread.__init__(self, name="SQL")
def run(self): def run(self):
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn = sqlite3.connect(state.appdata + 'messages.dat')
self.conn.text_factory = str self.conn.text_factory = str
self.cur = self.conn.cursor() self.cur = self.conn.cursor()

View File

@ -1,24 +1,24 @@
""" """
src/defaults.py Common default values
===============
""" """
# sanity check, prevent doing ridiculous PoW #: sanity check, prevent doing ridiculous PoW
# 20 million PoWs equals approximately 2 days on dev's dual R9 290 #: 20 million PoWs equals approximately 2 days on dev's dual R9 290
ridiculousDifficulty = 20000000 ridiculousDifficulty = 20000000
# Remember here the RPC port read from namecoin.conf so we can restore to #: Remember here the RPC port read from namecoin.conf so we can restore to
# it as default whenever the user changes the "method" selection for #: it as default whenever the user changes the "method" selection for
# namecoin integration to "namecoind". #: namecoin integration to "namecoind".
namecoinDefaultRpcPort = "8336" namecoinDefaultRpcPort = "8336"
# If changed, these values will cause particularly unexpected behavior: # If changed, these values will cause particularly unexpected behavior:
# You won't be able to either send or receive messages because the proof # You won't be able to either send or receive messages because the proof
# of work you do (or demand) won't match that done or demanded by others. # of work you do (or demand) won't match that done or demanded by others.
# Don't change them! # Don't change them!
# The amount of work that should be performed (and demanded) per byte of the payload. #: The amount of work that should be performed (and demanded) per byte
#: of the payload.
networkDefaultProofOfWorkNonceTrialsPerByte = 1000 networkDefaultProofOfWorkNonceTrialsPerByte = 1000
# To make sending short messages a little more difficult, this value is #: To make sending short messages a little more difficult, this value is
# added to the payload length for use in calculating the proof of work #: added to the payload length for use in calculating the proof of work
# target. #: target.
networkDefaultPayloadLengthExtraBytes = 1000 networkDefaultPayloadLengthExtraBytes = 1000

View File

@ -1,17 +1,39 @@
"""Helper Sql performs sql operations.""" """
SQL-related functions defined here are really pass the queries (or other SQL
commands) to :class:`.threads.sqlThread` through `sqlSubmitQueue` queue and check
or return the result got from `sqlReturnQueue`.
This is done that way because :mod:`sqlite3` is so thread-unsafe that they
won't even let you call it from different threads using your own locks.
SQLite objects can only be used from one thread.
.. note:: This actually only applies for certain deployments, and/or
really old version of sqlite. I haven't actually seen it anywhere.
Current versions do have support for threading and multiprocessing.
I don't see an urgent reason to refactor this, but it should be noted
in the comment that the problem is mostly not valid. Sadly, last time
I checked, there is no reliable way to check whether the library is
or isn't thread-safe.
"""
import threading import threading
import Queue import Queue
sqlSubmitQueue = Queue.Queue() sqlSubmitQueue = Queue.Queue()
# SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. """the queue for SQL"""
# SQL objects #can only be called from one thread.
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
"""the queue for results"""
sqlLock = threading.Lock() sqlLock = threading.Lock()
def sqlQuery(sqlStatement, *args): def sqlQuery(sqlStatement, *args):
"""SQLLITE execute statement and return query.""" """
Query sqlite and return results
:param str sqlStatement: SQL statement string
:param list args: SQL query parameters
:rtype: list
"""
sqlLock.acquire() sqlLock.acquire()
sqlSubmitQueue.put(sqlStatement) sqlSubmitQueue.put(sqlStatement)

View File

@ -1,13 +1,9 @@
""" """
src/helper_startup.py Startup operations.
=====================
Helper Start performs all the startup operations.
""" """
# pylint: disable=too-many-branches,too-many-statements # pylint: disable=too-many-branches,too-many-statements
from __future__ import print_function from __future__ import print_function
import ConfigParser
import os import os
import platform import platform
import sys import sys
@ -19,28 +15,12 @@ import paths
import state import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
# The user may de-select Portable Mode in the settings if they want # The user may de-select Portable Mode in the settings if they want
# the config files to stay in the application data folder. # the config files to stay in the application data folder.
StoreConfigFilesInSameDirectoryAsProgramByDefault = False StoreConfigFilesInSameDirectoryAsProgramByDefault = False
def _loadTrustedPeer():
try:
trustedPeer = BMConfigParser().get('bitmessagesettings', 'trustedpeer')
except ConfigParser.Error:
# This probably means the trusted peer wasn't specified so we
# can just leave it as None
return
try:
host, port = trustedPeer.split(':')
except ValueError:
sys.exit(
'Bad trustedpeer config setting! It should be set as'
' trustedpeer=<hostname>:<portnumber>'
)
state.trustedPeer = state.Peer(host, int(port))
def loadConfig(): def loadConfig():
"""Load the config""" """Load the config"""
config = BMConfigParser() config = BMConfigParser()
@ -134,8 +114,6 @@ def loadConfig():
else: else:
updateConfig() updateConfig()
_loadTrustedPeer()
def updateConfig(): def updateConfig():
"""Save the config""" """Save the config"""

View File

@ -1,21 +0,0 @@
"""set_thread_name for threads that don't use StoppableThread"""
import threading
try:
import prctl
except ImportError:
def set_thread_name(name):
"""Set the thread name for external use (visible from the OS)."""
threading.current_thread().name = name
else:
def set_thread_name(name):
"""Set a name for the thread for python internal use."""
prctl.set_name(name)
def _thread_name_hack(self):
set_thread_name(self.name)
threading.Thread.__bootstrap_original__(self)
# pylint: disable=protected-access
threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
threading.Thread._Thread__bootstrap = _thread_name_hack

View File

@ -3,6 +3,7 @@ Manipulations with knownNodes dictionary.
""" """
import json import json
import logging
import os import os
import pickle import pickle
import threading import threading
@ -10,28 +11,33 @@ import time
import state import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from debug import logger from network.node import Peer
knownNodesLock = threading.Lock() knownNodesLock = threading.Lock()
"""Thread lock for knownnodes modification"""
knownNodes = {stream: {} for stream in range(1, 4)} knownNodes = {stream: {} for stream in range(1, 4)}
"""The dict of known nodes for each stream"""
knownNodesTrimAmount = 2000 knownNodesTrimAmount = 2000
"""trim stream knownnodes dict to this length"""
# forget a node after rating is this low
knownNodesForgetRating = -0.5 knownNodesForgetRating = -0.5
"""forget a node after rating is this low"""
knownNodesActual = False knownNodesActual = False
logger = logging.getLogger('default')
DEFAULT_NODES = ( DEFAULT_NODES = (
state.Peer('5.45.99.75', 8444), Peer('5.45.99.75', 8444),
state.Peer('75.167.159.54', 8444), Peer('75.167.159.54', 8444),
state.Peer('95.165.168.168', 8444), Peer('95.165.168.168', 8444),
state.Peer('85.180.139.241', 8444), Peer('85.180.139.241', 8444),
state.Peer('158.222.217.190', 8080), Peer('158.222.217.190', 8080),
state.Peer('178.62.12.187', 8448), Peer('178.62.12.187', 8448),
state.Peer('24.188.198.204', 8111), Peer('24.188.198.204', 8111),
state.Peer('109.147.204.113', 1195), Peer('109.147.204.113', 1195),
state.Peer('178.11.46.221', 8444) Peer('178.11.46.221', 8444)
) )
@ -57,19 +63,17 @@ def json_deserialize_knownnodes(source):
for node in json.load(source): for node in json.load(source):
peer = node['peer'] peer = node['peer']
info = node['info'] info = node['info']
peer = state.Peer(str(peer['host']), peer.get('port', 8444)) peer = Peer(str(peer['host']), peer.get('port', 8444))
knownNodes[node['stream']][peer] = info knownNodes[node['stream']][peer] = info
if ( if not (knownNodesActual
not (knownNodesActual or info.get('self')) and or info.get('self')) and peer not in DEFAULT_NODES:
peer not in DEFAULT_NODES
):
knownNodesActual = True knownNodesActual = True
def pickle_deserialize_old_knownnodes(source): def pickle_deserialize_old_knownnodes(source):
""" """
Unpickle source and reorganize knownnodes dict if it's in old format Unpickle source and reorganize knownnodes dict if it has old format
the old format was {Peer:lastseen, ...} the old format was {Peer:lastseen, ...}
the new format is {Peer:{"lastseen":i, "rating":f}} the new format is {Peer:{"lastseen":i, "rating":f}}
""" """
@ -129,7 +133,7 @@ def readKnownNodes():
if onionhostname and ".onion" in onionhostname: if onionhostname and ".onion" in onionhostname:
onionport = config.safeGetInt('bitmessagesettings', 'onionport') onionport = config.safeGetInt('bitmessagesettings', 'onionport')
if onionport: if onionport:
self_peer = state.Peer(onionhostname, onionport) self_peer = Peer(onionhostname, onionport)
addKnownNode(1, self_peer, is_self=True) addKnownNode(1, self_peer, is_self=True)
state.ownAddresses[self_peer] = True state.ownAddresses[self_peer] = True
@ -182,7 +186,7 @@ def dns():
"""Add DNS names to knownnodes""" """Add DNS names to knownnodes"""
for port in [8080, 8444]: for port in [8080, 8444]:
addKnownNode( addKnownNode(
1, state.Peer('bootstrap%s.bitmessage.org' % port, port)) 1, Peer('bootstrap%s.bitmessage.org' % port, port))
def cleanupKnownNodes(): def cleanupKnownNodes():
@ -208,8 +212,8 @@ def cleanupKnownNodes():
del knownNodes[stream][node] del knownNodes[stream][node]
continue continue
# scrap old nodes (age > 3 hours) with low rating # scrap old nodes (age > 3 hours) with low rating
if (age > 10800 and knownNodes[stream][node]["rating"] <= if (age > 10800 and knownNodes[stream][node]["rating"]
knownNodesForgetRating): <= knownNodesForgetRating):
needToWriteKnownNodesToDisk = True needToWriteKnownNodesToDisk = True
del knownNodes[stream][node] del knownNodes[stream][node]
continue continue

View File

@ -0,0 +1,17 @@
from addrthread import AddrThread
from announcethread import AnnounceThread
from connectionpool import BMConnectionPool
from dandelion import Dandelion
from downloadthread import DownloadThread
from invthread import InvThread
from networkthread import BMNetworkThread
from receivequeuethread import ReceiveQueueThread
from threads import StoppableThread
from uploadthread import UploadThread
__all__ = [
"BMConnectionPool", "Dandelion",
"AddrThread", "AnnounceThread", "BMNetworkThread", "DownloadThread",
"InvThread", "ReceiveQueueThread", "UploadThread", "StoppableThread"
]

View File

@ -10,6 +10,7 @@ from bmconfigparser import BMConfigParser
from network.bmproto import BMProto from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.udp import UDPSocket from network.udp import UDPSocket
from node import Peer
from threads import StoppableThread from threads import StoppableThread
@ -36,6 +37,8 @@ class AnnounceThread(StoppableThread):
for stream in state.streamsInWhichIAmParticipating: for stream in state.streamsInWhichIAmParticipating:
addr = ( addr = (
stream, stream,
state.Peer('127.0.0.1', BMConfigParser().safeGetInt("bitmessagesettings", "port")), Peer(
'127.0.0.1',
BMConfigParser().safeGetInt('bitmessagesettings', 'port')),
time.time()) time.time())
connection.append_write_buf(BMProto.assembleAddr([addr])) connection.append_write_buf(BMProto.assembleAddr([addr]))

View File

@ -24,8 +24,8 @@ from network.bmobject import (
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectExpiredError, BMObjectUnwantedStreamError,
BMObjectInvalidError, BMObjectAlreadyHaveError) BMObjectInvalidError, BMObjectAlreadyHaveError)
from network.node import Node
from network.proxy import ProxyError from network.proxy import ProxyError
from node import Node, Peer
from objectracker import missingObjects, ObjectTracker from objectracker import missingObjects, ObjectTracker
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
@ -443,7 +443,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
seenTime > time.time() - BMProto.addressAlive and seenTime > time.time() - BMProto.addressAlive and
port > 0 port > 0
): ):
peer = state.Peer(decodedIP, port) peer = Peer(decodedIP, port)
try: try:
if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime: if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
continue continue
@ -464,7 +464,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_portcheck(self): def bm_command_portcheck(self):
"""Incoming port check request, queue it.""" """Incoming port check request, queue it."""
portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port)) portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True return True
def bm_command_ping(self): def bm_command_ping(self):
@ -594,12 +594,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# incoming from a peer we're connected to as outbound, # incoming from a peer we're connected to as outbound,
# or server full report the same error to counter deanonymisation # or server full report the same error to counter deanonymisation
if ( if (
state.Peer(self.destination.host, self.peerNode.port) in Peer(self.destination.host, self.peerNode.port)
connectionpool.BMConnectionPool().inboundConnections or in connectionpool.BMConnectionPool().inboundConnections
len(connectionpool.BMConnectionPool().inboundConnections) + or len(connectionpool.BMConnectionPool().inboundConnections)
len(connectionpool.BMConnectionPool().outboundConnections) > + len(connectionpool.BMConnectionPool().outboundConnections)
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections") + > BMConfigParser().safeGetInt(
BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections") 'bitmessagesettings', 'maxtotalconnections')
+ BMConfigParser().safeGetInt(
'bitmessagesettings', 'maxbootstrapconnections')
): ):
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Server full, please try again later.", fatal=2)) errorText="Server full, please try again later.", fatal=2))
@ -622,7 +624,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def assembleAddr(peerList): def assembleAddr(peerList):
"""Build up a packed address""" """Build up a packed address"""
if isinstance(peerList, state.Peer): if isinstance(peerList, Peer):
peerList = (peerList) peerList = (peerList)
if not peerList: if not peerList:
return b'' return b''
@ -645,10 +647,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod @staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False): def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object""" """Stop downloading an object"""
for connection in ( for connection in connectionpool.BMConnectionPool().connections():
connectionpool.BMConnectionPool().inboundConnections.values() +
connectionpool.BMConnectionPool().outboundConnections.values()
):
try: try:
del connection.objectsNewToMe[hashId] del connection.objectsNewToMe[hashId]
except KeyError: except KeyError:
@ -689,7 +688,7 @@ class BMStringParser(BMProto):
""" """
def __init__(self): def __init__(self):
super(BMStringParser, self).__init__() super(BMStringParser, self).__init__()
self.destination = state.Peer('127.0.0.1', 8444) self.destination = Peer('127.0.0.1', 8444)
self.payload = None self.payload = None
ObjectTracker.__init__(self) ObjectTracker.__init__(self)

View File

@ -28,8 +28,6 @@ def chooseConnection(stream):
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
onionOnly = BMConfigParser().safeGetBoolean( onionOnly = BMConfigParser().safeGetBoolean(
"bitmessagesettings", "onionservicesonly") "bitmessagesettings", "onionservicesonly")
if state.trustedPeer:
return state.trustedPeer
try: try:
retval = portCheckerQueue.get(False) retval = portCheckerQueue.get(False)
portCheckerQueue.task_done() portCheckerQueue.task_done()

View File

@ -1,11 +1,11 @@
""" """
src/network/connectionpool.py `BMConnectionPool` class definition
==================================
""" """
import errno import errno
import logging import logging
import re import re
import socket import socket
import sys
import time import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
@ -15,6 +15,7 @@ import protocol
import state import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from connectionchooser import chooseConnection from connectionchooser import chooseConnection
from node import Peer
from proxy import Proxy from proxy import Proxy
from singleton import Singleton from singleton import Singleton
from tcp import ( from tcp import (
@ -26,9 +27,23 @@ logger = logging.getLogger('default')
@Singleton @Singleton
# pylint: disable=too-many-instance-attributes
class BMConnectionPool(object): class BMConnectionPool(object):
"""Pool of all existing connections""" """Pool of all existing connections"""
# pylint: disable=too-many-instance-attributes
trustedPeer = None
"""
If the trustedpeer option is specified in keys.dat then this will
contain a Peer which will be connected to instead of using the
addresses advertised by other peers.
The expected use case is where the user has a trusted server where
they run a Bitmessage daemon permanently. If they then run a second
instance of the client on a local machine periodically when they want
to check for messages it will sync with the network a lot faster
without compromising security.
"""
def __init__(self): def __init__(self):
asyncore.set_rates( asyncore.set_rates(
BMConfigParser().safeGetInt( BMConfigParser().safeGetInt(
@ -41,9 +56,33 @@ class BMConnectionPool(object):
self.listeningSockets = {} self.listeningSockets = {}
self.udpSockets = {} self.udpSockets = {}
self.streams = [] self.streams = []
self.lastSpawned = 0 self._lastSpawned = 0
self.spawnWait = 2 self._spawnWait = 2
self.bootstrapped = False self._bootstrapped = False
trustedPeer = BMConfigParser().safeGet(
'bitmessagesettings', 'trustedpeer')
try:
if trustedPeer:
host, port = trustedPeer.split(':')
self.trustedPeer = Peer(host, int(port))
except ValueError:
sys.exit(
'Bad trustedpeer config setting! It should be set as'
' trustedpeer=<hostname>:<portnumber>'
)
def connections(self):
"""
Shortcut for combined list of connections from
`inboundConnections` and `outboundConnections` dicts
"""
return self.inboundConnections.values() + self.outboundConnections.values()
def establishedConnections(self):
"""Shortcut for list of connections having fullyEstablished == True"""
return [
x for x in self.connections() if x.fullyEstablished]
def connectToStream(self, streamNumber): def connectToStream(self, streamNumber):
"""Connect to a bitmessage stream""" """Connect to a bitmessage stream"""
@ -74,10 +113,7 @@ class BMConnectionPool(object):
def isAlreadyConnected(self, nodeid): def isAlreadyConnected(self, nodeid):
"""Check if we're already connected to this peer""" """Check if we're already connected to this peer"""
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
try: try:
if nodeid == i.nodeid: if nodeid == i.nodeid:
return True return True
@ -103,7 +139,7 @@ class BMConnectionPool(object):
if isinstance(connection, UDPSocket): if isinstance(connection, UDPSocket):
del self.udpSockets[connection.listening.host] del self.udpSockets[connection.listening.host]
elif isinstance(connection, TCPServer): elif isinstance(connection, TCPServer):
del self.listeningSockets[state.Peer( del self.listeningSockets[Peer(
connection.destination.host, connection.destination.port)] connection.destination.host, connection.destination.port)]
elif connection.isOutbound: elif connection.isOutbound:
try: try:
@ -129,10 +165,11 @@ class BMConnectionPool(object):
"bitmessagesettings", "onionbindip") "bitmessagesettings", "onionbindip")
else: else:
host = '127.0.0.1' host = '127.0.0.1'
if (BMConfigParser().safeGetBoolean( if (
"bitmessagesettings", "sockslisten") or BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten")
BMConfigParser().safeGet( or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")
"bitmessagesettings", "socksproxytype") == "none"): == "none"
):
# python doesn't like bind + INADDR_ANY? # python doesn't like bind + INADDR_ANY?
# host = socket.INADDR_ANY # host = socket.INADDR_ANY
host = BMConfigParser().get("network", "bind") host = BMConfigParser().get("network", "bind")
@ -205,11 +242,13 @@ class BMConnectionPool(object):
'bitmessagesettings', 'socksproxytype', '') 'bitmessagesettings', 'socksproxytype', '')
onionsocksproxytype = BMConfigParser().safeGet( onionsocksproxytype = BMConfigParser().safeGet(
'bitmessagesettings', 'onionsocksproxytype', '') 'bitmessagesettings', 'onionsocksproxytype', '')
if (socksproxytype[:5] == 'SOCKS' and if (
not BMConfigParser().safeGetBoolean( socksproxytype[:5] == 'SOCKS'
'bitmessagesettings', 'sockslisten') and and not BMConfigParser().safeGetBoolean(
'.onion' not in BMConfigParser().safeGet( 'bitmessagesettings', 'sockslisten')
'bitmessagesettings', 'onionhostname', '')): and '.onion' not in BMConfigParser().safeGet(
'bitmessagesettings', 'onionhostname', '')
):
acceptConnections = False acceptConnections = False
# pylint: disable=too-many-nested-blocks # pylint: disable=too-many-nested-blocks
@ -217,8 +256,8 @@ class BMConnectionPool(object):
if not knownnodes.knownNodesActual: if not knownnodes.knownNodesActual:
self.startBootstrappers() self.startBootstrappers()
knownnodes.knownNodesActual = True knownnodes.knownNodesActual = True
if not self.bootstrapped: if not self._bootstrapped:
self.bootstrapped = True self._bootstrapped = True
Proxy.proxy = ( Proxy.proxy = (
BMConfigParser().safeGet( BMConfigParser().safeGet(
'bitmessagesettings', 'sockshostname'), 'bitmessagesettings', 'sockshostname'),
@ -247,7 +286,7 @@ class BMConnectionPool(object):
for i in range( for i in range(
state.maximumNumberOfHalfOpenConnections - pending): state.maximumNumberOfHalfOpenConnections - pending):
try: try:
chosen = chooseConnection( chosen = self.trustedPeer or chooseConnection(
helper_random.randomchoice(self.streams)) helper_random.randomchoice(self.streams))
except ValueError: except ValueError:
continue continue
@ -260,8 +299,7 @@ class BMConnectionPool(object):
continue continue
try: try:
if (chosen.host.endswith(".onion") and if chosen.host.endswith(".onion") and Proxy.onion_proxy:
Proxy.onion_proxy is not None):
if onionsocksproxytype == "SOCKS5": if onionsocksproxytype == "SOCKS5":
self.addConnection(Socks5BMConnection(chosen)) self.addConnection(Socks5BMConnection(chosen))
elif onionsocksproxytype == "SOCKS4a": elif onionsocksproxytype == "SOCKS4a":
@ -276,12 +314,9 @@ class BMConnectionPool(object):
if e.errno == errno.ENETUNREACH: if e.errno == errno.ENETUNREACH:
continue continue
self.lastSpawned = time.time() self._lastSpawned = time.time()
else: else:
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
# FIXME: rating will be increased after next connection # FIXME: rating will be increased after next connection
i.handle_close() i.handle_close()
@ -291,7 +326,7 @@ class BMConnectionPool(object):
self.startListening() self.startListening()
else: else:
for bind in re.sub( for bind in re.sub(
'[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind') BMConfigParser().safeGet('network', 'bind')
).split(): ).split():
self.startListening(bind) self.startListening(bind)
@ -301,7 +336,7 @@ class BMConnectionPool(object):
self.startUDPSocket() self.startUDPSocket()
else: else:
for bind in re.sub( for bind in re.sub(
'[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string r'[^\w.]+', ' ',
BMConfigParser().safeGet('network', 'bind') BMConfigParser().safeGet('network', 'bind')
).split(): ).split():
self.startUDPSocket(bind) self.startUDPSocket(bind)
@ -319,16 +354,13 @@ class BMConnectionPool(object):
i.accepting = i.connecting = i.connected = False i.accepting = i.connecting = i.connected = False
logger.info('Stopped udp sockets.') logger.info('Stopped udp sockets.')
loopTime = float(self.spawnWait) loopTime = float(self._spawnWait)
if self.lastSpawned < time.time() - self.spawnWait: if self._lastSpawned < time.time() - self._spawnWait:
loopTime = 2.0 loopTime = 2.0
asyncore.loop(timeout=loopTime, count=1000) asyncore.loop(timeout=loopTime, count=1000)
reaper = [] reaper = []
for i in ( for i in self.connections():
self.inboundConnections.values() +
self.outboundConnections.values()
):
minTx = time.time() - 20 minTx = time.time() - 20
if i.fullyEstablished: if i.fullyEstablished:
minTx -= 300 - 20 minTx -= 300 - 20
@ -340,10 +372,8 @@ class BMConnectionPool(object):
time.time() - i.lastTx) time.time() - i.lastTx)
i.set_state("close") i.set_state("close")
for i in ( for i in (
self.inboundConnections.values() + self.connections()
self.outboundConnections.values() + + self.listeningSockets.values() + self.udpSockets.values()
self.listeningSockets.values() +
self.udpSockets.values()
): ):
if not (i.accepting or i.connecting or i.connected): if not (i.accepting or i.connecting or i.connected):
reaper.append(i) reaper.append(i)

View File

@ -1,6 +1,5 @@
""" """
src/network/downloadthread.py `DownloadThread` class definition
=============================
""" """
import time import time
@ -29,7 +28,7 @@ class DownloadThread(StoppableThread):
def cleanPending(self): def cleanPending(self):
"""Expire pending downloads eventually""" """Expire pending downloads eventually"""
deadline = time.time() - DownloadThread.requestExpires deadline = time.time() - self.requestExpires
try: try:
toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] toDelete = [k for k, v in missingObjects.iteritems() if v < deadline]
except RuntimeError: except RuntimeError:
@ -43,15 +42,12 @@ class DownloadThread(StoppableThread):
while not self._stopped: while not self._stopped:
requested = 0 requested = 0
# Choose downloading peers randomly # Choose downloading peers randomly
connections = [ connections = BMConnectionPool().establishedConnections()
x for x in
BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values()
if x.fullyEstablished]
helper_random.randomshuffle(connections) helper_random.randomshuffle(connections)
try: requestChunk = max(int(
requestChunk = max(int(min(DownloadThread.maxRequestChunk, len(missingObjects)) / len(connections)), 1) min(self.maxRequestChunk, len(missingObjects))
except ZeroDivisionError: / len(connections)), 1) if connections else 1
requestChunk = 1
for i in connections: for i in connections:
now = time.time() now = time.time()
# avoid unnecessary delay # avoid unnecessary delay
@ -81,7 +77,7 @@ class DownloadThread(StoppableThread):
'%s:%i Requesting %i objects', '%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount) i.destination.host, i.destination.port, chunkCount)
requested += chunkCount requested += chunkCount
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: if time.time() >= self.lastCleaned + self.cleanInterval:
self.cleanPending() self.cleanPending()
if not requested: if not requested:
self.stop.wait(1) self.stop.wait(1)

View File

@ -20,9 +20,7 @@ def handleExpiredDandelion(expired):
the object""" the object"""
if not expired: if not expired:
return return
for i in \ for i in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished: if not i.fullyEstablished:
continue continue
for x in expired: for x in expired:
@ -44,9 +42,7 @@ class InvThread(StoppableThread):
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream) Dandelion().addHash(hashId, stream=stream)
for connection in \ for connection in BMConnectionPool().connections():
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if state.dandelion and connection != Dandelion().objectChildStem(hashId): if state.dandelion and connection != Dandelion().objectChildStem(hashId):
continue continue
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
@ -67,8 +63,7 @@ class InvThread(StoppableThread):
break break
if chunk: if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \ for connection in BMConnectionPool().connections():
BMConnectionPool().outboundConnections.values():
fluffs = [] fluffs = []
stems = [] stems = []
for inv in chunk: for inv in chunk:
@ -96,13 +91,13 @@ class InvThread(StoppableThread):
if fluffs: if fluffs:
random.shuffle(fluffs) random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(protocol.CreatePacket(
'inv', addresses.encodeVarint(len(fluffs)) + 'inv',
"".join(fluffs))) addresses.encodeVarint(len(fluffs)) + ''.join(fluffs)))
if stems: if stems:
random.shuffle(stems) random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(protocol.CreatePacket(
'dinv', addresses.encodeVarint(len(stems)) + 'dinv',
"".join(stems))) addresses.encodeVarint(len(stems)) + ''.join(stems)))
invQueue.iterate() invQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -1,7 +1,7 @@
""" """
src/network/node.py Named tuples representing the network peers
===================
""" """
import collections import collections
Peer = collections.namedtuple('Peer', ['host', 'port'])
Node = collections.namedtuple('Node', ['services', 'host', 'port']) Node = collections.namedtuple('Node', ['services', 'host', 'port'])

View File

@ -95,8 +95,7 @@ class ObjectTracker(object):
def handleReceivedObject(self, streamNumber, hashid): def handleReceivedObject(self, streamNumber, hashid):
"""Handling received object""" """Handling received object"""
for i in network.connectionpool.BMConnectionPool().inboundConnections.values( for i in network.connectionpool.BMConnectionPool().connections():
) + network.connectionpool.BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished: if not i.fullyEstablished:
continue continue
try: try:

View File

@ -8,9 +8,9 @@ import socket
import time import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
import state
from advanceddispatcher import AdvancedDispatcher from advanceddispatcher import AdvancedDispatcher
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from node import Peer
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -90,9 +90,10 @@ class Proxy(AdvancedDispatcher):
def onion_proxy(self, address): def onion_proxy(self, address):
"""Set onion proxy address""" """Set onion proxy address"""
if address is not None and ( if address is not None and (
not isinstance(address, tuple) or len(address) < 2 or not isinstance(address, tuple) or len(address) < 2
not isinstance(address[0], str) or or not isinstance(address[0], str)
not isinstance(address[1], int)): or not isinstance(address[1], int)
):
raise ValueError raise ValueError
self.__class__._onion_proxy = address self.__class__._onion_proxy = address
@ -107,7 +108,7 @@ class Proxy(AdvancedDispatcher):
self.__class__._onion_auth = authTuple self.__class__._onion_auth = authTuple
def __init__(self, address): def __init__(self, address):
if not isinstance(address, state.Peer): if not isinstance(address, Peer):
raise ValueError raise ValueError
AdvancedDispatcher.__init__(self) AdvancedDispatcher.__init__(self)
self.destination = address self.destination = address

View File

@ -32,14 +32,12 @@ class ReceiveQueueThread(StoppableThread):
try: try:
connection = BMConnectionPool().getConnectionByAddr(dest) connection = BMConnectionPool().getConnectionByAddr(dest)
# KeyError = connection object not found except KeyError: # connection object not found
except KeyError:
receiveDataQueue.task_done() receiveDataQueue.task_done()
continue continue
try: try:
connection.process() connection.process()
# UnknownStateError = state isn't implemented except UnknownStateError: # state isn't implemented
except UnknownStateError:
pass pass
except socket.error as err: except socket.error as err:
if err.errno == errno.EBADF: if err.errno == errno.EBADF:

View File

@ -8,7 +8,7 @@ src/network/socks5.py
import socket import socket
import struct import struct
import state from node import Peer
from proxy import GeneralProxyError, Proxy, ProxyError from proxy import GeneralProxyError, Proxy, ProxyError
@ -200,7 +200,7 @@ class Socks5Resolver(Socks5):
def __init__(self, host): def __init__(self, host):
self.host = host self.host = host
self.port = 8444 self.port = 8444
Socks5.__init__(self, address=state.Peer(self.host, self.port)) Socks5.__init__(self, address=Peer(self.host, self.port))
def state_auth_done(self): def state_auth_done(self):
"""Perform resolving""" """Perform resolving"""

View File

@ -19,16 +19,7 @@ currentSentSpeed = 0
def connectedHostsList(): def connectedHostsList():
"""List of all the connected hosts""" """List of all the connected hosts"""
retval = [] return BMConnectionPool().establishedConnections()
for i in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished:
continue
try:
retval.append(i)
except AttributeError:
pass
return retval
def sentBytes(): def sentBytes():
@ -71,12 +62,6 @@ def downloadSpeed():
def pendingDownload(): def pendingDownload():
"""Getting pending downloads""" """Getting pending downloads"""
return len(missingObjects) return len(missingObjects)
# tmp = {}
# for connection in BMConnectionPool().inboundConnections.values() + \
# BMConnectionPool().outboundConnections.values():
# for k in connection.objectsNewToMe.keys():
# tmp[k] = True
# return len(tmp)
def pendingUpload(): def pendingUpload():

View File

@ -28,6 +28,7 @@ from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection from network.socks5 import Socks5Connection
from network.tls import TLSDispatcher from network.tls import TLSDispatcher
from node import Peer
from queues import UISignalQueue, invQueue, receiveDataQueue from queues import UISignalQueue, invQueue, receiveDataQueue
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -49,7 +50,7 @@ class TCPConnection(BMProto, TLSDispatcher):
self.connectedAt = 0 self.connectedAt = 0
self.skipUntil = 0 self.skipUntil = 0
if address is None and sock is not None: if address is None and sock is not None:
self.destination = state.Peer(*sock.getpeername()) self.destination = Peer(*sock.getpeername())
self.isOutbound = False self.isOutbound = False
TLSDispatcher.__init__(self, sock, server_side=True) TLSDispatcher.__init__(self, sock, server_side=True)
self.connectedAt = time.time() self.connectedAt = time.time()
@ -334,7 +335,7 @@ def bootstrap(connection_class):
_connection_base = connection_class _connection_base = connection_class
def __init__(self, host, port): def __init__(self, host, port):
self._connection_base.__init__(self, state.Peer(host, port)) self._connection_base.__init__(self, Peer(host, port))
self.close_reason = self._succeed = False self.close_reason = self._succeed = False
def bm_command_addr(self): def bm_command_addr(self):
@ -384,7 +385,7 @@ class TCPServer(AdvancedDispatcher):
'bitmessagesettings', 'port', str(port)) 'bitmessagesettings', 'port', str(port))
BMConfigParser().save() BMConfigParser().save()
break break
self.destination = state.Peer(host, port) self.destination = Peer(host, port)
self.bound = True self.bound = True
self.listen(5) self.listen(5)
@ -402,7 +403,7 @@ class TCPServer(AdvancedDispatcher):
except (TypeError, IndexError): except (TypeError, IndexError):
return return
state.ownAddresses[state.Peer(*sock.getsockname())] = True state.ownAddresses[Peer(*sock.getsockname())] = True
if ( if (
len(connectionpool.BMConnectionPool().inboundConnections) + len(connectionpool.BMConnectionPool().inboundConnections) +
len(connectionpool.BMConnectionPool().outboundConnections) > len(connectionpool.BMConnectionPool().outboundConnections) >

View File

@ -9,6 +9,7 @@ import socket
import state import state
import protocol import protocol
from bmproto import BMProto from bmproto import BMProto
from node import Peer
from objectracker import ObjectTracker from objectracker import ObjectTracker
from queues import receiveDataQueue from queues import receiveDataQueue
@ -43,8 +44,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
else: else:
self.socket = sock self.socket = sock
self.set_socket_reuse() self.set_socket_reuse()
self.listening = state.Peer(*self.socket.getsockname()) self.listening = Peer(*self.socket.getsockname())
self.destination = state.Peer(*self.socket.getsockname()) self.destination = Peer(*self.socket.getsockname())
ObjectTracker.__init__(self) ObjectTracker.__init__(self)
self.connecting = False self.connecting = False
self.connected = True self.connected = True
@ -96,7 +97,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
self.destination.host, self.destination.port, remoteport) self.destination.host, self.destination.port, remoteport)
if self.local: if self.local:
state.discoveredPeers[ state.discoveredPeers[
state.Peer(self.destination.host, remoteport) Peer(self.destination.host, remoteport)
] = time.time() ] = time.time()
return True return True
@ -131,7 +132,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
logger.error("socket error: %s", e) logger.error("socket error: %s", e)
return return
self.destination = state.Peer(*addr) self.destination = Peer(*addr)
encodedAddr = protocol.encodeHost(addr[0]) encodedAddr = protocol.encodeHost(addr[0])
self.local = bool(protocol.checkIPAddress(encodedAddr, True)) self.local = bool(protocol.checkIPAddress(encodedAddr, True))
# overwrite the old buffer to avoid mixing data and so that # overwrite the old buffer to avoid mixing data and so that

View File

@ -1,5 +1,5 @@
""" """
src/network/uploadthread.py `UploadThread` class definition
""" """
import time import time
@ -22,19 +22,19 @@ class UploadThread(StoppableThread):
def run(self): def run(self):
while not self._stopped: while not self._stopped:
uploaded = 0 uploaded = 0
# Choose downloading peers randomly # Choose uploading peers randomly
connections = [x for x in BMConnectionPool().inboundConnections.values() + connections = BMConnectionPool().establishedConnections()
BMConnectionPool().outboundConnections.values() if x.fullyEstablished]
helper_random.randomshuffle(connections) helper_random.randomshuffle(connections)
for i in connections: for i in connections:
now = time.time() now = time.time()
# avoid unnecessary delay # avoid unnecessary delay
if i.skipUntil >= now: if i.skipUntil >= now:
continue continue
if len(i.write_buf) > UploadThread.maxBufSize: if len(i.write_buf) > self.maxBufSize:
continue continue
try: try:
request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) request = i.pendingUpload.randomKeys(
RandomTrackingDict.maxPending)
except KeyError: except KeyError:
continue continue
payload = bytearray() payload = bytearray()

View File

@ -1,20 +1,51 @@
import Queue """Most of the queues used by bitmessage threads are defined here."""
import Queue
import threading
import time
from class_objectProcessorQueue import ObjectProcessorQueue
from multiqueue import MultiQueue from multiqueue import MultiQueue
class ObjectProcessorQueue(Queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`"""
maxSize = 32000000
def __init__(self):
Queue.Queue.__init__(self)
self.sizeLock = threading.Lock()
#: in Bytes. We maintain this to prevent nodes from flooding us
#: with objects which take up too much memory. If this gets
#: too big we'll sleep before asking for further objects.
self.curSize = 0
def put(self, item, block=True, timeout=None):
while self.curSize >= self.maxSize:
time.sleep(1)
with self.sizeLock:
self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout)
def get(self, block=True, timeout=None):
item = Queue.Queue.get(self, block, timeout)
with self.sizeLock:
self.curSize -= len(item[1])
return item
workerQueue = Queue.Queue() workerQueue = Queue.Queue()
UISignalQueue = Queue.Queue() UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue()
# receiveDataThreads dump objects they hear on the network into this #: `.network.ReceiveQueueThread` instances dump objects they hear
# queue to be processed. #: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue() invQueue = MultiQueue()
addrQueue = MultiQueue() addrQueue = MultiQueue()
portCheckerQueue = Queue.Queue() portCheckerQueue = Queue.Queue()
receiveDataQueue = Queue.Queue() receiveDataQueue = Queue.Queue()
# The address generator thread uses this queue to get information back #: The address generator thread uses this queue to get information back
# to the API thread. #: to the API thread.
apiAddressGeneratorReturnQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue()
# Exceptions #: for exceptions
excQueue = Queue.Queue() excQueue = Queue.Queue()

View File

@ -10,7 +10,7 @@ from debug import logger
from helper_sql import sqlQuery, sqlStoredProcedure from helper_sql import sqlQuery, sqlStoredProcedure
from inventory import Inventory from inventory import Inventory
from knownnodes import saveKnownNodes from knownnodes import saveKnownNodes
from network.threads import StoppableThread from network import StoppableThread
from queues import ( from queues import (
addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue) addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue)

View File

@ -1,7 +1,6 @@
""" """
Global runtime variables. Global runtime variables.
""" """
import collections
neededPubkeys = {} neededPubkeys = {}
streamsInWhichIAmParticipating = [] streamsInWhichIAmParticipating = []
@ -47,24 +46,8 @@ uploadThread = None
ownAddresses = {} ownAddresses = {}
trustedPeer = None
"""
If the trustedpeer option is specified in keys.dat then this will
contain a Peer which will be connected to instead of using the
addresses advertised by other peers. The client will only connect to
this peer and the timing attack mitigation will be disabled in order
to download data faster. The expected use case is where the user has
a fast connection to a trusted server where they run a BitMessage
daemon permanently. If they then run a second instance of the client
on a local machine periodically when they want to check for messages
it will sync with the network a lot faster without compromising
security.
"""
discoveredPeers = {} discoveredPeers = {}
Peer = collections.namedtuple('Peer', ['host', 'port'])
dandelion = 0 dandelion = 0
testmode = False testmode = False

View File

@ -17,6 +17,7 @@ from bmconfigparser import BMConfigParser
from helper_msgcoding import MsgEncode, MsgDecode from helper_msgcoding import MsgEncode, MsgDecode
from network import asyncore_pollchoose as asyncore from network import asyncore_pollchoose as asyncore
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.node import Peer
from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection
from queues import excQueue from queues import excQueue
@ -30,7 +31,7 @@ def pickle_knownnodes():
with open(knownnodes_file, 'wb') as dst: with open(knownnodes_file, 'wb') as dst:
pickle.dump({ pickle.dump({
stream: { stream: {
state.Peer( Peer(
'%i.%i.%i.%i' % tuple([ '%i.%i.%i.%i' % tuple([
random.randint(1, 255) for i in range(4)]), random.randint(1, 255) for i in range(4)]),
8444): {'lastseen': now, 'rating': 0.1} 8444): {'lastseen': now, 'rating': 0.1}
@ -90,7 +91,7 @@ class TestCore(unittest.TestCase):
"""initial fill script from network.tcp""" """initial fill script from network.tcp"""
BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true') BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true')
try: try:
for peer in (state.Peer("127.0.0.1", 8448),): for peer in (Peer("127.0.0.1", 8448),):
direct = TCPConnection(peer) direct = TCPConnection(peer)
while asyncore.socket_map: while asyncore.socket_map:
print("loop, state = %s" % direct.state) print("loop, state = %s" % direct.state)
@ -147,7 +148,7 @@ class TestCore(unittest.TestCase):
def _initiate_bootstrap(self): def _initiate_bootstrap(self):
BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true') BMConfigParser().set('bitmessagesettings', 'dontconnect', 'true')
self._outdate_knownnodes() self._outdate_knownnodes()
knownnodes.addKnownNode(1, state.Peer('127.0.0.1', 8444), is_self=True) knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True)
knownnodes.cleanupKnownNodes() knownnodes.cleanupKnownNodes()
time.sleep(2) time.sleep(2)

46
src/threads.py Normal file
View File

@ -0,0 +1,46 @@
"""
PyBitmessage does various tasks in separate threads. Most of them inherit
from `.network.StoppableThread`. There are `addressGenerator` for
addresses generation, `objectProcessor` for processing the network objects
passed minimal validation, `singleCleaner` to periodically clean various
internal storages (like inventory and knownnodes) and do forced garbage
collection, `singleWorker` for doing PoW, `sqlThread` for querying sqlite
database.
There are also other threads in the `.network` package.
:func:`set_thread_name` is defined here for the threads that don't inherit from
:class:`.network.StoppableThread`
"""
import threading
try:
import prctl
except ImportError:
def set_thread_name(name):
"""Set a name for the thread for python internal use."""
threading.current_thread().name = name
else:
def set_thread_name(name):
"""Set the thread name for external use (visible from the OS)."""
prctl.set_name(name)
def _thread_name_hack(self):
set_thread_name(self.name)
threading.Thread.__bootstrap_original__(self)
# pylint: disable=protected-access
threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
threading.Thread._Thread__bootstrap = _thread_name_hack
from class_addressGenerator import addressGenerator
from class_objectProcessor import objectProcessor
from class_singleCleaner import singleCleaner
from class_singleWorker import singleWorker
from class_sqlThread import sqlThread
__all__ = [
"addressGenerator", "objectProcessor", "singleCleaner", "singleWorker",
"sqlThread"
]

View File

@ -1,9 +1,6 @@
# pylint: disable=too-many-statements,too-many-branches,protected-access,no-self-use # pylint: disable=too-many-statements,too-many-branches,protected-access,no-self-use
""" """
src/upnp.py Complete UPnP port forwarding implementation in separate thread.
===========
A simple upnp module to forward port for BitMessage
Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-port
""" """
@ -21,8 +18,8 @@ import state
import tr import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from debug import logger from debug import logger
from network.connectionpool import BMConnectionPool from network import BMConnectionPool, StoppableThread
from network.threads import StoppableThread from network.node import Peer
def createRequestXML(service, action, arguments=None): def createRequestXML(service, action, arguments=None):
@ -263,7 +260,7 @@ class uPnPThread(StoppableThread):
self.routers.append(newRouter) self.routers.append(newRouter)
self.createPortMapping(newRouter) self.createPortMapping(newRouter)
try: try:
self_peer = state.Peer( self_peer = Peer(
newRouter.GetExternalIPAddress(), newRouter.GetExternalIPAddress(),
self.extPort self.extPort
) )