PyBitmessage/src/network/invthread.py

89 lines
3.0 KiB
Python
Raw Normal View History

from binascii import hexlify
import collections
import Queue
import random
import threading
import time
import addresses
from bmconfigparser import BMConfigParser
from debug import logger
from helper_threading import StoppableThread
from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool
from queues import invQueue
import protocol
import state
class InvThread(threading.Thread, StoppableThread):
size = 10
def __init__(self):
threading.Thread.__init__(self, name="InvThread")
self.initStop()
self.name = "InvThread"
self.shutdown = False
self.collectionOfInvs = []
for i in range(InvThread.size):
self.collectionOfInvs.append({})
def run(self):
iterator = 0
while not state.shutdown:
while True:
try:
data = invQueue.get(False)
if len(data) == 2:
2017-06-24 12:13:35 +02:00
BMConnectionPool().handleReceivedObject(data[0], data[1])
else:
2017-06-24 12:13:35 +02:00
BMConnectionPool().handleReceivedObject(data[0], data[1], data[2])
2017-05-29 15:04:22 +02:00
self.holdHash (data[0], data[1])
except Queue.Empty:
break
2017-06-24 12:13:35 +02:00
if self.collectionOfInvs[iterator]:
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
hashes = []
for stream in connection.streams:
try:
for hashId in self.collectionOfInvs[iterator][stream]:
try:
with connection.objectsNewToThemLock:
del connection.objectsNewToThem[hashId]
hashes.append(hashId)
except KeyError:
pass
except KeyError:
continue
2017-06-24 12:13:35 +02:00
if hashes:
2017-05-29 14:35:08 +02:00
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + "".join(hashes)))
self.collectionOfInvs[iterator] = {}
iterator += 1
iterator %= InvThread.size
self.stop.wait(1)
2017-06-24 12:13:35 +02:00
def holdHash(self, stream, hashId):
i = random.randrange(0, InvThread.size)
if stream not in self.collectionOfInvs[i]:
self.collectionOfInvs[i][stream] = []
2017-06-24 12:13:35 +02:00
self.collectionOfInvs[i][stream].append(hashId)
2017-06-24 12:13:35 +02:00
def hasHash(self, hashId):
for streamlist in self.collectionOfInvs:
for stream in streamlist:
2017-06-24 12:13:35 +02:00
if hashId in streamlist[stream]:
return True
return False
def hashCount(self):
retval = 0
for streamlist in self.collectionOfInvs:
for stream in streamlist:
retval += len(streamlist[stream])
return retval
def close(self):
self.shutdown = True