Make objects.vectors_to_send() a generator of chunks
This commit is contained in:
parent
de4d1655e0
commit
e0fd798d22
|
@ -278,14 +278,11 @@ class ConnectionBase(threading.Thread):
|
|||
self.send_queue.put(message.Addr(addr))
|
||||
|
||||
if shared.objects:
|
||||
to_send = shared.objects.vectors_to_send()
|
||||
offset = 0
|
||||
while offset < len(to_send):
|
||||
for chunk in shared.objects.vectors_to_send(10000):
|
||||
# We limit size of inv messaged to 10000 entries
|
||||
# because they might time out
|
||||
# in very slow networks (I2P)
|
||||
self.send_queue.put(message.Inv(to_send[offset:offset+10000]))
|
||||
offset += 10000
|
||||
self.send_queue.put(message.Inv(chunk))
|
||||
self.status = 'fully_established'
|
||||
|
||||
def _process_queue(self):
|
||||
|
|
|
@ -111,13 +111,20 @@ class Inventory():
|
|||
self._pending.update(vectors)
|
||||
return vectors
|
||||
|
||||
def vectors_to_send(self, stream=None):
|
||||
def vectors_to_send(self, chunk_size=10000, stream=None):
|
||||
if stream is None:
|
||||
stream = shared.stream
|
||||
now = int(time.time())
|
||||
cur = self._db.execute(
|
||||
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
|
||||
' ORDER BY random()',
|
||||
(int(time.time()), stream or shared.stream)
|
||||
' ORDER BY random()', (now, stream)
|
||||
)
|
||||
return [v for v, in cur]
|
||||
cur.arraysize = chunk_size
|
||||
while True:
|
||||
vectors = cur.fetchmany()
|
||||
if not vectors:
|
||||
return
|
||||
yield [v for v, in vectors]
|
||||
|
||||
def get(self, vector, default=None):
|
||||
try:
|
||||
|
|
|
@ -65,8 +65,10 @@ class TestObjects():
|
|||
self.objects[obj.vector] = obj
|
||||
needed.add(obj.vector)
|
||||
|
||||
self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
|
||||
self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
|
||||
self.assertEqual(
|
||||
set(next(self.objects.vectors_to_send(stream=4))), needed)
|
||||
self.assertTrue(
|
||||
set(next(self.objects.vectors_to_send())).difference(needed))
|
||||
|
||||
def test_filter(self):
|
||||
"""Check the objects filtering"""
|
||||
|
|
Loading…
Reference in New Issue
Block a user