Asyncore updates

- asyncore is now on by default
- inv announcements implemented
- bandwidth limit implemented / fixed
- stats on download / upload speed now work
- make prints into logger
- limit knownNodes to 20k as it was before
- green light fixed
- other minor fixes
This commit is contained in:
Peter Šurda 2017-05-29 00:24:07 +02:00
parent 5d4e1e2007
commit c85d52b8e8
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
22 changed files with 316 additions and 158 deletions

View File

@ -858,8 +858,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'')
with shared.printLock:
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', hexlify(inventoryHash)
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((toStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
def HandleTrashSentMessageByAckDAta(self, params):
# This API method should only be used when msgid is not available
@ -905,8 +908,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'')
with shared.printLock:
print 'broadcasting inv within API command disseminatePubkey with hash:', hexlify(inventoryHash)
protocol.broadcastToSendDataQueues((
pubkeyStreamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
pubkeyStreamNumber, 'advertiseobject', inventoryHash))
def HandleGetMessageDataByDestinationHash(self, params):
# Method will eventually be used by a particular Android app to

View File

@ -57,6 +57,7 @@ from network.connectionpool import BMConnectionPool
from network.networkthread import BMNetworkThread
from network.receivequeuethread import ReceiveQueueThread
from network.announcethread import AnnounceThread
from network.invthread import InvThread
#from network.downloadthread import DownloadThread
# Helper Functions
@ -275,6 +276,9 @@ class Main:
announceThread = AnnounceThread()
announceThread.daemon = True
announceThread.start()
state.invThread = InvThread()
state.invThread.daemon = True
state.invThread.start()
connectToStream(1)

View File

@ -46,7 +46,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
return "%4.0f kB" % num
def updateNumberOfObjectsToBeSynced(self):
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize() + PendingUpload().len()))
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, network.stats.pendingDownload() + network.stats.pendingUpload()))
def updateNumberOfMessagesProcessed(self):
self.updateNumberOfObjectsToBeSynced()

View File

@ -16,7 +16,7 @@ BMConfigDefaults = {
"maxuploadrate": 0,
},
"network": {
"asyncore": False,
"asyncore": True,
"bind": None,
},
"inventory": {

View File

@ -1,4 +1,5 @@
import threading
import resource
import shared
import time
import sys
@ -9,6 +10,7 @@ from bmconfigparser import BMConfigParser
from helper_sql import *
from helper_threading import *
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from debug import logger
import knownnodes
import queues
@ -36,6 +38,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
class singleCleaner(threading.Thread, StoppableThread):
cycleLength = 300
def __init__(self):
threading.Thread.__init__(self, name="singleCleaner")
@ -51,7 +54,7 @@ class singleCleaner(threading.Thread, StoppableThread):
# initial wait
if state.shutdown == 0:
self.stop.wait(300)
self.stop.wait(singleCleaner.cycleLength)
while state.shutdown == 0:
queues.UISignalQueue.put((
@ -119,8 +122,10 @@ class singleCleaner(threading.Thread, StoppableThread):
# TODO: cleanup pending upload / download
logger.info("Memory usage %s (kB)", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
if state.shutdown == 0:
self.stop.wait(300)
self.stop.wait(singleCleaner.cycleLength)
def resendPubkeyRequest(address):

View File

@ -192,8 +192,11 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
BMConfigParser().set(
@ -283,8 +286,11 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
BMConfigParser().set(
@ -374,8 +380,11 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
BMConfigParser().set(
@ -504,8 +513,11 @@ class singleWorker(threading.Thread, StoppableThread):
objectType, streamNumber, payload, embeddedTime, tag)
PendingUpload().add(inventoryHash)
logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp()))))
@ -834,8 +846,11 @@ class singleWorker(threading.Thread, StoppableThread):
# not sending to a chan or one of my addresses
queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp()))))
logger.info('Broadcasting inv for my msg(within sendmsg function):' + hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((toStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
# Update the sent message in the sent table with the necessary information.
if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
@ -937,8 +952,11 @@ class singleWorker(threading.Thread, StoppableThread):
objectType, streamNumber, payload, embeddedTime, '')
PendingUpload().add(inventoryHash)
logger.info('sending inv (for the getpubkey message)')
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
if BMConfigParser.safeGetBoolean("network", "asyncore"):
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
# wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1)

View File

@ -2,6 +2,7 @@ import Queue
import time
import asyncore_pollchoose as asyncore
from debug import logger
from bmconfigparser import BMConfigParser
class AdvancedDispatcher(asyncore.dispatcher):
@ -56,44 +57,45 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.state = state
def writable(self):
return self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty()
return asyncore.dispatcher.writable(self) and \
(self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty())
def readable(self):
return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len
return asyncore.dispatcher.readable(self) and \
(self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len)
def handle_read(self):
self.lastTx = time.time()
downloadBytes = AdvancedDispatcher._buf_len
if asyncore.maxDownloadRate > 0:
downloadBytes = asyncore.downloadChunk
downloadBytes = asyncore.downloadBucket
if self.expectBytes > 0 and downloadBytes > self.expectBytes:
downloadBytes = self.expectBytes
newData = self.recv(downloadBytes)
if asyncore.maxDownloadRate > 0:
asyncore.downloadBucket -= len(newData)
self.receivedBytes += len(newData)
if self.expectBytes > 0:
self.expectBytes -= len(newData)
asyncore.updateReceived(len(newData))
self.read_buf += newData
if downloadBytes > 0:
newData = self.recv(downloadBytes)
self.receivedBytes += len(newData)
if self.expectBytes > 0:
self.expectBytes -= len(newData)
asyncore.update_received(len(newData))
self.read_buf += newData
self.process()
def handle_write(self):
self.lastTx = time.time()
bufSize = AdvancedDispatcher._buf_len
if asyncore.maxUploadRate > 0:
bufSize = asyncore.uploadChunk
else:
bufSize = self._buf_len
bufSize = asyncore.uploadBucket
while len(self.write_buf) < bufSize:
try:
self.write_buf += self.writeQueue.get(False)
self.writeQueue.task_done()
except Queue.Empty:
break
if bufSize <= 0:
return
if len(self.write_buf) > 0:
written = self.send(self.write_buf[0:bufSize])
asyncore.uploadBucket -= written
asyncore.updateSent(written)
asyncore.update_sent(written)
self.sentBytes += written
self.slice_write_buf(written)
@ -107,7 +109,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
def close(self):
self.read_buf = b""
self.write_buf = b""
self.state = "shutdown"
self.state = "close"
while True:
try:
self.writeQueue.get(False)

View File

@ -17,7 +17,7 @@ class AnnounceThread(threading.Thread, StoppableThread):
self.initStop()
self.name = "AnnounceThread"
BMConnectionPool()
logger.error("init announce thread")
logger.info("init announce thread")
def run(self):
lastSelfAnnounced = 0

View File

@ -90,12 +90,10 @@ class ExitNow(Exception):
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
maxDownloadRate = 0
downloadChunk = 0
downloadTimestamp = 0
downloadBucket = 0
receivedBytes = 0
maxUploadRate = 0
uploadChunk = 0
uploadTimestamp = 0
uploadBucket = 0
sentBytes = 0
@ -117,48 +115,37 @@ def write(obj):
obj.handle_error()
def set_rates(download, upload):
global maxDownloadRate, maxUploadRate, downloadChunk, uploadChunk, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
maxDownloadRate = float(download)
if maxDownloadRate > 0:
downloadChunk = 1400
maxUploadRate = float(upload)
if maxUploadRate > 0:
uploadChunk = 1400
downloadBucket = maxDownloadRate
uploadBucket = maxUploadRate
downloadTimestamp = time.time()
uploadTimestamp = time.time()
def updateReceived(download=0):
global receivedBytes
def update_received(download=0):
global receivedBytes, maxDownloadRate, downloadBucket, downloadTimestamp
currentTimestamp = time.time()
receivedBytes += download
if maxDownloadRate > 0:
bucketIncrease = int(maxDownloadRate * (currentTimestamp - downloadTimestamp))
downloadBucket += bucketIncrease
if downloadBucket > maxDownloadRate:
downloadBucket = int(maxDownloadRate)
downloadBucket -= download
downloadTimestamp = currentTimestamp
def updateSent(upload=0):
global sentBytes
def update_sent(upload=0):
global sentBytes, maxUploadRate, uploadBucket, uploadTimestamp
currentTimestamp = time.time()
sentBytes += upload
def wait_tx_buckets():
global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
if maxDownloadRate > 0 and maxUploadRate > 0:
wait_for_this_long = min(maxDownloadRate / downloadChunk, maxUploadRate / uploadChunk)
elif maxDownloadRate > 0:
wait_for_this_long = maxDownloadRate / downloadChunk
elif maxUploadRate > 0:
wait_for_this_long = maxUploadRate / uploadChunk
else:
return
wait_for_this_long /= 2
if wait_for_this_long > 1:
wait_for_this_long = 1
elif wait_for_this_long < 0.1:
wait_for_this_long = 0.1
while downloadBucket < downloadChunk and uploadBucket < uploadChunk:
time.sleep(wait_for_this_long)
downloadBucket += (time.time() - downloadTimestamp) * maxDownloadRate
downloadTimestamp = time.time()
uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate
uploadTimestamp = time.time()
if maxUploadRate > 0:
bucketIncrease = int(maxUploadRate * (currentTimestamp - uploadTimestamp))
uploadBucket += bucketIncrease
if uploadBucket > maxUploadRate:
uploadBucket = int(maxUploadRate)
uploadBucket -= upload
uploadTimestamp = currentTimestamp
def _exception(obj):
try:
@ -376,13 +363,19 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
if count is None:
while map:
wait_tx_buckets()
# fill buckets first
update_sent()
update_received()
# then poll
poller(timeout, map)
else:
timeout /= count
while map and count > 0:
wait_tx_buckets()
# fill buckets first
update_sent()
update_received()
poller(timeout, map)
# then poll
count = count - 1
class dispatcher:
@ -396,6 +389,8 @@ class dispatcher:
ignore_log_types = frozenset(['warning'])
poller_registered = False
flags = 0
# don't do network IO with a smaller bucket than this
minTx = 1500
def __init__(self, sock=None, map=None):
if map is None:
@ -499,9 +494,13 @@ class dispatcher:
# ==================================================
def readable(self):
if maxDownloadRate > 0:
return downloadBucket > dispatcher.minTx
return True
def writable(self):
if maxUploadRate > 0:
return uploadBucket > dispatcher.minTx
return True
# ==================================================

View File

@ -3,7 +3,6 @@ from binascii import hexlify
import hashlib
import math
import time
from pprint import pprint
import socket
import struct
import random
@ -25,7 +24,7 @@ from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUpl
import addresses
from bmconfigparser import BMConfigParser
from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue
from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue, invQueue
import shared
import state
import protocol
@ -53,35 +52,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# maximum time offset
maxTimeOffset = 3600
# def __init__(self, address=None, sock=None):
# AdvancedDispatcher.__init__(self, sock)
# self.verackReceived = False
# self.verackSent = False
# self.lastTx = time.time()
# self.streams = [0]
# self.fullyEstablished = False
# self.connectedAt = 0
# self.skipUntil = 0
# if address is None and sock is not None:
# self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1])
# self.isOutbound = False
# TLSDispatcher.__init__(self, sock, server_side=True)
# self.connectedAt = time.time()
# #print "received connection in background from %s:%i" % (self.destination.host, self.destination.port)
# else:
# self.destination = address
# self.isOutbound = True
# if ":" in address.host:
# self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
# else:
# self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# TLSDispatcher.__init__(self, sock, server_side=False)
# self.connect(self.destination)
# #print "connecting in background to %s:%i" % (self.destination.host, self.destination.port)
# shared.connectedHostsList[self.destination] = 0
# ObjectTracker.__init__(self)
# UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
def __init__(self, address=None, sock=None):
AdvancedDispatcher.__init__(self, sock)
self.isOutbound = False
# packet/connection from a local IP
self.local = False
def bm_proto_reset(self):
self.magic = None
@ -95,7 +70,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.object = None
def state_bm_header(self):
#print "%s:%i: header" % (self.destination.host, self.destination.port)
if len(self.read_buf) < protocol.Header.size:
#print "Length below header size"
return False
@ -105,7 +79,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# skip 1 byte in order to sync
self.bm_proto_reset()
self.set_state("bm_header", 1)
print "Bad magic"
logger.debug("Bad magic")
self.close()
return False
if self.payloadLength > BMProto.maxMessageSize:
@ -117,10 +91,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
if len(self.read_buf) < self.payloadLength:
#print "Length below announced object length"
return False
print "%s:%i: command %s (%ib)" % (self.destination.host, self.destination.port, self.command, self.payloadLength)
#logger.debug("%s:%i: command %s (%ib)", self.destination.host, self.destination.port, self.command, self.payloadLength)
self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
print "Bad checksum, ignoring"
logger.debug("Bad checksum, ignoring")
self.invalid = True
retval = True
if not self.fullyEstablished and self.command not in ("version", "verack"):
@ -131,28 +105,28 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
retval = getattr(self, "bm_command_" + str(self.command).lower())()
except AttributeError:
# unimplemented command
print "unimplemented command %s" % (self.command)
logger.debug("unimplemented command %s", self.command)
except BMProtoInsufficientDataError:
print "packet length too short, skipping"
logger.debug("packet length too short, skipping")
except BMProtoExcessiveDataError:
print "too much data, skipping"
logger.debug("too much data, skipping")
except BMObjectInsufficientPOWError:
print "insufficient PoW, skipping"
logger.debug("insufficient PoW, skipping")
except BMObjectInvalidDataError:
print "object invalid data, skipping"
logger.debug("object invalid data, skipping")
except BMObjectExpiredError:
print "object expired, skipping"
logger.debug("object expired, skipping")
except BMObjectUnwantedStreamError:
print "object not in wanted stream, skipping"
logger.debug("object not in wanted stream, skipping")
except BMObjectInvalidError:
print "object invalid, skipping"
logger.debug("object invalid, skipping")
except BMObjectAlreadyHaveError:
print "already got object, skipping"
logger.debug("already got object, skipping")
except struct.error:
print "decoding error, skipping"
logger.debug("decoding error, skipping")
else:
#print "Skipping command %s due to invalid data" % (self.command)
print "Closing due to invalid data" % (self.command)
logger.debug("Closing due to invalid command %s", self.command)
self.close()
return False
if retval:
@ -253,13 +227,13 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.payloadOffset += 8
i += 1
if self.payloadOffset > self.payloadLength:
print "Insufficient data %i/%i" % (self.payloadOffset, self.payloadLength)
logger.debug("Insufficient data %i/%i", self.payloadOffset, self.payloadLength)
raise BMProtoInsufficientDataError()
return retval
def bm_command_error(self):
fatalStatus, banTime, inventoryVector, errorText = self.decode_payload_content("vvlsls")
print "%s:%i error: %i, %s" % (self.destination.host, self.destination.port, fatalStatus, errorText)
logger.error("%s:%i error: %i, %s", self.destination.host, self.destination.port, fatalStatus, errorText)
return True
def bm_command_getdata(self):
@ -325,6 +299,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
objectProcessorQueue.put((self.object.objectType,self.object.data))
#DownloadQueue().task_done(self.object.inventoryHash)
network.connectionpool.BMConnectionPool().handleReceivedObject(self, self.object.streamNumber, self.object.inventoryHash)
invQueue.put((self.object.streamNumber, self.object.inventoryHash))
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
return True
@ -344,8 +319,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
peer = state.Peer(decodedIP, port)
if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime:
continue
knownnodes.knownNodes[stream][peer] = seenTime
AddrUploadQueue().put((stream, peer))
if len(knownnodes.knownNodes[stream]) < 20000:
with knownnodes.knownNodesLock:
knownnodes.knownNodes[stream][peer] = seenTime
#knownnodes.knownNodes[stream][peer] = seenTime
#AddrUploadQueue().put((stream, peer))
return True
def bm_command_portcheck(self):
@ -392,7 +370,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.verackSent = True
if not self.isOutbound:
self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True))
print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
#print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
protocol.haveSSL(not self.isOutbound)):
self.isSSL = True
@ -472,10 +450,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def close(self, reason=None):
self.set_state("close")
# if reason is None:
# print "%s:%i: closing" % (self.destination.host, self.destination.port)
# #traceback.print_stack()
# else:
# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason)
if reason is None:
#logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, ''.join(traceback.format_stack()))
logger.debug("%s:%i: closing", self.destination.host, self.destination.port)
#traceback.print_stack()
else:
logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason)
network.connectionpool.BMConnectionPool().removeConnection(self)
AdvancedDispatcher.close(self)

View File

@ -21,8 +21,8 @@ import state
class BMConnectionPool(object):
def __init__(self):
asyncore.set_rates(
BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate"),
BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate"))
BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024,
BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate") * 1024)
self.outboundConnections = {}
self.inboundConnections = {}
self.listeningSockets = {}
@ -117,7 +117,6 @@ class BMConnectionPool(object):
if spawnConnections:
if not self.bootstrapped:
print "bootstrapping dns"
helper_bootstrap.dns()
self.bootstrapped = True
established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished))

82
src/network/invthread.py Normal file
View File

@ -0,0 +1,82 @@
import collections
import Queue
import random
import threading
import time
import addresses
from bmconfigparser import BMConfigParser
from debug import logger
from helper_threading import StoppableThread
from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool
from queues import invQueue
import protocol
import state
class InvThread(threading.Thread, StoppableThread):
size = 10
def __init__(self):
threading.Thread.__init__(self, name="InvThread")
self.initStop()
self.name = "InvThread"
self.shutdown = False
self.collectionOfInvs = []
for i in range(InvThread.size):
self.collectionOfInvs.append({})
def run(self):
iterator = 0
while not state.shutdown:
while True:
try:
(stream, hash) = invQueue.get(False)
self.holdHash (stream, hash)
except Queue.Empty:
break
if len(self.collectionOfInvs[iterator]) > 0:
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
hashes = []
for stream in connection.streams:
try:
for hashId in self.collectionOfInvs[iterator][stream]:
if hashId in connection.objectsNewToThem:
hashes.append(hashId)
del connection.objectsNewToThem[hashId]
except KeyError:
continue
if len(hashes) > 0:
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes)))
self.collectionOfInvs[iterator] = []
iterator += 1
iterator %= InvThread.size
self.stop.wait(1)
def holdHash(self, stream, hash):
iter = random.randrange(0, InvThread.size)
try:
self.collectionOfInvs[iter][stream].append(hash)
except KeyError, IndexError:
self.collectionOfInvs[iter][stream] = []
self.collectionOfInvs[iter][stream].append(hash)
def hasHash(self, hash):
for streamlist in self.collectionOfInvs:
for stream in streamlist:
if hash in streamlist[stream]:
return True
return False
def hashCount(self):
retval = 0
for streamlist in self.collectionOfInvs:
for stream in streamlist:
retval += len(streamlist[stream])
return retval
def close(self):
self.shutdown = True

View File

@ -12,7 +12,7 @@ class BMNetworkThread(threading.Thread, StoppableThread):
self.initStop()
self.name = "AsyncoreThread"
BMConnectionPool()
logger.error("init asyncore thread")
logger.info("init asyncore thread")
def run(self):
while not self._stopped:

View File

@ -17,7 +17,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
self.initStop()
self.name = "ReceiveQueueThread"
BMConnectionPool()
logger.error("init receive queue thread")
logger.info("init receive queue thread")
def run(self):
lastprinted = int(time.time())

View File

@ -1,9 +1,19 @@
import time
from bmconfigparser import BMConfigParser
from network.connectionpool import BMConnectionPool
from inventory import PendingDownloadQueue, PendingUpload
import asyncore_pollchoose as asyncore
import shared
import throttle
lastReceivedTimestamp = time.time()
lastReceivedBytes = 0
currentReceivedSpeed = 0
lastSentTimestamp = time.time()
lastSentBytes = 0
currentSentSpeed = 0
def connectedHostsList():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
retval = []
@ -25,8 +35,15 @@ def sentBytes():
return throttle.SendThrottle().total
def uploadSpeed():
global lastSentTimestamp, lastSentBytes, currentSentSpeed
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return 0
currentTimestamp = time.time()
if int(lastSentTimestamp) < int(currentTimestamp):
currentSentBytes = asyncore.sentBytes
currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp))
lastSentBytes = currentSentBytes
lastSentTimestamp = currentTimestamp
return currentSentSpeed
else:
return throttle.sendThrottle().getSpeed()
@ -37,7 +54,35 @@ def receivedBytes():
return throttle.ReceiveThrottle().total
def downloadSpeed():
global lastReceivedTimestamp, lastReceivedBytes, currentReceivedSpeed
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return 0
currentTimestamp = time.time()
if int(lastReceivedTimestamp) < int(currentTimestamp):
currentReceivedBytes = asyncore.receivedBytes
currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp))
lastReceivedBytes = currentReceivedBytes
lastReceivedTimestamp = currentTimestamp
return currentReceivedSpeed
else:
return throttle.ReceiveThrottle().getSpeed()
def pendingDownload():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
tmp = {}
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
for k in connection.objectsNewToMe.keys():
tmp[k] = True
return len(tmp)
else:
return PendingDownloadQueue.totalSize()
def pendingUpload():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return 0
tmp = {}
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
for k in connection.objectsNewToThem.keys():
tmp[k] = True
return len(tmp)
else:
return PendingUpload().len()

View File

@ -36,7 +36,7 @@ import protocol
class TCPConnection(BMProto, TLSDispatcher):
def __init__(self, address=None, sock=None):
AdvancedDispatcher.__init__(self, sock)
BMProto.__init__(self, address=address, sock=sock)
self.verackReceived = False
self.verackSent = False
self.streams = [0]
@ -60,7 +60,12 @@ class TCPConnection(BMProto, TLSDispatcher):
TLSDispatcher.__init__(self, sock, server_side=False)
self.connect(self.destination)
logger.debug("Connecting to %s:%i", self.destination.host, self.destination.port)
shared.connectedHostsList[self.destination] = 0
encodedAddr = protocol.encodeHost(self.destination.host)
if protocol.checkIPAddress(encodedAddr, True) and not protocol.checkSocksIP(self.destination.host):
self.local = True
else:
self.local = False
#shared.connectedHostsList[self.destination] = 0
ObjectTracker.__init__(self)
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.bm_proto_reset()
@ -83,6 +88,9 @@ class TCPConnection(BMProto, TLSDispatcher):
self.skipUntil = time.time() + delay
def set_connection_fully_established(self):
if not self.isOutbound and not self.local:
shared.clientHasReceivedIncomingConnections = True
UISignalQueue.put(('setStatusIcon', 'green'))
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.antiIntersectionDelay(True)
self.fullyEstablished = True

View File

@ -7,6 +7,7 @@ import socket
import ssl
import sys
from debug import logger
from network.advanceddispatcher import AdvancedDispatcher
import network.asyncore_pollchoose as asyncore
import paths
@ -108,10 +109,10 @@ class TLSDispatcher(AdvancedDispatcher):
return False
# Perform the handshake.
try:
print "handshaking (internal)"
#print "handshaking (internal)"
self.sslSocket.do_handshake()
except ssl.SSLError, err:
print "%s:%i: handshake fail" % (self.destination.host, self.destination.port)
#print "%s:%i: handshake fail" % (self.destination.host, self.destination.port)
self.want_read = self.want_write = False
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
#print "want read"
@ -122,7 +123,7 @@ class TLSDispatcher(AdvancedDispatcher):
if not (self.want_write or self.want_read):
raise
else:
print "%s:%i: TLS handshake success%s" % (self.destination.host, self.destination.port, ", TLS protocol version: %s" % (self.sslSocket.version()) if sys.version_info >= (2, 7, 9) else "")
logger.debug("%s:%i: TLS handshake success%s", self.destination.host, self.destination.port, ", TLS protocol version: %s" % (self.sslSocket.version()) if sys.version_info >= (2, 7, 9) else "")
# The handshake has completed, so remove this channel and...
self.del_channel()
self.set_socket(self.sslSocket)

View File

@ -35,7 +35,7 @@ class UDPSocket(BMProto):
announceInterval = 60
def __init__(self, host=None, sock=None):
AdvancedDispatcher.__init__(self, sock)
BMProto.__init__(self, sock)
self.verackReceived = True
self.verackSent = True
# TODO sort out streams
@ -43,7 +43,6 @@ class UDPSocket(BMProto):
self.fullyEstablished = True
self.connectedAt = 0
self.skipUntil = 0
self.isOutbound = False
if sock is None:
if host is None:
host = ''
@ -51,7 +50,7 @@ class UDPSocket(BMProto):
self.create_socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
print "binding to %s" % (host)
logger.info("Binding UDP socket to %s:%i", host, UDPSocket.port)
self.socket.bind((host, UDPSocket.port))
#BINDTODEVICE is only available on linux and requires root
#try:
@ -67,10 +66,11 @@ class UDPSocket(BMProto):
ObjectTracker.__init__(self)
self.connecting = False
self.connected = True
# packet was received from a local IP
self.local = False
self.set_state("bm_header")
def state_bm_command(self):
BMProto.state_bm_command(self)
# disable most commands before doing research / testing
# only addr (peer discovery), error and object are implemented
@ -163,7 +163,7 @@ class UDPSocket(BMProto):
return
try:
retval = self.socket.sendto(data, ('<broadcast>', UDPSocket.port))
print "broadcasted %ib" % (retval)
#print "broadcasted %ib" % (retval)
except socket.error as e:
print "socket error on sendato: %s" % (e)
self.writeQueue.task_done()

View File

@ -106,28 +106,35 @@ def checkIPAddress(host, private=False):
def checkIPv4Address(host, hostStandardFormat, private=False):
if host[0] == '\x7F': # 127/8
logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat)
if not private:
logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat)
return False
if host[0] == '\x0A': # 10/8
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
if not private:
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
return hostStandardFormat if private else False
if host[0:2] == '\xC0\xA8': # 192.168/16
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
if not private:
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
return hostStandardFormat if private else False
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
logger.debug('Ignoring IP address in private range:' + hostStandardFormat)
return False
if not private:
logger.debug('Ignoring IP address in private range:' + hostStandardFormat)
return hostStandardFormat if private else False
return False if private else hostStandardFormat
def checkIPv6Address(host, hostStandardFormat, private=False):
if host == ('\x00' * 15) + '\x01':
logger.debug('Ignoring loopback address: ' + hostStandardFormat)
if not private:
logger.debug('Ignoring loopback address: ' + hostStandardFormat)
return False
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
logger.debug ('Ignoring local address: ' + hostStandardFormat)
if not private:
logger.debug ('Ignoring local address: ' + hostStandardFormat)
return hostStandardFormat if private else False
if (ord(host[0]) & 0xfe) == 0xfc:
logger.debug ('Ignoring unique local address: ' + hostStandardFormat)
if not private:
logger.debug ('Ignoring unique local address: ' + hostStandardFormat)
return hostStandardFormat if private else False
return False if private else hostStandardFormat

View File

@ -6,6 +6,7 @@ UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue()
# receiveDataThreads dump objects they hear on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue()
invQueue = Queue.Queue()
portCheckerQueue = Queue.Queue()
peerDiscoveryQueue = Queue.Queue()
apiAddressGeneratorReturnQueue = Queue.Queue(

View File

@ -23,6 +23,8 @@ sqlReady = False # set to true by sqlTread when ready for processing
maximumNumberOfHalfOpenConnections = 0
invThread = None
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to

View File

@ -111,8 +111,8 @@ class FilesystemInventory(InventoryStorage):
print "error loading %s" % (hexlify(hashId))
pass
self._inventory = newInventory
for i, v in self._inventory.items():
print "loaded stream: %s, %i items" % (i, len(v))
# for i, v in self._inventory.items():
# print "loaded stream: %s, %i items" % (i, len(v))
def stream_list(self):
return self._inventory.keys()