Remove non-asyncore network code (partial)

This commit is contained in:
Peter Šurda 2017-08-09 17:36:52 +02:00
parent 0b07b1c89a
commit 6c695c8ac7
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 65 additions and 125 deletions

View File

@ -864,11 +864,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'') objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'')
with shared.printLock: with shared.printLock:
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', hexlify(inventoryHash) print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', hexlify(inventoryHash)
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((toStreamNumber, inventoryHash))
queues.invQueue.put((toStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
def HandleTrashSentMessageByAckDAta(self, params): def HandleTrashSentMessageByAckDAta(self, params):
# This API method should only be used when msgid is not available # This API method should only be used when msgid is not available
@ -914,11 +910,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'') objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'')
with shared.printLock: with shared.printLock:
print 'broadcasting inv within API command disseminatePubkey with hash:', hexlify(inventoryHash) print 'broadcasting inv within API command disseminatePubkey with hash:', hexlify(inventoryHash)
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
pubkeyStreamNumber, 'advertiseobject', inventoryHash))
def HandleGetMessageDataByDestinationHash(self, params): def HandleGetMessageDataByDestinationHash(self, params):
# Method will eventually be used by a particular Android app to # Method will eventually be used by a particular Android app to

View File

@ -92,13 +92,7 @@ def connectToStream(streamNumber):
if streamNumber*2+1 not in knownnodes.knownNodes: if streamNumber*2+1 not in knownnodes.knownNodes:
knownnodes.knownNodes[streamNumber*2+1] = {} knownnodes.knownNodes[streamNumber*2+1] = {}
if BMConfigParser().get("network", "asyncore"): BMConnectionPool().connectToStream(streamNumber)
BMConnectionPool().connectToStream(streamNumber)
else:
for i in range(state.maximumNumberOfHalfOpenConnections):
a = outgoingSynSender()
a.setup(streamNumber, selfInitiatedConnections)
a.start()
def _fixSocket(): def _fixSocket():
if sys.platform.startswith('linux'): if sys.platform.startswith('linux'):
@ -281,36 +275,29 @@ class Main:
singleAPIThread.daemon = True # close the main program even if there are threads left singleAPIThread.daemon = True # close the main program even if there are threads left
singleAPIThread.start() singleAPIThread.start()
if BMConfigParser().get("network", "asyncore"): BMConnectionPool()
BMConnectionPool() asyncoreThread = BMNetworkThread()
asyncoreThread = BMNetworkThread() asyncoreThread.daemon = True
asyncoreThread.daemon = True asyncoreThread.start()
asyncoreThread.start() for i in range(BMConfigParser().getint("threads", "receive")):
for i in range(BMConfigParser().getint("threads", "receive")): receiveQueueThread = ReceiveQueueThread(i)
receiveQueueThread = ReceiveQueueThread(i) receiveQueueThread.daemon = True
receiveQueueThread.daemon = True receiveQueueThread.start()
receiveQueueThread.start() announceThread = AnnounceThread()
announceThread = AnnounceThread() announceThread.daemon = True
announceThread.daemon = True announceThread.start()
announceThread.start() state.invThread = InvThread()
state.invThread = InvThread() state.invThread.daemon = True
state.invThread.daemon = True state.invThread.start()
state.invThread.start() state.addrThread = AddrThread()
state.addrThread = AddrThread() state.addrThread.daemon = True
state.addrThread.daemon = True state.addrThread.start()
state.addrThread.start() state.downloadThread = DownloadThread()
state.downloadThread = DownloadThread() state.downloadThread.daemon = True
state.downloadThread.daemon = True state.downloadThread.start()
state.downloadThread.start()
connectToStream(1) connectToStream(1)
if not BMConfigParser().get("network", "asyncore"):
singleListenerThread = singleListener()
singleListenerThread.setup(selfInitiatedConnections)
singleListenerThread.daemon = True # close the main program even if there are threads left
singleListenerThread.start()
if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'):
import upnp import upnp
upnpThread = upnp.uPnPThread() upnpThread = upnp.uPnPThread()

View File

@ -19,7 +19,6 @@ BMConfigDefaults = {
"receive": 3, "receive": 3,
}, },
"network": { "network": {
"asyncore": True,
"bind": '', "bind": '',
}, },
"inventory": { "inventory": {

View File

@ -192,11 +192,7 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((streamNumber, inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -286,11 +282,7 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((streamNumber, inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -380,11 +372,7 @@ class singleWorker(threading.Thread, StoppableThread):
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((streamNumber, inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -513,11 +501,7 @@ class singleWorker(threading.Thread, StoppableThread):
objectType, streamNumber, payload, embeddedTime, tag) objectType, streamNumber, payload, embeddedTime, tag)
PendingUpload().add(inventoryHash) PendingUpload().add(inventoryHash)
logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash))
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((streamNumber, inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp())))) queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp()))))
@ -846,11 +830,7 @@ class singleWorker(threading.Thread, StoppableThread):
# not sending to a chan or one of my addresses # not sending to a chan or one of my addresses
queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp())))) queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp()))))
logger.info('Broadcasting inv for my msg(within sendmsg function):' + hexlify(inventoryHash)) logger.info('Broadcasting inv for my msg(within sendmsg function):' + hexlify(inventoryHash))
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((toStreamNumber, inventoryHash))
queues.invQueue.put((toStreamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash))
# Update the sent message in the sent table with the necessary information. # Update the sent message in the sent table with the necessary information.
if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
@ -952,11 +932,7 @@ class singleWorker(threading.Thread, StoppableThread):
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
PendingUpload().add(inventoryHash) PendingUpload().add(inventoryHash)
logger.info('sending inv (for the getpubkey message)') logger.info('sending inv (for the getpubkey message)')
if BMConfigParser().get("network", "asyncore"): queues.invQueue.put((streamNumber, inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
else:
protocol.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash))
# wait 10% past expiration # wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1) sleeptill = int(time.time() + TTL * 1.1)

View File

@ -15,67 +15,53 @@ lastSentBytes = 0
currentSentSpeed = 0 currentSentSpeed = 0
def connectedHostsList(): def connectedHostsList():
if BMConfigParser().get("network", "asyncore"): retval = []
retval = [] for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): if not i.fullyEstablished:
if not i.fullyEstablished: continue
continue try:
try: retval.append(i)
retval.append(i) except AttributeError:
except AttributeError: pass
pass return retval
return retval
return shared.connectedHostsList.items()
def sentBytes(): def sentBytes():
if BMConfigParser().get("network", "asyncore"): return asyncore.sentBytes
return asyncore.sentBytes
return throttle.SendThrottle().total
def uploadSpeed(): def uploadSpeed():
global lastSentTimestamp, lastSentBytes, currentSentSpeed global lastSentTimestamp, lastSentBytes, currentSentSpeed
if BMConfigParser().get("network", "asyncore"): currentTimestamp = time.time()
currentTimestamp = time.time() if int(lastSentTimestamp) < int(currentTimestamp):
if int(lastSentTimestamp) < int(currentTimestamp): currentSentBytes = asyncore.sentBytes
currentSentBytes = asyncore.sentBytes currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp))
currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp)) lastSentBytes = currentSentBytes
lastSentBytes = currentSentBytes lastSentTimestamp = currentTimestamp
lastSentTimestamp = currentTimestamp return currentSentSpeed
return currentSentSpeed
return throttle.sendThrottle().getSpeed()
def receivedBytes(): def receivedBytes():
if BMConfigParser().get("network", "asyncore"): return asyncore.receivedBytes
return asyncore.receivedBytes
return throttle.ReceiveThrottle().total
def downloadSpeed(): def downloadSpeed():
global lastReceivedTimestamp, lastReceivedBytes, currentReceivedSpeed global lastReceivedTimestamp, lastReceivedBytes, currentReceivedSpeed
if BMConfigParser().get("network", "asyncore"): currentTimestamp = time.time()
currentTimestamp = time.time() if int(lastReceivedTimestamp) < int(currentTimestamp):
if int(lastReceivedTimestamp) < int(currentTimestamp): currentReceivedBytes = asyncore.receivedBytes
currentReceivedBytes = asyncore.receivedBytes currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp))
currentReceivedSpeed = int((currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp)) lastReceivedBytes = currentReceivedBytes
lastReceivedBytes = currentReceivedBytes lastReceivedTimestamp = currentTimestamp
lastReceivedTimestamp = currentTimestamp return currentReceivedSpeed
return currentReceivedSpeed
return throttle.ReceiveThrottle().getSpeed()
def pendingDownload(): def pendingDownload():
if BMConfigParser().get("network", "asyncore"): tmp = {}
tmp = {} for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): for k in connection.objectsNewToMe.keys():
for k in connection.objectsNewToMe.keys(): tmp[k] = True
tmp[k] = True return len(tmp)
return len(tmp)
return PendingDownloadQueue.totalSize()
def pendingUpload(): def pendingUpload():
if BMConfigParser().get("network", "asyncore"): return 0
return 0 tmp = {}
tmp = {} for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): for k in connection.objectsNewToThem.keys():
for k in connection.objectsNewToThem.keys(): tmp[k] = True
tmp[k] = True return len(tmp)
return len(tmp)
return PendingUpload().len()