Asyncore update
- separate queue for processing blocking stuff on reception - rewrote write buffer as a queue - some addr handling - number of half open connections correct
This commit is contained in:
parent
51e52401fe
commit
e309a1edb3
|
@ -52,6 +52,7 @@ from bmconfigparser import BMConfigParser
|
||||||
|
|
||||||
from network.connectionpool import BMConnectionPool
|
from network.connectionpool import BMConnectionPool
|
||||||
from network.networkthread import BMNetworkThread
|
from network.networkthread import BMNetworkThread
|
||||||
|
from network.receivequeuethread import ReceiveQueueThread
|
||||||
|
|
||||||
# Helper Functions
|
# Helper Functions
|
||||||
import helper_bootstrap
|
import helper_bootstrap
|
||||||
|
@ -65,13 +66,13 @@ def connectToStream(streamNumber):
|
||||||
|
|
||||||
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
||||||
# Some XP and Vista systems can only have 10 outgoing connections at a time.
|
# Some XP and Vista systems can only have 10 outgoing connections at a time.
|
||||||
maximumNumberOfHalfOpenConnections = 9
|
state.maximumNumberOfHalfOpenConnections = 9
|
||||||
else:
|
else:
|
||||||
maximumNumberOfHalfOpenConnections = 64
|
state.maximumNumberOfHalfOpenConnections = 64
|
||||||
try:
|
try:
|
||||||
# don't overload Tor
|
# don't overload Tor
|
||||||
if BMConfigParser().get('bitmessagesettings', 'socksproxytype') != 'none':
|
if BMConfigParser().get('bitmessagesettings', 'socksproxytype') != 'none':
|
||||||
maximumNumberOfHalfOpenConnections = 4
|
state.maximumNumberOfHalfOpenConnections = 4
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -86,7 +87,7 @@ def connectToStream(streamNumber):
|
||||||
if BMConfigParser().safeGetBoolean("network", "asyncore"):
|
if BMConfigParser().safeGetBoolean("network", "asyncore"):
|
||||||
BMConnectionPool().connectToStream(streamNumber)
|
BMConnectionPool().connectToStream(streamNumber)
|
||||||
else:
|
else:
|
||||||
for i in range(maximumNumberOfHalfOpenConnections):
|
for i in range(state.maximumNumberOfHalfOpenConnections):
|
||||||
a = outgoingSynSender()
|
a = outgoingSynSender()
|
||||||
a.setup(streamNumber, selfInitiatedConnections)
|
a.setup(streamNumber, selfInitiatedConnections)
|
||||||
a.start()
|
a.start()
|
||||||
|
@ -252,6 +253,9 @@ class Main:
|
||||||
asyncoreThread = BMNetworkThread()
|
asyncoreThread = BMNetworkThread()
|
||||||
asyncoreThread.daemon = False
|
asyncoreThread.daemon = False
|
||||||
asyncoreThread.start()
|
asyncoreThread.start()
|
||||||
|
receiveQueueThread = ReceiveQueueThread()
|
||||||
|
receiveQueueThread.daemon = False
|
||||||
|
receiveQueueThread.start()
|
||||||
|
|
||||||
connectToStream(1)
|
connectToStream(1)
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from threading import RLock
|
import Queue
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
|
@ -12,20 +12,16 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
asyncore.dispatcher.__init__(self, sock)
|
asyncore.dispatcher.__init__(self, sock)
|
||||||
self.read_buf = b""
|
self.read_buf = b""
|
||||||
self.write_buf = b""
|
self.write_buf = b""
|
||||||
self.writeLock = RLock()
|
self.writeQueue = Queue.Queue()
|
||||||
|
self.receiveQueue = Queue.Queue()
|
||||||
self.state = "init"
|
self.state = "init"
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
self.sentBytes = 0
|
self.sentBytes = 0
|
||||||
self.receivedBytes = 0
|
self.receivedBytes = 0
|
||||||
|
|
||||||
def append_write_buf(self, string = None):
|
|
||||||
with self.writeLock:
|
|
||||||
self.write_buf += string
|
|
||||||
|
|
||||||
def slice_write_buf(self, length=0):
|
def slice_write_buf(self, length=0):
|
||||||
if length > 0:
|
if length > 0:
|
||||||
with self.writeLock:
|
self.write_buf = self.write_buf[length:]
|
||||||
self.write_buf = self.write_buf[length:]
|
|
||||||
|
|
||||||
def slice_read_buf(self, length=0):
|
def slice_read_buf(self, length=0):
|
||||||
if length > 0:
|
if length > 0:
|
||||||
|
@ -54,7 +50,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
self.state = state
|
self.state = state
|
||||||
|
|
||||||
def writable(self):
|
def writable(self):
|
||||||
return self.connecting or len(self.write_buf) > 0
|
return self.connecting or len(self.write_buf) > 0 or not self.writeQueue.empty()
|
||||||
|
|
||||||
def readable(self):
|
def readable(self):
|
||||||
return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len
|
return self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len
|
||||||
|
@ -74,18 +70,28 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
def handle_write(self):
|
def handle_write(self):
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
if asyncore.maxUploadRate > 0:
|
if asyncore.maxUploadRate > 0:
|
||||||
written = self.send(self.write_buf[0:asyncore.uploadChunk])
|
bufSize = asyncore.uploadChunk
|
||||||
asyncore.uploadBucket -= written
|
|
||||||
else:
|
else:
|
||||||
written = self.send(self.write_buf)
|
bufSize = self._buf_len
|
||||||
asyncore.updateSent(written)
|
while len(self.write_buf) < bufSize:
|
||||||
self.sentBytes += written
|
try:
|
||||||
self.slice_write_buf(written)
|
self.write_buf += self.writeQueue.get(False)
|
||||||
|
except Queue.Empty:
|
||||||
|
break
|
||||||
|
if len(self.write_buf) > 0:
|
||||||
|
written = self.send(self.write_buf[0:bufSize])
|
||||||
|
asyncore.uploadBucket -= written
|
||||||
|
asyncore.updateSent(written)
|
||||||
|
self.sentBytes += written
|
||||||
|
self.slice_write_buf(written)
|
||||||
|
|
||||||
def handle_connect(self):
|
def handle_connect(self):
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
self.process()
|
self.process()
|
||||||
|
|
||||||
|
def state_close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.read_buf = b""
|
self.read_buf = b""
|
||||||
self.write_buf = b""
|
self.write_buf = b""
|
||||||
|
|
|
@ -58,7 +58,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
self.verackSent = False
|
self.verackSent = False
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
self.streams = [0]
|
self.streams = [0]
|
||||||
self.connectionFullyEstablished = False
|
self.fullyEstablished = False
|
||||||
self.connectedAt = 0
|
self.connectedAt = 0
|
||||||
self.skipUntil = 0
|
self.skipUntil = 0
|
||||||
if address is None and sock is not None:
|
if address is None and sock is not None:
|
||||||
|
@ -95,8 +95,8 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
def state_init(self):
|
def state_init(self):
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
if self.isOutbound:
|
if self.isOutbound:
|
||||||
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False))
|
self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False))
|
||||||
print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf))
|
print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
|
||||||
self.set_state("bm_header")
|
self.set_state("bm_header")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -114,12 +114,12 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time())
|
logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time())
|
||||||
else:
|
else:
|
||||||
logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time())
|
logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time())
|
||||||
self.skipUntil = time.time() + now
|
self.skipUntil = time.time() + delay
|
||||||
|
|
||||||
def set_connection_fully_established(self):
|
def set_connection_fully_established(self):
|
||||||
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
self.antiIntersectionDelay(True)
|
self.antiIntersectionDelay(True)
|
||||||
self.connectionFullyEstablished = True
|
self.fullyEstablished = True
|
||||||
self.sendAddr()
|
self.sendAddr()
|
||||||
self.sendBigInv()
|
self.sendBigInv()
|
||||||
|
|
||||||
|
@ -135,6 +135,8 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
self.set_state("bm_header", 1)
|
self.set_state("bm_header", 1)
|
||||||
print "Bad magic"
|
print "Bad magic"
|
||||||
|
self.close()
|
||||||
|
return False
|
||||||
if self.payloadLength > BMConnection.maxMessageSize:
|
if self.payloadLength > BMConnection.maxMessageSize:
|
||||||
self.invalid = True
|
self.invalid = True
|
||||||
self.set_state("bm_command", protocol.Header.size)
|
self.set_state("bm_command", protocol.Header.size)
|
||||||
|
@ -150,7 +152,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
print "Bad checksum, ignoring"
|
print "Bad checksum, ignoring"
|
||||||
self.invalid = True
|
self.invalid = True
|
||||||
retval = True
|
retval = True
|
||||||
if not self.connectionFullyEstablished and self.command not in ("version", "verack"):
|
if not self.fullyEstablished and self.command not in ("version", "verack"):
|
||||||
logger.error("Received command %s before connection was fully established, ignoring", self.command)
|
logger.error("Received command %s before connection was fully established, ignoring", self.command)
|
||||||
self.invalid = True
|
self.invalid = True
|
||||||
if not self.invalid:
|
if not self.invalid:
|
||||||
|
@ -178,7 +180,10 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
except struct.error:
|
except struct.error:
|
||||||
print "decoding error, skipping"
|
print "decoding error, skipping"
|
||||||
else:
|
else:
|
||||||
print "Skipping command %s due to invalid data" % (self.command)
|
#print "Skipping command %s due to invalid data" % (self.command)
|
||||||
|
print "Closing due to invalid data" % (self.command)
|
||||||
|
self.close()
|
||||||
|
return False
|
||||||
if retval:
|
if retval:
|
||||||
self.set_state("bm_header", self.payloadLength)
|
self.set_state("bm_header", self.payloadLength)
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
|
@ -298,12 +303,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
if False:
|
if False:
|
||||||
self.antiIntersectionDelay()
|
self.antiIntersectionDelay()
|
||||||
else:
|
else:
|
||||||
try:
|
self.receiveQueue.put(("object", i))
|
||||||
self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload))
|
|
||||||
# this is faster than "if i in Inventory()"
|
|
||||||
except KeyError:
|
|
||||||
self.antiIntersectionDelay()
|
|
||||||
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_inv(self):
|
def bm_command_inv(self):
|
||||||
|
@ -327,7 +327,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime)
|
logger.info('inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out.', len(items), len(self.objectsNewToMe), time.time()-startTime)
|
||||||
|
|
||||||
payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys())
|
payload = addresses.encodeVarint(len(self.objectsNewToMe)) + ''.join(self.objectsNewToMe.keys())
|
||||||
self.append_write_buf(protocol.CreatePacket('getdata', payload))
|
self.writeQueue.put(protocol.CreatePacket('getdata', payload))
|
||||||
|
|
||||||
# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)):
|
# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)):
|
||||||
# DownloadQueue().put(i)
|
# DownloadQueue().put(i)
|
||||||
|
@ -370,6 +370,18 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
|
|
||||||
def bm_command_addr(self):
|
def bm_command_addr(self):
|
||||||
addresses = self.decode_payload_content("lQIQ16sH")
|
addresses = self.decode_payload_content("lQIQ16sH")
|
||||||
|
import pprint
|
||||||
|
for i in addresses:
|
||||||
|
seenTime, stream, services, ip, port = i
|
||||||
|
decodedIP = protocol.checkIPAddress(ip)
|
||||||
|
if stream not in state.streamsInWhichIAmParticipating:
|
||||||
|
continue
|
||||||
|
#print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream]))
|
||||||
|
if decodedIP is not False and seenTime > time.time() - 10800:
|
||||||
|
peer = state.Peer(decodedIP, port)
|
||||||
|
if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime:
|
||||||
|
continue
|
||||||
|
knownnodes.knownNodes[stream][peer] = seenTime
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_portcheck(self):
|
def bm_command_portcheck(self):
|
||||||
|
@ -377,7 +389,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_ping(self):
|
def bm_command_ping(self):
|
||||||
self.append_write_buf(protocol.CreatePacket('pong'))
|
self.writeQueue.put(protocol.CreatePacket('pong'))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_pong(self):
|
def bm_command_pong(self):
|
||||||
|
@ -411,11 +423,11 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
# TODO ABORT
|
# TODO ABORT
|
||||||
return True
|
return True
|
||||||
#shared.connectedHostsList[self.destination] = self.streams[0]
|
#shared.connectedHostsList[self.destination] = self.streams[0]
|
||||||
self.append_write_buf(protocol.CreatePacket('verack'))
|
self.writeQueue.put(protocol.CreatePacket('verack'))
|
||||||
self.verackSent = True
|
self.verackSent = True
|
||||||
if not self.isOutbound:
|
if not self.isOutbound:
|
||||||
self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True))
|
self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, True))
|
||||||
print "%s:%i: Sending version (%ib)" % (self.destination.host, self.destination.port, len(self.write_buf))
|
print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
|
||||||
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
|
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
|
||||||
protocol.haveSSL(not self.isOutbound)):
|
protocol.haveSSL(not self.isOutbound)):
|
||||||
self.isSSL = True
|
self.isSSL = True
|
||||||
|
@ -431,20 +443,20 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
|
|
||||||
def peerValidityChecks(self):
|
def peerValidityChecks(self):
|
||||||
if self.remoteProtocolVersion < 3:
|
if self.remoteProtocolVersion < 3:
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
|
self.writeQueue.put(protocol.assembleErrorMessage(fatal=2,
|
||||||
errorText="Your is using an old protocol. Closing connection."))
|
errorText="Your is using an old protocol. Closing connection."))
|
||||||
logger.debug ('Closing connection to old protocol version %s, node: %s',
|
logger.debug ('Closing connection to old protocol version %s, node: %s',
|
||||||
str(self.remoteProtocolVersion), str(self.peer))
|
str(self.remoteProtocolVersion), str(self.peer))
|
||||||
return False
|
return False
|
||||||
if self.timeOffset > 3600:
|
if self.timeOffset > 3600:
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
|
self.writeQueue.put(protocol.assembleErrorMessage(fatal=2,
|
||||||
errorText="Your time is too far in the future compared to mine. Closing connection."))
|
errorText="Your time is too far in the future compared to mine. Closing connection."))
|
||||||
logger.info("%s's time is too far in the future (%s seconds). Closing connection to it.",
|
logger.info("%s's time is too far in the future (%s seconds). Closing connection to it.",
|
||||||
self.peer, self.timeOffset)
|
self.peer, self.timeOffset)
|
||||||
shared.timeOffsetWrongCount += 1
|
shared.timeOffsetWrongCount += 1
|
||||||
return False
|
return False
|
||||||
elif self.timeOffset < -3600:
|
elif self.timeOffset < -3600:
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
|
self.writeQueue.put(protocol.assembleErrorMessage(fatal=2,
|
||||||
errorText="Your time is too far in the past compared to mine. Closing connection."))
|
errorText="Your time is too far in the past compared to mine. Closing connection."))
|
||||||
logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it.",
|
logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it.",
|
||||||
self.peer, self.timeOffset)
|
self.peer, self.timeOffset)
|
||||||
|
@ -453,7 +465,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
else:
|
else:
|
||||||
shared.timeOffsetWrongCount = 0
|
shared.timeOffsetWrongCount = 0
|
||||||
if len(self.streams) == 0:
|
if len(self.streams) == 0:
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
|
self.writeQueue.put(protocol.assembleErrorMessage(fatal=2,
|
||||||
errorText="We don't have shared stream interests. Closing connection."))
|
errorText="We don't have shared stream interests. Closing connection."))
|
||||||
logger.debug ('Closed connection to %s because there is no overlapping interest in streams.',
|
logger.debug ('Closed connection to %s because there is no overlapping interest in streams.',
|
||||||
str(self.peer))
|
str(self.peer))
|
||||||
|
@ -461,7 +473,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
if self.destination in network.connectionpool.BMConnectionPool().inboundConnections:
|
if self.destination in network.connectionpool.BMConnectionPool().inboundConnections:
|
||||||
try:
|
try:
|
||||||
if not protocol.checkSocksIP(self.destination.host):
|
if not protocol.checkSocksIP(self.destination.host):
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(fatal=2,
|
self.writeQueue.put(protocol.assembleErrorMessage(fatal=2,
|
||||||
errorText="Too many connections from your IP. Closing connection."))
|
errorText="Too many connections from your IP. Closing connection."))
|
||||||
logger.debug ('Closed connection to %s because we are already connected to that IP.',
|
logger.debug ('Closed connection to %s because we are already connected to that IP.',
|
||||||
str(self.peer))
|
str(self.peer))
|
||||||
|
@ -474,7 +486,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
def sendChunk():
|
def sendChunk():
|
||||||
if addressCount == 0:
|
if addressCount == 0:
|
||||||
return
|
return
|
||||||
self.append_write_buf(protocol.CreatePacket('addr', \
|
self.writeQueue.put(protocol.CreatePacket('addr', \
|
||||||
addresses.encodeVarint(addressCount) + payload))
|
addresses.encodeVarint(addressCount) + payload))
|
||||||
|
|
||||||
# We are going to share a maximum number of 1000 addrs (per overlapping
|
# We are going to share a maximum number of 1000 addrs (per overlapping
|
||||||
|
@ -563,7 +575,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
if objectCount == 0:
|
if objectCount == 0:
|
||||||
return
|
return
|
||||||
logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount)
|
logger.debug('Sending huge inv message with %i objects to just this one peer', objectCount)
|
||||||
self.append_write_buf(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload))
|
self.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(objectCount) + payload))
|
||||||
|
|
||||||
# Select all hashes for objects in this stream.
|
# Select all hashes for objects in this stream.
|
||||||
bigInvList = {}
|
bigInvList = {}
|
||||||
|
@ -613,6 +625,7 @@ class BMConnection(TLSDispatcher, BMQueues):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def close(self, reason=None):
|
def close(self, reason=None):
|
||||||
|
self.set_state("close")
|
||||||
if reason is None:
|
if reason is None:
|
||||||
print "%s:%i: closing" % (self.destination.host, self.destination.port)
|
print "%s:%i: closing" % (self.destination.host, self.destination.port)
|
||||||
#traceback.print_stack()
|
#traceback.print_stack()
|
||||||
|
@ -653,7 +666,10 @@ class BMServer(AdvancedDispatcher):
|
||||||
pair = self.accept()
|
pair = self.accept()
|
||||||
if pair is not None:
|
if pair is not None:
|
||||||
sock, addr = pair
|
sock, addr = pair
|
||||||
network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock))
|
try:
|
||||||
|
network.connectionpool.BMConnectionPool().addConnection(BMConnection(sock=sock))
|
||||||
|
except socket.errno:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -104,29 +104,32 @@ class BMConnectionPool(object):
|
||||||
print "bootstrapping dns"
|
print "bootstrapping dns"
|
||||||
helper_bootstrap.dns()
|
helper_bootstrap.dns()
|
||||||
self.bootstrapped = True
|
self.bootstrapped = True
|
||||||
for i in range(len(self.outboundConnections), BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections")):
|
established = sum(1 for c in self.outboundConnections.values() if (c.connected and c.fullyEstablished))
|
||||||
chosen = chooseConnection(random.choice(self.streams))
|
pending = len(self.outboundConnections) - established
|
||||||
if chosen in self.outboundConnections:
|
if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"):
|
||||||
continue
|
for i in range(state.maximumNumberOfHalfOpenConnections - pending):
|
||||||
if chosen.host in self.inboundConnections:
|
chosen = chooseConnection(random.choice(self.streams))
|
||||||
continue
|
if chosen in self.outboundConnections:
|
||||||
|
|
||||||
#for c in self.outboundConnections:
|
|
||||||
# if chosen == c.destination:
|
|
||||||
# continue
|
|
||||||
#for c in self.inboundConnections:
|
|
||||||
# if chosen.host == c.destination.host:
|
|
||||||
# continue
|
|
||||||
try:
|
|
||||||
if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"):
|
|
||||||
self.addConnection(network.bmproto.Socks5BMConnection(chosen))
|
|
||||||
elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"):
|
|
||||||
self.addConnection(network.bmproto.Socks4aBMConnection(chosen))
|
|
||||||
elif not chosen.host.endswith(".onion"):
|
|
||||||
self.addConnection(network.bmproto.BMConnection(chosen))
|
|
||||||
except socket.error as e:
|
|
||||||
if e.errno == errno.ENETUNREACH:
|
|
||||||
continue
|
continue
|
||||||
|
if chosen.host in self.inboundConnections:
|
||||||
|
continue
|
||||||
|
|
||||||
|
#for c in self.outboundConnections:
|
||||||
|
# if chosen == c.destination:
|
||||||
|
# continue
|
||||||
|
#for c in self.inboundConnections:
|
||||||
|
# if chosen.host == c.destination.host:
|
||||||
|
# continue
|
||||||
|
try:
|
||||||
|
if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"):
|
||||||
|
self.addConnection(network.bmproto.Socks5BMConnection(chosen))
|
||||||
|
elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"):
|
||||||
|
self.addConnection(network.bmproto.Socks4aBMConnection(chosen))
|
||||||
|
elif not chosen.host.endswith(".onion"):
|
||||||
|
self.addConnection(network.bmproto.BMConnection(chosen))
|
||||||
|
except socket.error as e:
|
||||||
|
if e.errno == errno.ENETUNREACH:
|
||||||
|
continue
|
||||||
|
|
||||||
if acceptConnections and len(self.listeningSockets) == 0:
|
if acceptConnections and len(self.listeningSockets) == 0:
|
||||||
self.startListening()
|
self.startListening()
|
||||||
|
@ -142,10 +145,10 @@ class BMConnectionPool(object):
|
||||||
|
|
||||||
for i in self.inboundConnections.values() + self.outboundConnections.values():
|
for i in self.inboundConnections.values() + self.outboundConnections.values():
|
||||||
minTx = time.time() - 20
|
minTx = time.time() - 20
|
||||||
if i.connectionFullyEstablished:
|
if i.fullyEstablished:
|
||||||
minTx -= 300 - 20
|
minTx -= 300 - 20
|
||||||
if i.lastTx < minTx:
|
if i.lastTx < minTx:
|
||||||
if i.connectionFullyEstablished:
|
if i.fullyEstablished:
|
||||||
i.append_write_buf(protocol.CreatePacket('ping'))
|
i.writeQueue.put(protocol.CreatePacket('ping'))
|
||||||
else:
|
else:
|
||||||
i.close("Timeout (%is)" % (time.time() - i.lastTx))
|
i.close("Timeout (%is)" % (time.time() - i.lastTx))
|
||||||
|
|
|
@ -8,7 +8,7 @@ from network.connectionpool import BMConnectionPool
|
||||||
|
|
||||||
class BMNetworkThread(threading.Thread, StoppableThread):
|
class BMNetworkThread(threading.Thread, StoppableThread):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="BMNetworkThread")
|
threading.Thread.__init__(self, name="AsyncoreThread")
|
||||||
self.initStop()
|
self.initStop()
|
||||||
self.name = "AsyncoreThread"
|
self.name = "AsyncoreThread"
|
||||||
BMConnectionPool()
|
BMConnectionPool()
|
||||||
|
|
44
src/network/receivequeuethread.py
Normal file
44
src/network/receivequeuethread.py
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
import Queue
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from bmconfigparser import BMConfigParser
|
||||||
|
from debug import logger
|
||||||
|
from helper_threading import StoppableThread
|
||||||
|
from inventory import Inventory
|
||||||
|
from network.connectionpool import BMConnectionPool
|
||||||
|
import protocol
|
||||||
|
|
||||||
|
class ReceiveQueueThread(threading.Thread, StoppableThread):
|
||||||
|
def __init__(self):
|
||||||
|
threading.Thread.__init__(self, name="ReceiveQueueThread")
|
||||||
|
self.initStop()
|
||||||
|
self.name = "ReceiveQueueThread"
|
||||||
|
BMConnectionPool()
|
||||||
|
logger.error("init asyncore thread")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while not self._stopped:
|
||||||
|
processed = 0
|
||||||
|
for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
||||||
|
try:
|
||||||
|
command, args = i.receiveQueue.get(False)
|
||||||
|
except Queue.Empty:
|
||||||
|
continue
|
||||||
|
processed += 1
|
||||||
|
try:
|
||||||
|
getattr(self, "command_" + str(command))(i, args)
|
||||||
|
except AttributeError:
|
||||||
|
# missing command
|
||||||
|
raise
|
||||||
|
if processed == 0:
|
||||||
|
self.stop.wait(0.2)
|
||||||
|
|
||||||
|
def command_object(self, connection, objHash):
|
||||||
|
try:
|
||||||
|
connection.writeQueue.put(protocol.CreatePacket('object', Inventory()[objHash].payload))
|
||||||
|
except KeyError:
|
||||||
|
connection.antiIntersectionDelay()
|
||||||
|
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (connection.destination,))
|
||||||
|
|
||||||
|
def stopThread(self):
|
||||||
|
super(ReceiveQueueThread, self).stopThread()
|
|
@ -59,28 +59,28 @@ class Socks4aConnection(Socks4a):
|
||||||
def state_auth_done(self):
|
def state_auth_done(self):
|
||||||
# Now we can request the actual connection
|
# Now we can request the actual connection
|
||||||
rmtrslv = False
|
rmtrslv = False
|
||||||
self.append_write_buf(struct.pack('>BBH', 0x04, 0x01, self.destination[1]))
|
self.writeQueue.put(struct.pack('>BBH', 0x04, 0x01, self.destination[1]))
|
||||||
# If the given destination address is an IP address, we'll
|
# If the given destination address is an IP address, we'll
|
||||||
# use the IPv4 address request even if remote resolving was specified.
|
# use the IPv4 address request even if remote resolving was specified.
|
||||||
try:
|
try:
|
||||||
self.ipaddr = socket.inet_aton(self.destination[0])
|
self.ipaddr = socket.inet_aton(self.destination[0])
|
||||||
self.append_write_buf(self.ipaddr)
|
self.writeQueue.put(self.ipaddr)
|
||||||
except socket.error:
|
except socket.error:
|
||||||
# Well it's not an IP number, so it's probably a DNS name.
|
# Well it's not an IP number, so it's probably a DNS name.
|
||||||
if Proxy._remote_dns:
|
if Proxy._remote_dns:
|
||||||
# Resolve remotely
|
# Resolve remotely
|
||||||
rmtrslv = True
|
rmtrslv = True
|
||||||
self.ipaddr = None
|
self.ipaddr = None
|
||||||
self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01))
|
self.writeQueue.put(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01))
|
||||||
else:
|
else:
|
||||||
# Resolve locally
|
# Resolve locally
|
||||||
self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0]))
|
self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0]))
|
||||||
self.append_write_buf(self.ipaddr)
|
self.writeQueue.put(self.ipaddr)
|
||||||
if self._auth:
|
if self._auth:
|
||||||
self.append_write_buf(self._auth[0])
|
self.writeQueue.put(self._auth[0])
|
||||||
self.append_write_buf(chr(0x00).encode())
|
self.writeQueue.put(chr(0x00).encode())
|
||||||
if rmtrslv:
|
if rmtrslv:
|
||||||
self.append_write_buf(self.destination[0] + chr(0x00).encode())
|
self.writeQueue.put(self.destination[0] + chr(0x00).encode())
|
||||||
self.set_state("pre_connect", 0)
|
self.set_state("pre_connect", 0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -92,12 +92,12 @@ class Socks4aResolver(Socks4a):
|
||||||
|
|
||||||
def state_auth_done(self):
|
def state_auth_done(self):
|
||||||
# Now we can request the actual connection
|
# Now we can request the actual connection
|
||||||
self.append_write_buf(struct.pack('>BBH', 0x04, 0xF0, self.destination[1]))
|
self.writeQueue.put(struct.pack('>BBH', 0x04, 0xF0, self.destination[1]))
|
||||||
self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01))
|
self.writeQueue.put(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01))
|
||||||
if self._auth:
|
if self._auth:
|
||||||
self.append_write_buf(self._auth[0])
|
self.writeQueue.put(self._auth[0])
|
||||||
self.append_write_buf(chr(0x00).encode())
|
self.writeQueue.put(chr(0x00).encode())
|
||||||
self.append_write_buf(self.host + chr(0x00).encode())
|
self.writeQueue.put(self.host + chr(0x00).encode())
|
||||||
self.set_state("pre_connect", 0)
|
self.set_state("pre_connect", 0)
|
||||||
|
|
||||||
def resolved(self):
|
def resolved(self):
|
||||||
|
|
|
@ -17,9 +17,9 @@ class Socks5(Proxy):
|
||||||
|
|
||||||
def state_init(self):
|
def state_init(self):
|
||||||
if self._auth:
|
if self._auth:
|
||||||
self.append_write_buf(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
|
self.writeQueue.put(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
|
||||||
else:
|
else:
|
||||||
self.append_write_buf(struct.pack('BBB', 0x05, 0x01, 0x00))
|
self.writeQueue.put(struct.pack('BBB', 0x05, 0x01, 0x00))
|
||||||
self.set_state("auth_1", 0)
|
self.set_state("auth_1", 0)
|
||||||
|
|
||||||
def state_auth_1(self):
|
def state_auth_1(self):
|
||||||
|
@ -35,7 +35,7 @@ class Socks5(Proxy):
|
||||||
self.set_state("auth_done", 2)
|
self.set_state("auth_done", 2)
|
||||||
elif ret[1] == 2:
|
elif ret[1] == 2:
|
||||||
# username/password
|
# username/password
|
||||||
self.append_write_buf(struct.pack('BB', 1, len(self._auth[0])) + \
|
self.writeQueue.put(struct.pack('BB', 1, len(self._auth[0])) + \
|
||||||
self._auth[0] + struct.pack('B', len(self._auth[1])) + \
|
self._auth[0] + struct.pack('B', len(self._auth[1])) + \
|
||||||
self._auth[1])
|
self._auth[1])
|
||||||
self.set_state("auth_1", 2)
|
self.set_state("auth_1", 2)
|
||||||
|
@ -130,23 +130,23 @@ class Socks5Connection(Socks5):
|
||||||
|
|
||||||
def state_auth_done(self):
|
def state_auth_done(self):
|
||||||
# Now we can request the actual connection
|
# Now we can request the actual connection
|
||||||
self.append_write_buf(struct.pack('BBB', 0x05, 0x01, 0x00))
|
self.writeQueue.put(struct.pack('BBB', 0x05, 0x01, 0x00))
|
||||||
# If the given destination address is an IP address, we'll
|
# If the given destination address is an IP address, we'll
|
||||||
# use the IPv4 address request even if remote resolving was specified.
|
# use the IPv4 address request even if remote resolving was specified.
|
||||||
try:
|
try:
|
||||||
self.ipaddr = socket.inet_aton(self.destination[0])
|
self.ipaddr = socket.inet_aton(self.destination[0])
|
||||||
self.append_write_buf(chr(0x01).encode() + self.ipaddr)
|
self.writeQueue.put(chr(0x01).encode() + self.ipaddr)
|
||||||
except socket.error:
|
except socket.error:
|
||||||
# Well it's not an IP number, so it's probably a DNS name.
|
# Well it's not an IP number, so it's probably a DNS name.
|
||||||
if Proxy._remote_dns:
|
if Proxy._remote_dns:
|
||||||
# Resolve remotely
|
# Resolve remotely
|
||||||
self.ipaddr = None
|
self.ipaddr = None
|
||||||
self.append_write_buf(chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0])
|
self.writeQueue.put(chr(0x03).encode() + chr(len(self.destination[0])).encode() + self.destination[0])
|
||||||
else:
|
else:
|
||||||
# Resolve locally
|
# Resolve locally
|
||||||
self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0]))
|
self.ipaddr = socket.inet_aton(socket.gethostbyname(self.destination[0]))
|
||||||
self.append_write_buf(chr(0x01).encode() + self.ipaddr)
|
self.writeQueue.put(chr(0x01).encode() + self.ipaddr)
|
||||||
self.append_write_buf(struct.pack(">H", self.destination[1]))
|
self.writeQueue.put(struct.pack(">H", self.destination[1]))
|
||||||
self.set_state("pre_connect", 0)
|
self.set_state("pre_connect", 0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -158,9 +158,9 @@ class Socks5Resolver(Socks5):
|
||||||
|
|
||||||
def state_auth_done(self):
|
def state_auth_done(self):
|
||||||
# Now we can request the actual connection
|
# Now we can request the actual connection
|
||||||
self.append_write_buf(struct.pack('BBB', 0x05, 0xF0, 0x00))
|
self.writeQueue.put(struct.pack('BBB', 0x05, 0xF0, 0x00))
|
||||||
self.append_write_buf(chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host))
|
self.writeQueue.put(chr(0x03).encode() + chr(len(self.host)).encode() + str(self.host))
|
||||||
self.append_write_buf(struct.pack(">H", self.port))
|
self.writeQueue.put(struct.pack(">H", self.port))
|
||||||
self.set_state("pre_connect", 0)
|
self.set_state("pre_connect", 0)
|
||||||
|
|
||||||
def resolved(self):
|
def resolved(self):
|
||||||
|
|
|
@ -67,7 +67,7 @@ def isBitSetWithinBitfield(fourByteString, n):
|
||||||
x, = unpack('>L', fourByteString)
|
x, = unpack('>L', fourByteString)
|
||||||
return x & 2**n != 0
|
return x & 2**n != 0
|
||||||
|
|
||||||
# data handling
|
# ip addresses
|
||||||
|
|
||||||
def encodeHost(host):
|
def encodeHost(host):
|
||||||
if host.find('.onion') > -1:
|
if host.find('.onion') > -1:
|
||||||
|
@ -86,6 +86,49 @@ def networkType(host):
|
||||||
else:
|
else:
|
||||||
return 'IPv6'
|
return 'IPv6'
|
||||||
|
|
||||||
|
def checkIPAddress(host):
|
||||||
|
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
|
hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:])
|
||||||
|
return checkIPv4Address(host[12:], hostStandardFormat)
|
||||||
|
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
|
||||||
|
# Onion, based on BMD/bitcoind
|
||||||
|
hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion"
|
||||||
|
return hostStandardFormat
|
||||||
|
else:
|
||||||
|
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host)
|
||||||
|
if hostStandardFormat == "":
|
||||||
|
# This can happen on Windows systems which are not 64-bit compatible
|
||||||
|
# so let us drop the IPv6 address.
|
||||||
|
return False
|
||||||
|
return checkIPv6Address(host, hostStandardFormat)
|
||||||
|
|
||||||
|
def checkIPv4Address(host, hostStandardFormat):
|
||||||
|
if host[0] == '\x7F': # 127/8
|
||||||
|
logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
if host[0] == '\x0A': # 10/8
|
||||||
|
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
if host[0:2] == '\xC0\xA8': # 192.168/16
|
||||||
|
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
|
||||||
|
logger.debug('Ignoring IP address in private range:' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
return hostStandardFormat
|
||||||
|
|
||||||
|
def checkIPv6Address(host, hostStandardFormat):
|
||||||
|
if host == ('\x00' * 15) + '\x01':
|
||||||
|
logger.debug('Ignoring loopback address: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
||||||
|
logger.debug ('Ignoring local address: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
if (ord(host[0]) & 0xfe) == 0xfc:
|
||||||
|
logger.debug ('Ignoring unique local address: ' + hostStandardFormat)
|
||||||
|
return False
|
||||||
|
return hostStandardFormat
|
||||||
|
|
||||||
# checks
|
# checks
|
||||||
|
|
||||||
def haveSSL(server = False):
|
def haveSSL(server = False):
|
||||||
|
|
|
@ -21,6 +21,8 @@ curses = False
|
||||||
|
|
||||||
sqlReady = False # set to true by sqlTread when ready for processing
|
sqlReady = False # set to true by sqlTread when ready for processing
|
||||||
|
|
||||||
|
maximumNumberOfHalfOpenConnections = 0
|
||||||
|
|
||||||
# If the trustedpeer option is specified in keys.dat then this will
|
# If the trustedpeer option is specified in keys.dat then this will
|
||||||
# contain a Peer which will be connected to instead of using the
|
# contain a Peer which will be connected to instead of using the
|
||||||
# addresses advertised by other peers. The client will only connect to
|
# addresses advertised by other peers. The client will only connect to
|
||||||
|
|
Loading…
Reference in New Issue
Block a user