Big Asyncore update

- most of the work is done, so it partially works
- pollers other than select are disabled (they still need debugging)
- can be switched in the settings: section "network", option "asyncore"
  (defaults to False)
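The option is read via BMConfigParser().safeGetBoolean("network", "asyncore") (see the diff below), so it can also be enabled by hand in the keys.dat configuration file; the snippet below is only a sketch using the usual PyBitmessage INI layout:

[network]
asyncore = true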
Peter Šurda 2017-05-24 16:51:49 +02:00
parent d498f1c0ae
commit d635e515b9
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
14 changed files with 928 additions and 190 deletions

View File

@ -50,6 +50,9 @@ from class_smtpDeliver import smtpDeliver
from class_smtpServer import smtpServer
from bmconfigparser import BMConfigParser
from network.connectionpool import BMConnectionPool
from network.networkthread import BMNetworkThread
# Helper Functions
import helper_bootstrap
import helper_generic
@ -80,10 +83,13 @@ def connectToStream(streamNumber):
if streamNumber*2+1 not in knownnodes.knownNodes:
knownnodes.knownNodes[streamNumber*2+1] = {}
for i in range(maximumNumberOfHalfOpenConnections):
a = outgoingSynSender()
a.setup(streamNumber, selfInitiatedConnections)
a.start()
if BMConfigParser().safeGetBoolean("network", "asyncore"):
BMConnectionPool().connectToStream(streamNumber)
else:
for i in range(maximumNumberOfHalfOpenConnections):
a = outgoingSynSender()
a.setup(streamNumber, selfInitiatedConnections)
a.start()
def _fixWinsock():
if not ('win32' in sys.platform) and not ('win64' in sys.platform):
@ -242,12 +248,18 @@ class Main:
singleAPIThread.daemon = True # close the main program even if there are threads left
singleAPIThread.start()
if BMConfigParser().safeGetBoolean("network", "asyncore"):
asyncoreThread = BMNetworkThread()
asyncoreThread.daemon = False
asyncoreThread.start()
connectToStream(1)
singleListenerThread = singleListener()
singleListenerThread.setup(selfInitiatedConnections)
singleListenerThread.daemon = True # close the main program even if there are threads left
singleListenerThread.start()
if not BMConfigParser().safeGetBoolean("network", "asyncore"):
singleListenerThread = singleListener()
singleListenerThread.setup(selfInitiatedConnections)
singleListenerThread.daemon = True # close the main program even if there are threads left
singleListenerThread.start()
if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'):
import upnp

View File

@ -1,4 +1,7 @@
import time
import asyncore_pollchoose as asyncore
from bmconfigparser import BMConfigParser
class AdvancedDispatcher(asyncore.dispatcher):
_buf_len = 2097152 # 2MB
@ -9,6 +12,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.read_buf = b""
self.write_buf = b""
self.state = "init"
self.lastTx = time.time()
def append_write_buf(self, string = None):
self.write_buf += string
@ -32,7 +36,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
return
while True:
try:
print "Trying to handle state \"%s\"" % (self.state)
# print "Trying to handle state \"%s\"" % (self.state)
if getattr(self, "state_" + str(self.state))() is False:
break
except AttributeError:
@ -50,13 +54,30 @@ class AdvancedDispatcher(asyncore.dispatcher):
return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len
def handle_read(self):
print "handle_read"
self.read_buf += self.recv(AdvancedDispatcher._buf_len)
self.lastTx = time.time()
if asyncore.maxDownloadRate > 0:
newData = self.recv(asyncore.downloadChunk)
asyncore.downloadBucket -= len(newData)
self.read_buf += newData
else:
self.read_buf += self.recv(AdvancedDispatcher._buf_len)
self.process()
def handle_write(self):
written = self.send(self.write_buf)
self.lastTx = time.time()
if asyncore.maxUploadRate > 0:
written = self.send(self.write_buf[0:asyncore.uploadChunk])
asyncore.uploadBucket -= written
else:
written = self.send(self.write_buf)
self.slice_write_buf(written)
def handle_connect(self):
self.lastTx = time.time()
self.process()
def close(self):
self.read_buf = b""
self.write_buf = b""
self.state = "shutdown"
asyncore.dispatcher.close(self)
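The class above is a small state machine: process() keeps calling state_<name>() methods until one of them returns False (meaning "wait for more data"), and set_state() (not shown in this hunk, but used throughout the commit) selects the next state. A minimal, hypothetical subclass to illustrate the contract; EchoDispatcher and its states are not part of this commit:

class EchoDispatcher(AdvancedDispatcher):
    # Illustrative only: echo complete lines back to the peer.
    def state_init(self):
        self.set_state("wait_line")
        return True                 # keep processing, try the next state now

    def state_wait_line(self):
        if b"\n" not in self.read_buf:
            return False            # not enough data yet, stay in this state
        line, _, rest = self.read_buf.partition(b"\n")
        self.read_buf = rest
        self.append_write_buf(line + b"\n")
        return True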

View File

@ -46,6 +46,8 @@ many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
# randomise object order for bandwidth balancing
import random
import select
import socket
import sys
@ -56,6 +58,11 @@ import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
errorcode
try:
from errno import WSAEWOULDBLOCK
except:
pass
from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE
_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
EBADF))
@ -81,6 +88,15 @@ class ExitNow(Exception):
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
maxDownloadRate = 0
downloadChunk = 0
downloadTimestamp = 0
downloadBucket = 0
maxUploadRate = 0
uploadChunk = 0
uploadTimestamp = 0
uploadBucket = 0
def read(obj):
try:
obj.handle_read_event()
@ -97,6 +113,44 @@ def write(obj):
except:
obj.handle_error()
def set_rates(download, upload):
global maxDownloadRate, maxUploadRate, downloadChunk, uploadChunk, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
maxDownloadRate = float(download)
if maxDownloadRate > 0:
downloadChunk = 1400
maxUploadRate = float(upload)
if maxUploadRate > 0:
uploadChunk = 1400
downloadBucket = maxDownloadRate
uploadBucket = maxUploadRate
downloadTimestamp = time.time()
uploadTimestamp = time.time()
def wait_tx_buckets():
global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
if maxDownloadRate > 0 and maxUploadRate > 0:
wait_for_this_long = min(maxDownloadRate / downloadChunk, maxUploadRate / uploadChunk)
elif maxDownloadRate > 0:
wait_for_this_long = maxDownloadRate / downloadChunk
elif maxUploadRate > 0:
wait_for_this_long = maxUploadRate / uploadChunk
else:
return
wait_for_this_long /= 2
if wait_for_this_long > 1:
wait_for_this_long = 1
elif wait_for_this_long < 0.1:
wait_for_this_long = 0.1
while downloadBucket < downloadChunk and uploadBucket < uploadChunk:
time.sleep(wait_for_this_long)
downloadBucket += (time.time() - downloadTimestamp) * maxDownloadRate
downloadTimestamp = time.time()
uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate
uploadTimestamp = time.time()
def _exception(obj):
try:
obj.handle_expt_event()
@ -150,13 +204,13 @@ def select_poller(timeout=0.0, map=None):
except KeyboardInterrupt:
return
for fd in r:
for fd in random.sample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
for fd in random.sample(w, len(w)):
obj = map.get(fd)
if obj is None:
continue
@ -204,7 +258,7 @@ def poll_poller(timeout=0.0, map=None):
r = poll_poller.pollster.poll(timeout)
except KeyboardInterrupt:
r = []
for fd, flags in r:
for fd, flags in random.sample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
@ -252,7 +306,7 @@ def epoll_poller(timeout=0.0, map=None):
if err.args[0] != EINTR:
raise
r = []
for fd, flags in r:
for fd, flags in random.sample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
@ -278,7 +332,7 @@ def kqueue_poller(timeout=0.0, map=None):
selectables += 1
events = kqueue.control(None, selectables, timeout)
for event in events:
for event in random.sample(events, len(events)):
fd = event.ident
obj = map.get(fd)
if obj is None:
@ -307,13 +361,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
elif hasattr(select, 'select'):
poller = select_poller
print "Poll loop using %s" % (poller.__name__)
poller = select_poller
# print "Poll loop using %s" % (poller.__name__)
if count is None:
while map:
wait_tx_buckets()
poller(timeout, map)
else:
timeout /= count
while map and count > 0:
wait_tx_buckets()
poller(timeout, map)
count = count - 1
@ -482,10 +541,17 @@ class dispatcher:
try:
result = self.socket.send(data)
return result
except socket.error as why:
if why.args[0] == EWOULDBLOCK:
except SSLError as err:
if err.errno == SSL_ERROR_WANT_WRITE:
return 0
elif why.args[0] in _DISCONNECTED:
else:
raise
except socket.error as why:
if why.errno in (EAGAIN, EWOULDBLOCK) or \
(sys.platform.startswith('win') and \
why.errno == WSAEWOULDBLOCK):
return 0
elif why.errno in _DISCONNECTED:
self.handle_close()
return 0
else:
@ -501,9 +567,18 @@ class dispatcher:
return b''
else:
return data
except SSLError as err:
if err.errno == SSL_ERROR_WANT_READ:
return b''
else:
raise
except socket.error as why:
# winsock sometimes raises ENOTCONN
if why.args[0] in _DISCONNECTED:
if why.errno in (EAGAIN, EWOULDBLOCK) or \
(sys.platform.startswith('win') and \
why.errno == WSAEWOULDBLOCK):
return b''
if why.errno in _DISCONNECTED:
self.handle_close()
return b''
else:
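The throttling added above is a pair of token buckets, one per direction: each pass through the loop tops a bucket up by elapsed_seconds * max_rate, wait_tx_buckets() sleeps until roughly one chunk's worth (1400 bytes) of budget is available, and handle_read()/handle_write() in the dispatcher subtract what was actually transferred. The same idea condensed into a self-contained sketch; the class and names here are illustrative, not part of the commit:

import time

class TokenBucket(object):
    """Minimal token bucket: refill by rate * elapsed time, spend per transfer."""

    def __init__(self, rate, chunk=1400):
        self.rate = float(rate)   # bytes per second
        self.chunk = chunk        # preferred transfer size in bytes
        self.tokens = self.rate   # start with one second's worth
        self.stamp = time.time()

    def refill(self):
        now = time.time()
        self.tokens += (now - self.stamp) * self.rate
        self.stamp = now

    def wait(self):
        # Block until at least one chunk may be sent or received.
        while self.tokens < self.chunk:
            time.sleep(0.1)
            self.refill()

    def spend(self, nbytes):
        # Called after a send()/recv() with the number of bytes transferred.
        self.tokens -= nbytes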

src/network/bmobject.py (new file, 94 lines)
View File

@ -0,0 +1,94 @@
from binascii import hexlify
import time
from addresses import calculateInventoryHash
from debug import logger
import protocol
import state
class BMObjectInsufficientPOWError(Exception): pass
class BMObjectInvalidDataError(Exception): pass
class BMObjectExpiredError(Exception): pass
class BMObjectUnwantedStreamError(Exception): pass
class BMObjectInvalidError(Exception): pass
class BMObjectAlreadyHaveError(Exception):
pass
class BMObject(object):
# max TTL, 28 days and 3 hours
maxTTL = 28 * 24 * 60 * 60 + 10800
# min TTL, 1 hour (in the past)
minTTL = -3600
def __init__(self, nonce, expiresTime, objectType, version, streamNumber, data):
self.nonce = nonce
self.expiresTime = expiresTime
self.objectType = objectType
self.version = version
self.streamNumber = streamNumber
self.inventoryHash = calculateInventoryHash(data)
self.data = data
self.tag = ''
def checkProofOfWorkSufficient(self):
# Let us check to make sure that the proof of work is sufficient.
if not protocol.isProofOfWorkSufficient(self.data):
logger.info('Proof of work is insufficient.')
raise BMObjectInsufficientPOWError()
def checkEOLSanity(self):
# EOL sanity check
if self.expiresTime - int(time.time()) > BMObject.maxTTL:
logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % self.expiresTime)
# TODO: remove from download queue
raise BMObjectExpiredError()
if self.expiresTime - int(time.time()) < BMObject.minTTL:
logger.info('This object\'s End of Life time was too long ago. Ignoring the object. Time is %s' % self.expiresTime)
# TODO: remove from download queue
raise BMObjectExpiredError()
def checkStream(self):
if self.streamNumber not in state.streamsInWhichIAmParticipating:
logger.debug('The streamNumber %s isn\'t one we are interested in.' % self.streamNumber)
raise BMObjectUnwantedStreamError()
def checkMessage(self):
return
def checkGetpubkey(self):
if len(self.data) < 42:
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
raise BMObjectInvalidError()
def checkPubkey(self, tag):
if len(self.data) < 146 or len(self.data) > 440: # sanity check
logger.info('pubkey object too short or too long. Ignoring.')
raise BMObjectInvalidError()
if self.version >= 4:
self.tag = tag
logger.debug('tag in received pubkey is: %s' % hexlify(tag))
def checkBroadcast(self, tag):
if len(self.data) < 180:
logger.debug('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.')
raise BMObjectInvalidError()
# this isn't supported anymore
if self.version < 2:
raise BMObjectInvalidError()
if self.version >= 3:
self.tag = tag
logger.debug('tag in received broadcast is: %s' % hexlify(tag))
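The checks above are meant to be called in sequence by the protocol layer and to report problems through the exception classes at the top of the file; bm_command_object() further down in this commit does exactly that. A condensed sketch of the call pattern (the helper function is hypothetical and the tag slicing is simplified):

import protocol
from network.bmobject import (BMObjectInsufficientPOWError, BMObjectExpiredError,
                              BMObjectUnwantedStreamError, BMObjectInvalidError)

def validate_received_object(obj):
    # Illustrative only: run the generic checks, then a type-specific one.
    try:
        obj.checkProofOfWorkSufficient()
        obj.checkEOLSanity()
        obj.checkStream()
        if obj.objectType == protocol.OBJECT_GETPUBKEY:
            obj.checkGetpubkey()
        elif obj.objectType == protocol.OBJECT_BROADCAST:
            obj.checkBroadcast(obj.data[:32])   # real code slices the tag after the header fields
    except (BMObjectInsufficientPOWError, BMObjectExpiredError,
            BMObjectUnwantedStreamError, BMObjectInvalidError):
        return False
    return True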

View File

@ -1,28 +1,52 @@
import base64
from binascii import hexlify
import hashlib
import math
import time
from pprint import pprint
import socket
from struct import unpack
import struct
import random
import traceback
from addresses import calculateInventoryHash
from debug import logger
from inventory import Inventory
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError
import network.connectionpool
from network.downloadqueue import DownloadQueue
from network.node import Node
import network.asyncore_pollchoose as asyncore
from network.proxy import Proxy, ProxyError, GeneralProxyError
from network.bmqueues import BMQueues
from network.socks5 import Socks5Connection, Socks5Resolver, Socks5AuthError, Socks5Error
from network.socks4a import Socks4aConnection, Socks4aResolver, Socks4aError
from network.uploadqueue import UploadQueue, UploadElem, AddrUploadQueue, ObjUploadQueue
from network.tls import TLSDispatcher
import addresses
from bmconfigparser import BMConfigParser
from queues import objectProcessorQueue
import shared
import state
import protocol
class BMProtoError(ProxyError): pass
class BMConnection(TLSDispatcher):
class BMProtoInsufficientDataError(BMProtoError): pass
class BMProtoExcessiveDataError(BMProtoError): pass
class BMConnection(TLSDispatcher, BMQueues):
# ~1.6 MB which is the maximum possible size of an inv message.
maxMessageSize = 1600100
# 2**18 = 256kB is the maximum size of an object payload
maxObjectPayloadSize = 2**18
# protocol specification says max 1000 addresses in one addr command
maxAddrCount = 1000
# protocol specification says max 50000 objects in one inv command
@ -32,46 +56,74 @@ class BMConnection(TLSDispatcher):
AdvancedDispatcher.__init__(self, sock)
self.verackReceived = False
self.verackSent = False
self.lastTx = time.time()
self.connectionFullyEstablished = False
self.connectedAt = 0
self.skipUntil = 0
if address is None and sock is not None:
self.destination = self.addr()
self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1])
self.isOutbound = False
TLSDispatcher.__init__(self, sock, server_side=True)
print "received connection in background from %s:%i" % (self.destination[0], self.destination[1])
self.connectedAt = time.time()
print "received connection in background from %s:%i" % (self.destination.host, self.destination.port)
else:
self.destination = address
self.isOutbound = True
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(self.destination)
if ":" in address.host:
self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
TLSDispatcher.__init__(self, sock, server_side=False)
print "connecting in background to %s:%i" % (self.destination[0], self.destination[1])
self.connect(self.destination)
print "connecting in background to %s:%i" % (self.destination.host, self.destination.port)
shared.connectedHostsList[self.destination] = 0
BMQueues.__init__(self)
def bm_proto_reset(self):
self.magic = None
self.command = None
self.payloadLength = None
self.payloadLength = 0
self.checksum = None
self.payload = None
self.invalid = False
self.payloadOffset = 0
self.object = None
def state_init(self):
self.bm_proto_reset()
self.append_write_buf(protocol.assembleVersionMessage(self.destination[0], self.destination[1], (1,), False))
if True:
print "Sending version (%ib)" % len(self.write_buf)
self.set_state("bm_header")
return False
if self.isOutbound:
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False))
print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf))
self.set_state("bm_header")
return True
def state_bm_ready(self):
print "doing bm ready"
def antiIntersectionDelay(self, initial = False):
# estimated time for a small object to propagate across the whole network
delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + UploadQueue.queueCount/2)
# take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
# 0.2 is avg message transmission time
if delay > 0:
if initial:
self.skipUntil = self.connectedAt + delay
if self.skipUntil > time.time():
logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time())
else:
logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time())
self.skipUntil = time.time() + delay
def set_connection_fully_established(self):
self.antiIntersectionDelay(True)
self.connectionFullyEstablished = True
self.sendAddr()
self.sendBigInv()
self.set_state("bm_header")
return False
def state_bm_header(self):
#print "%s:%i: header" % (self.destination.host, self.destination.port)
if len(self.read_buf) < protocol.Header.size:
print "Length below header size"
#print "Length below header size"
return False
self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00')
@ -87,20 +139,41 @@ class BMConnection(TLSDispatcher):
def state_bm_command(self):
if len(self.read_buf) < self.payloadLength:
print "Length below announced object length"
#print "Length below announced object length"
return False
print "received %s (%ib)" % (self.command, self.payloadLength)
print "%s:%i: command %s (%ib)" % (self.destination.host, self.destination.port, self.command, self.payloadLength)
self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
print "Bad checksum, ignoring"
self.invalid = True
retval = True
if not self.connectionFullyEstablished and self.command not in ("version", "verack"):
logger.error("Received command %s before connection was fully established, ignoring", self.command)
self.invalid = True
if not self.invalid:
try:
retval = getattr(self, "bm_command_" + str(self.command).lower())()
except AttributeError:
# unimplemented command
print "unimplemented command %s" % (self.command)
except BMProtoInsufficientDataError:
print "packet length too short, skipping"
except BMProtoExcessiveDataError:
print "too much data, skipping"
except BMObjectInsufficientPOWError:
print "insufficient PoW, skipping"
except BMObjectInvalidDataError:
print "object invalid data, skipping"
except BMObjectExpiredError:
print "object expired, skipping"
except BMObjectUnwantedStreamError:
print "object not in wanted stream, skipping"
except BMObjectInvalidError:
print "object invalid, skipping"
except BMObjectAlreadyHaveError:
print "already got object, skipping"
except struct.error:
print "decoding error, skipping"
else:
print "Skipping command %s due to invalid data" % (self.command)
if retval:
@ -120,11 +193,24 @@ class BMConnection(TLSDispatcher):
return value
def decode_payload_node(self):
services, address, port = self.decode_payload_content("Q16sH")
return Node(services, address, port)
services, host, port = self.decode_payload_content("Q16sH")
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
host = socket.inet_ntop(socket.AF_INET, host[12:])
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
# Onion, based on BMD/bitcoind
host = base64.b32encode(host[6:]).lower() + ".onion"
else:
host = socket.inet_ntop(socket.AF_INET6, host)
if host == "":
# This can happen on Windows systems which are not 64-bit compatible
# so let us drop the IPv6 address.
host = socket.inet_ntop(socket.AF_INET, host[12:])
return Node(services, host, port)
def decode_payload_content(self, pattern = "v"):
# l = varint indicating the length of the next item
# l = varint indicating the length of the next array
# L = varint indicating the length of the next item
# v = varint (or array)
# H = uint16
# I = uint32
@ -135,86 +221,171 @@ class BMConnection(TLSDispatcher):
# , = end of array
retval = []
size = 0
size = None
insideDigit = False
i = 0
for i in range(len(pattern)):
if pattern[i] in "0123456789":
while i < len(pattern):
if pattern[i] in "0123456789" and (i == 0 or pattern[i-1] not in "lL"):
if size is None:
size = 0
size = size * 10 + int(pattern[i])
i += 1
continue
elif pattern[i] == "l":
elif pattern[i] == "l" and size is None:
size = self.decode_payload_varint()
i += 1
continue
if size > 0:
innerval = []
elif pattern[i] == "L" and size is None:
size = self.decode_payload_varint()
i += 1
continue
if size is not None:
if pattern[i] == "s":
retval.append(self.payload[self.payloadOffset:self.payloadOffset + size])
self.payloadOffset += size
i += 1
else:
if "," in pattern[i:]:
subpattern = pattern[i:pattern.index(",")]
else:
subpattern = pattern[i:]
for j in range(size):
if "," in pattern[i:]:
retval.append(self.decode_payload_content(pattern[i:pattern.index(",")]))
if pattern[i-1:i] == "L":
retval.extend(self.decode_payload_content(subpattern))
else:
retval.append(self.decode_payload_content(pattern[i:]))
size = 0
retval.append(self.decode_payload_content(subpattern))
i += len(subpattern)
size = None
else:
if pattern[i] == "v":
retval.append(self.decode_payload_varint())
if pattern[i] == "i":
retval.append(self.decode_payload_node())
if pattern[i] == "H":
retval.append(unpack(">H", self.payload[self.payloadOffset:self.payloadOffset+2])[0])
retval.append(struct.unpack(">H", self.payload[self.payloadOffset:self.payloadOffset+2])[0])
self.payloadOffset += 2
if pattern[i] == "I":
retval.append(unpack(">I", self.payload[self.payloadOffset:self.payloadOffset+4])[0])
retval.append(struct.unpack(">I", self.payload[self.payloadOffset:self.payloadOffset+4])[0])
self.payloadOffset += 4
if pattern[i] == "Q":
retval.append(unpack(">Q", self.payload[self.payloadOffset:self.payloadOffset+8])[0])
retval.append(struct.unpack(">Q", self.payload[self.payloadOffset:self.payloadOffset+8])[0])
self.payloadOffset += 8
i += 1
if self.payloadOffset > self.payloadLength:
print "Insufficient data %i/%i" % (self.payloadOffset, self.payloadLength)
raise BMProtoInsufficientDataError()
return retval
def bm_command_error(self):
fatalStatus, banTime, inventoryVector, errorText = self.decode_payload_content("vvlsls")
print "%s:%i error: %i, %s" % (self.destination.host, self.destination.port, fatalStatus, errorText)
return True
def bm_command_getdata(self):
items = self.decode_payload_content("l32s")
#self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough
items = self.decode_payload_content("L32s")
# if time.time() < self.skipUntil:
# print "skipping getdata"
# return True
for i in items:
logger.debug('received getdata request for item:' + hexlify(i))
if self.objectHashHolderInstance.hasHash(i):
print "received getdata request for item %s" % (hexlify(i))
#logger.debug('received getdata request for item:' + hexlify(i))
#if i in ObjUploadQueue.streamElems(1):
if False:
self.antiIntersectionDelay()
else:
if i in Inventory():
self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload))
else:
#self.antiIntersectionDelay()
self.antiIntersectionDelay()
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.destination,))
return True
def bm_command_inv(self):
items = self.decode_payload_content("L32s")
if len(items) >= BMConnection.maxObjectCount:
logger.error("Too many items in inv message!")
raise BMProtoExcessiveDataError()
else:
print "items in inv: %i" % (len(items))
startTime = time.time()
#advertisedSet = set()
for i in items:
#advertisedSet.add(i)
self.handleReceivedObj(i)
#objectsNewToMe = advertisedSet
#for stream in self.streams:
#objectsNewToMe -= Inventory().hashes_by_stream(stream)
logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime)
payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys())
self.append_write_buf(protocol.CreatePacket('getdata', payload))
# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)):
# DownloadQueue().put(i)
return True
def bm_command_object(self):
lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(self.payload)
self.downloadQueue.task_done(calculateInventoryHash(self.payload))
objectOffset = self.payloadOffset
nonce, expiresTime, objectType, version, streamNumber = self.decode_payload_content("QQIvv")
self.object = BMObject(nonce, expiresTime, objectType, version, streamNumber, self.payload)
if len(self.payload) - self.payloadOffset > BMConnection.maxObjectPayloadSize:
logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % (len(self.payload) - self.payloadOffset))
raise BMProtoExcessiveDataError()
self.object.checkProofOfWorkSufficient()
self.object.checkEOLSanity()
self.object.checkStream()
try:
if self.object.objectType == protocol.OBJECT_GETPUBKEY:
self.object.checkGetpubkey()
elif self.object.objectType == protocol.OBJECT_PUBKEY:
self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32])
elif self.object.objectType == protocol.OBJECT_MSG:
self.object.checkMessage()
elif self.object.objectType == protocol.OBJECT_BROADCAST:
self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32])
# other objects don't require other types of tests
except BMObjectAlreadyHaveError:
pass
else:
Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
objectProcessorQueue.put((self.object.objectType,self.object.data))
#DownloadQueue().task_done(self.object.inventoryHash)
network.connectionpool.BMConnectionPool().handleReceivedObject(self, self.object.streamNumber, self.object.inventoryHash)
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
return True
def bm_command_addr(self):
addresses = self.decode_payload_content("lQbQ16sH")
addresses = self.decode_payload_content("lQIQ16sH")
return True
def bm_command_ping(self):
self.append_write_buf(protocol.CreatePacket('pong'))
return True
def bm_command_pong(self):
# nothing really
pass
return True
def bm_command_verack(self):
self.verackReceived = True
if self.verackSent:
if self.isSSL:
self.set_state("tls_init", self.payloadLength)
self.bm_proto_reset()
return False
else:
self.set_state("bm_ready", self.payloadLength)
else:
self.set_state("bm_header", self.payloadLength)
self.bm_proto_reset()
return False
self.set_connection_fully_established()
return True
return True
def bm_command_version(self):
#self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size])
@ -223,24 +394,30 @@ class BMConnection(TLSDispatcher):
print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion)
print "services: %08X" % (self.services)
print "time offset: %i" % (self.timestamp - int(time.time()))
print "my external IP: %s" % (self.sockNode.address)
print "my external IP: %s" % (self.sockNode.host)
print "remote node incoming port: %i" % (self.peerNode.port)
print "user agent: %s" % (self.userAgent)
if not self.peerValidityChecks():
# TODO ABORT
return True
shared.connectedHostsList[self.destination] = self.streams[0]
self.append_write_buf(protocol.CreatePacket('verack'))
self.verackSent = True
if not self.isOutbound:
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True))
print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf))
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
protocol.haveSSL(not self.isOutbound)):
self.isSSL = True
if self.verackReceived:
if self.isSSL:
self.set_state("tls_init", self.payloadLength)
self.bm_proto_reset()
return False
else:
self.set_state("bm_ready", self.payloadLength)
self.bm_proto_reset()
return False
self.set_connection_fully_established()
return True
return True
def peerValidityChecks(self):
if self.remoteProtocolVersion < 3:
@ -271,14 +448,24 @@ class BMConnection(TLSDispatcher):
logger.debug ('Closed connection to %s because there is no overlapping interest in streams.',
str(self.destination))
return False
if self.destination in network.connectionpool.BMConnectionPool().inboundConnections:
try:
if not protocol.checkSocksIP(self.destination.host):
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
errorText="Too many connections from your IP. Closing connection."))
logger.debug ('Closed connection to %s because we are already connected to that IP.',
str(self.destination))
return False
except:
pass
return True
def sendAddr(self):
def sendChunk():
if numberOfAddressesInAddrMessage == 0:
if addressCount == 0:
return
self.append_write_buf(protocol.CreatePacket('addr', \
addresses.encodeVarint(numberOfAddressesInAddrMessage) + payload))
addresses.encodeVarint(addressCount) + payload))
# We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the
@ -287,7 +474,7 @@ class BMConnection(TLSDispatcher):
# init
addressCount = 0
payload = ''
payload = b''
for stream in self.streams:
addrsInMyStream = {}
@ -320,42 +507,42 @@ class BMConnection(TLSDispatcher):
addrsInChildStreamRight = random.sample(filtered.items(), elemCount)
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream:
addressCount += 1
payload += pack(
payload += struct.pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
payload += pack('>I', stream)
payload += pack(
payload += struct.pack('>I', stream)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
payload += struct.pack('>H', PORT) # remote port
if addressCount >= BMConnection.maxAddrCount:
sendChunk()
payload = ''
payload = b''
addressCount = 0
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft:
addressCount += 1
payload += pack(
payload += struct.pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
payload += pack('>I', stream * 2)
payload += pack(
payload += struct.pack('>I', stream * 2)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
payload += struct.pack('>H', PORT) # remote port
if addressCount >= BMConnection.maxAddrCount:
sendChunk()
payload = ''
payload = b''
addressCount = 0
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight:
addressCount += 1
payload += pack(
payload += struct.pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
payload += pack('>I', (stream * 2) + 1)
payload += pack(
payload += struct.pack('>I', (stream * 2) + 1)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
payload += struct.pack('>H', PORT) # remote port
if addressCount >= BMConnection.maxAddrCount:
sendChunk()
payload = ''
payload = b''
addressCount = 0
# flush
@ -365,19 +552,21 @@ class BMConnection(TLSDispatcher):
def sendChunk():
if objectCount == 0:
return
payload = encodeVarint(objectCount) + payload
logger.debug('Sending huge inv message with %i objects to just this one peer',
str(numberOfObjects))
self.append_write_buf(protocol.CreatePacket('inv', payload))
logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount)
self.append_write_buf(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload))
# Select all hashes for objects in this stream.
bigInvList = {}
for stream in self.streams:
for hash in Inventory().unexpired_hashes_by_stream(stream):
if not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0
bigInvList[hash] = 0
# for hash in ObjUploadQueue().streamHashes(stream):
# try:
# del bigInvList[hash]
# except KeyError:
# pass
objectCount = 0
payload = ''
payload = b''
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash, storedValue in bigInvList.items():
@ -385,12 +574,43 @@ class BMConnection(TLSDispatcher):
objectCount += 1
if objectCount >= BMConnection.maxObjectCount:
sendChunk()
payload = ''
payload = b''
objectCount = 0
# flush
sendChunk()
def handle_connect_event(self):
try:
asyncore.dispatcher.handle_connect_event(self)
self.connectedAt = time.time()
except socket.error as e:
print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self.close()
def handle_read_event(self):
try:
asyncore.dispatcher.handle_read_event(self)
except socket.error as e:
print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self.close()
def handle_write_event(self):
try:
asyncore.dispatcher.handle_write_event(self)
except socket.error as e:
print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self.close()
def close(self, reason=None):
if reason is None:
print "%s:%i: closing" % (self.destination.host, self.destination.port)
#traceback.print_stack()
else:
print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason)
network.connectionpool.BMConnectionPool().removeConnection(self)
asyncore.dispatcher.close(self)
class Socks5BMConnection(Socks5Connection, BMConnection):
def __init__(self, address):
@ -411,24 +631,19 @@ class Socks4aBMConnection(Socks4aConnection, BMConnection):
class BMServer(AdvancedDispatcher):
port = 8444
def __init__(self, port=None):
def __init__(self, host='127.0.0.1', port=8444):
if not hasattr(self, '_map'):
AdvancedDispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
if port is None:
port = BMServer.port
self.bind(('127.0.0.1', port))
self.connections = 0
self.bind((host, port))
self.listen(5)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
BMConnection(sock=sock)
network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock))
if __name__ == "__main__":
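Most of the parsing above goes through decode_payload_content(), a small pattern language: digits give a fixed byte count, l and L read a varint repeat count (l nests the results, L flattens them), v is a varint, H/I/Q are big-endian 16/32/64-bit integers, s is a byte string and i a network node. For example "L32s", used by inv and getdata, means a varint count followed by that many 32-byte hashes. A hand-rolled standalone equivalent, for illustration only (these helpers are not part of the commit):

import struct

def decode_varint(data, offset):
    # Minimal Bitmessage varint: one byte, or a 0xfd/0xfe/0xff marker followed
    # by a big-endian 16/32/64-bit integer.
    first = ord(data[offset])
    if first < 0xfd:
        return first, offset + 1
    if first == 0xfd:
        return struct.unpack(">H", data[offset + 1:offset + 3])[0], offset + 3
    if first == 0xfe:
        return struct.unpack(">I", data[offset + 1:offset + 5])[0], offset + 5
    return struct.unpack(">Q", data[offset + 1:offset + 9])[0], offset + 9

def decode_inv_payload(payload):
    # Equivalent of decode_payload_content("L32s"): a flat list of 32-byte hashes.
    count, offset = decode_varint(payload, 0)
    return [payload[offset + i * 32:offset + (i + 1) * 32] for i in range(count)]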

src/network/bmqueues.py (new file, 95 lines)
View File

@ -0,0 +1,95 @@
import time
from inventory import Inventory
from network.downloadqueue import DownloadQueue
from network.uploadqueue import UploadQueue
haveBloom = False
try:
# pybloomfiltermmap
from pybloomfilter import BloomFilter
haveBloom = True
except ImportError:
try:
# pybloom
from pybloom import BloomFilter
haveBloom = True
except ImportError:
pass
# it isn't actually implemented yet so no point in turning it on
haveBloom = False
class BMQueues(object):
invCleanPeriod = 300
invInitialCapacity = 50000
invErrorRate = 0.03
def __init__(self):
self.objectsNewToMe = {}
self.objectsNewToThem = {}
self.initInvBloom()
self.initAddrBloom()
def initInvBloom(self):
if haveBloom:
# lock?
self.invBloom = BloomFilter(capacity=BMQueues.invInitialCapacity,
error_rate=BMQueues.invErrorRate)
def initAddrBloom(self):
if haveBloom:
# lock?
self.addrBloom = BloomFilter(capacity=BMQueues.invInitialCapacity,
error_rate=BMQueues.invErrorRate)
def clean(self):
if self.lastcleaned < time.time() - BMQueues.invCleanPeriod:
if haveBloom:
if PendingDownloadQueue().size() == 0:
self.initInvBloom()
self.initAddrBloom()
else:
# release memory
self.objectsNewToMe = self.objectsNewToMe.copy()
self.objectsNewToThem = self.objectsNewToThem.copy()
def hasObj(self, hashid):
if haveBloom:
return hashid in self.invBloom
else:
return hashid in self.objectsNewToMe
def handleReceivedObj(self, hashid):
if haveBloom:
self.invBloom.add(hashid)
elif hashid in Inventory():
try:
del self.objectsNewToThem[hashid]
except KeyError:
pass
else:
self.objectsNewToMe[hashid] = True
def hasAddr(self, addr):
if haveBloom:
return addr in self.addrBloom
def addAddr(self, hashid):
if haveBloom:
self.addrBloom.add(hashid)
# addr sending -> per node upload queue, and flush every minute or so
# inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so
# no bloom
# - if inv arrives
# - if we don't have it, add tracking and download queue
# - if we do have it, remove from tracking
# tracking downloads
# - per node hash of items the node has but we don't
# tracking inv
# - per node hash of items that neither the remote node nor we have
#
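With the bloom filter disabled, the tracking reduces to the two per-connection dicts: objectsNewToMe holds hashes the peer advertised that we still need to download, objectsNewToThem holds hashes we could still offer to that peer. A condensed sketch of the bookkeeping for one advertised hash (function and parameter names here are illustrative):

def on_inv_hash(connection, hashid, inventory):
    # Roughly what BMQueues.handleReceivedObj does when blooms are disabled.
    if hashid in inventory:
        # The peer evidently has this object already, so never offer it back.
        connection.objectsNewToThem.pop(hashid, None)
    else:
        # The peer has something we don't: remember it as a download candidate.
        connection.objectsNewToMe[hashid] = True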

View File

@ -0,0 +1,11 @@
import random
from bmconfigparser import BMConfigParser
import knownnodes
import state
def chooseConnection(stream):
if state.trustedPeer:
return state.trustedPeer
else:
return random.choice(knownnodes.knownNodes[stream].keys())

View File

@ -0,0 +1,149 @@
import errno
import socket
import time
import random
from bmconfigparser import BMConfigParser
from debug import logger
import helper_bootstrap
import network.bmproto
from network.connectionchooser import chooseConnection
import network.asyncore_pollchoose as asyncore
import protocol
from singleton import Singleton
import shared
import state
@Singleton
class BMConnectionPool(object):
def __init__(self):
asyncore.set_rates(
BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate"),
BMConfigParser().safeGetInt("bitmessagesettings", "maxuploadrate"))
self.outboundConnections = {}
self.inboundConnections = {}
self.listeningSockets = {}
self.streams = []
self.bootstrapped = False
def handleReceivedObject(self, connection, streamNumber, hashid):
for i in self.inboundConnections.values() + self.outboundConnections.values():
if not isinstance(i, network.bmproto.BMConnection):
continue
if i == connection:
try:
del i.objectsNewToThem[hashid]
except KeyError:
pass
else:
try:
del i.objectsNewToThem[hashid]
except KeyError:
i.objectsNewToThem[hashid] = True
try:
del i.objectsNewToMe[hashid]
except KeyError:
pass
def connectToStream(self, streamNumber):
self.streams.append(streamNumber)
def addConnection(self, connection):
if connection.isOutbound:
self.outboundConnections[connection.destination] = connection
else:
if connection.destination.host in self.inboundConnections:
self.inboundConnections[connection.destination] = connection
else:
self.inboundConnections[connection.destination.host] = connection
def removeConnection(self, connection):
if connection.isOutbound:
try:
del self.outboundConnections[connection.destination]
except KeyError:
pass
else:
try:
del self.inboundConnections[connection.destination]
except KeyError:
try:
del self.inboundConnections[connection.destination.host]
except KeyError:
pass
def startListening(self):
port = BMConfigParser().safeGetInt("bitmessagesettings", "port")
if BMConfigParser().safeGet("bitmessagesettings", "onionhostname").endswith(".onion"):
host = BMConfigParser().safeGet("bitmessagesettings", "onionbindip")
else:
host = '127.0.0.1'
if BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") or \
BMConfigParser().get("bitmessagesettings", "socksproxytype") == "none":
host = ''
self.listeningSockets[state.Peer(host, port)] = network.bmproto.BMServer(host=host, port=port)
def loop(self):
# defaults to empty loop if outbound connections are maxed
spawnConnections = False
acceptConnections = True
if BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect'):
acceptConnections = False
else:
spawnConnections = True
if BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections'):
spawnConnections = True
if BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and \
(not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and \
".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname')):
acceptConnections = False
spawnConnections = False
if spawnConnections:
if not self.bootstrapped:
print "bootstrapping dns"
helper_bootstrap.dns()
self.bootstrapped = True
for i in range(len(self.outboundConnections), BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections")):
chosen = chooseConnection(random.choice(self.streams))
if chosen in self.outboundConnections:
continue
if chosen.host in self.inboundConnections:
continue
#for c in self.outboundConnections:
# if chosen == c.destination:
# continue
#for c in self.inboundConnections:
# if chosen.host == c.destination.host:
# continue
try:
if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"):
self.addConnection(network.bmproto.Socks5BMConnection(chosen))
elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"):
self.addConnection(network.bmproto.Socks4aBMConnection(chosen))
elif not chosen.host.endswith(".onion"):
self.addConnection(network.bmproto.BMConnection(chosen))
except socket.error as e:
if e.errno == errno.ENETUNREACH:
continue
if acceptConnections and len(self.listeningSockets) == 0:
self.startListening()
logger.info('Listening for incoming connections.')
if len(self.listeningSockets) > 0 and not acceptConnections:
for i in self.listeningSockets.values():
i.close()
logger.info('Stopped listening for incoming connections.')
# while len(asyncore.socket_map) > 0 and state.shutdown == 0:
# print "loop, state = %s" % (proxy.state)
asyncore.loop(timeout=2.0, count=1)
for i in self.inboundConnections.values() + self.outboundConnections.values():
minTx = time.time() - 20
if i.connectionFullyEstablished:
minTx -= 300 - 20
if i.lastTx < minTx:
i.close("Timeout (%is)" % (time.time() - i.lastTx))
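The idle check at the end of loop() boils down to two timeouts: a connection that has not fully established gets 20 seconds of silence, an established one gets 300 seconds. Stated directly, as an illustrative restatement rather than code from the commit:

import time

HANDSHAKE_TIMEOUT = 20   # seconds of silence allowed before the handshake completes
IDLE_TIMEOUT = 300       # seconds of silence allowed afterwards

def is_stale(connection, now=None):
    # Equivalent to the lastTx/minTx comparison in BMConnectionPool.loop().
    if now is None:
        now = time.time()
    limit = IDLE_TIMEOUT if connection.connectionFullyEstablished else HANDSHAKE_TIMEOUT
    return now - connection.lastTx > limit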

View File

@ -0,0 +1,12 @@
#import collections
from threading import current_thread, enumerate as threadingEnumerate, RLock
import Queue
import time
#from helper_sql import *
from singleton import Singleton
@Singleton
class DownloadQueue(Queue.Queue):
# keep track of objects that have been advertised to us but that we haven't downloaded yet
maxWait = 300

View File

@ -0,0 +1,40 @@
import threading
from bmconfigparser import BMConfigParser
from debug import logger
from helper_threading import StoppableThread
import network.asyncore_pollchoose as asyncore
from network.connectionpool import BMConnectionPool
class BMNetworkThread(threading.Thread, StoppableThread):
def __init__(self):
threading.Thread.__init__(self, name="BMNetworkThread")
self.initStop()
self.name = "AsyncoreThread"
BMConnectionPool()
logger.info("init asyncore thread")
def run(self):
while not self._stopped:
BMConnectionPool().loop()
def stopThread(self):
super(BMNetworkThread, self).stopThread()
for i in BMConnectionPool().listeningSockets.values():
try:
i.close()
except:
pass
for i in BMConnectionPool().outboundConnections.values():
try:
i.close()
except:
pass
for i in BMConnectionPool().inboundConnections.values():
try:
i.close()
except:
pass
# just in case
asyncore.close_all()
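For reference, the thread is started from bitmessagemain.py like the other service threads (see the first hunk of this commit); the shutdown side sketched here is an assumption based on stopThread() above:

from network.networkthread import BMNetworkThread

net_thread = BMNetworkThread()
net_thread.daemon = False        # as in bitmessagemain.py
net_thread.start()
# ... later, on shutdown:
net_thread.stopThread()
net_thread.join()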

View File

@ -1,66 +1,3 @@
import time
from inventory import PendingDownloadQueue
try:
# pybloomfiltermmap
from pybloomfilter import BloomFilter
except ImportError:
try:
# pybloom
from pybloom import BloomFilter
except ImportError:
# bundled pybloom
from fallback.pybloom import BloomFilter
class Node(object):
invCleanPeriod = 300
invInitialCapacity = 50000
invErrorRate = 0.03
def __init__(self):
self.initInvBloom()
self.initAddrBloom()
def initInvBloom(self):
# lock?
self.invBloom = BloomFilter(capacity=Node.invInitialCapacity,
error_rate=Node.invErrorRate)
def initAddrBloom(self):
# lock?
self.addrBloom = BloomFilter(capacity=Node.invInitialCapacity,
error_rate=Node.invErrorRate)
def cleanBloom(self):
if self.lastcleaned < time.time() - Node.invCleanPeriod:
if PendingDownloadQueue().size() == 0:
self.initInvBloom()
self.initAddrBloom()
def hasInv(self, hashid):
return hashid in self.invBloom
def addInv(self, hashid):
self.invBloom.add(hashid)
def hasAddr(self, hashid):
return hashid in self.invBloom
def addInv(self, hashid):
self.invBloom.add(hashid)
# addr sending -> per node upload queue, and flush every minute or so
# inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so
# no bloom
# - if inv arrives
# - if we don't have it, add tracking and download queue
# - if we do have it, remove from tracking
# tracking downloads
# - per node hash of items the node has but we don't
# tracking inv
# - per node hash of items that neither the remote node nor we have
#
import collections
Node = collections.namedtuple('Node', ['services', 'host', 'port'])

View File

@ -60,14 +60,14 @@ class TLSDispatcher(AdvancedDispatcher):
def writable(self):
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
print "tls writable, %r" % (self.want_write)
#print "tls writable, %r" % (self.want_write)
return self.want_write
else:
return AdvancedDispatcher.writable(self)
def readable(self):
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
print "tls readable, %r" % (self.want_read)
#print "tls readable, %r" % (self.want_read)
return self.want_read
else:
return AdvancedDispatcher.readable(self)
@ -75,19 +75,19 @@ class TLSDispatcher(AdvancedDispatcher):
def handle_read(self):
# wait for write buffer flush
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
print "handshaking (read)"
#print "handshaking (read)"
self.state_tls_handshake()
else:
print "not handshaking (read)"
#print "not handshaking (read)"
return AdvancedDispatcher.handle_read(self)
def handle_write(self):
# wait for write buffer flush
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
print "handshaking (write)"
#print "handshaking (write)"
self.state_tls_handshake()
else:
print "not handshaking (write)"
#print "not handshaking (write)"
return AdvancedDispatcher.handle_write(self)
def state_tls_handshake(self):
@ -96,24 +96,25 @@ class TLSDispatcher(AdvancedDispatcher):
return False
# Perform the handshake.
try:
print "handshaking (internal)"
#print "handshaking (internal)"
self.sslSocket.do_handshake()
except ssl.SSLError, err:
print "handshake fail"
#print "%s:%i: handshake fail" % (self.destination.host, self.destination.port)
self.want_read = self.want_write = False
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
print "want read"
#print "want read"
self.want_read = True
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
print "want write"
if err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
#print "want write"
self.want_write = True
else:
if not (self.want_write or self.want_read):
raise
else:
print "handshake success"
print "%s:%i: handshake success" % (self.destination.host, self.destination.port)
# The handshake has completed, so remove this channel and...
self.del_channel()
self.set_socket(self.sslSocket)
self.tlsDone = True
self.state_bm_ready()
self.set_state("bm_header")
self.set_connection_fully_established()
return False
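The handshake state above is the standard non-blocking do_handshake() pattern: call it, and when OpenSSL raises WANT_READ or WANT_WRITE, wait until the socket is readable or writable and call it again. For reference, the same loop in blocking form (illustrative; the dispatcher does this incrementally from handle_read/handle_write instead of looping):

import select
import ssl

def blocking_handshake(ssl_sock):
    # Retry do_handshake() until it completes, waiting on the socket whenever
    # OpenSSL asks for more data in either direction.
    while True:
        try:
            ssl_sock.do_handshake()
            return
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                select.select([ssl_sock], [], [])
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                select.select([], [ssl_sock], [])
            else:
                raise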

View File

@ -0,0 +1,70 @@
from collections import namedtuple
import Queue
import random
from threading import current_thread, enumerate as threadingEnumerate, RLock
import time
#from helper_sql import *
from singleton import Singleton
UploadElem = namedtuple("UploadElem", "stream identifier")
class UploadQueueDeadlineException(Exception):
pass
class UploadQueue(object):
queueCount = 10
def __init__(self):
self.queue = []
self.lastGet = 0
self.getIterator = 0
for i in range(UploadQueue.queueCount):
self.queue.append([])
def put(self, item):
self.queue[random.randrange(0, UploadQueue.queueCount)].append(item)
def get(self):
i = UploadQueue.queueCount
retval = []
while self.lastGet < time.time() - 1 and i > 0:
if len(self.queue) > 0:
retval.extend(self.queue[self.getIterator])
self.queue[self.getIterator] = []
self.lastGet += 1
# only process each queue once
i -= 1
self.getIterator = (self.getIterator + 1) % UploadQueue.queueCount
if self.lastGet < time.time() - 1:
self.lastGet = time.time()
return retval
def streamElems(self, stream):
retval = {}
for q in self.queue:
for elem in q:
if elem.stream == stream:
retval[elem.identifier] = True
return retval
def len(self):
retval = 0
for i in range(UploadQueue.queueCount):
retval += len(self.queue[i])
return retval
def stop(self):
for i in range(UploadQueue.queueCount):
self.queue[i] = []
@Singleton
class AddrUploadQueue(UploadQueue):
pass
@Singleton
class ObjUploadQueue(UploadQueue):
pass
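UploadQueue smears announcements out over time: put() drops each element into one of ten random buckets, and get() releases roughly one bucket per elapsed second, so a burst of new objects reaches a peer over about ten seconds instead of all at once. A short usage sketch; the dummy hash and the print are placeholders:

from network.uploadqueue import ObjUploadQueue, UploadElem

queue = ObjUploadQueue()                                   # process-wide singleton
queue.put(UploadElem(stream=1, identifier="\x00" * 32))    # dummy 32-byte hash

# Later, in a per-connection send loop; only buckets that have matured are returned.
for elem in queue.get():
    print elem.stream, elem.identifier.encode('hex')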

View File

@ -32,6 +32,12 @@ STATUS_WARNING = 0
STATUS_ERROR = 1
STATUS_FATAL = 2
#Object types
OBJECT_GETPUBKEY = 0
OBJECT_PUBKEY = 1
OBJECT_MSG = 2
OBJECT_BROADCAST = 3
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615))