WIP: Implementing sqlite objects storage #13

Draft
lee.miller wants to merge 11 commits from lee.miller/MiNode:sqlite into v0.3
3 changed files with 17 additions and 11 deletions
Showing only changes of commit 9fc6e65f2d - Show all commits

View File

@ -278,14 +278,11 @@ class ConnectionBase(threading.Thread):
self.send_queue.put(message.Addr(addr))
if shared.objects:
to_send = shared.objects.vectors_to_send()
offset = 0
while offset < len(to_send):
for chunk in shared.objects.vectors_to_send(10000):
# We limit size of inv messaged to 10000 entries
# because they might time out
# in very slow networks (I2P)
self.send_queue.put(message.Inv(to_send[offset:offset+10000]))
offset += 10000
self.send_queue.put(message.Inv(chunk))
self.status = 'fully_established'
def _process_queue(self):

View File

@ -119,13 +119,20 @@ class Inventory():
self._pending.update(vectors)
return vectors
def vectors_to_send(self, stream=None):
def vectors_to_send(self, chunk_size=10000, stream=None):
if stream is None:
stream = shared.stream
now = int(time.time())
cur = self._db.execute(
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
' ORDER BY random()',
(int(time.time()), stream or shared.stream)
' ORDER BY random()', (now, stream)
)
return [v for v, in cur]
cur.arraysize = chunk_size
while True:
vectors = cur.fetchmany()
if not vectors:
return
yield [v for v, in vectors]
def get(self, vector, default=None):
try:

View File

@ -65,8 +65,10 @@ class TestObjects():
self.objects[obj.vector] = obj
needed.add(obj.vector)
self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
self.assertEqual(
set(next(self.objects.vectors_to_send(stream=4))), needed)
self.assertTrue(
set(next(self.objects.vectors_to_send())).difference(needed))
def test_filter(self):
"""Check the objects filtering"""