2017-05-25 23:04:33 +02:00
import Queue
2017-06-21 12:16:33 +02:00
import sys
2017-05-25 23:04:33 +02:00
import threading
2017-05-27 19:09:21 +02:00
import time
2017-05-25 23:04:33 +02:00
2017-05-27 19:09:21 +02:00
import addresses
2017-05-25 23:04:33 +02:00
from bmconfigparser import BMConfigParser
from debug import logger
from helper_threading import StoppableThread
from inventory import Inventory
from network . connectionpool import BMConnectionPool
2017-05-27 19:09:21 +02:00
from network . bmproto import BMProto
2017-05-25 23:04:33 +02:00
import protocol
2017-05-29 14:35:08 +02:00
import state
2017-05-25 23:04:33 +02:00
class ReceiveQueueThread(threading.Thread, StoppableThread):
    """Drain the connections' receive queues and dispatch queued commands.

    Each item taken from a connection's ``receiveQueue`` is a
    ``(command, args)`` pair.  It is handled by the method named
    ``command_<command>`` on this class, called as ``handler(connection, args)``.
    """

    def __init__(self):
        threading.Thread.__init__(self, name="ReceiveQueueThread")
        self.initStop()
        self.name = "ReceiveQueueThread"
        logger.info("init receive queue thread")

    def run(self):
        """Poll every connection's receive queue until stopped or shut down.

        Each pass takes at most one item per connection.  When a pass finds
        nothing to do, the thread waits on ``self.stop`` for up to 2 seconds
        before polling again.

        Raises:
            AttributeError: if a queued command has no ``command_<name>``
                handler (or the handler itself raises it).  The exception is
                not swallowed, so it ends the loop.
        """
        lastprinted = int(time.time())
        while not self._stopped and state.shutdown == 0:
            # Once-per-second tick; the only consumer is the disabled
            # stack-depth diagnostic below.
            if lastprinted < int(time.time()):
                lastprinted = int(time.time())
                # try:
                #     sys._getframe(200)
                #     logger.error("Stack depth warning")
                # except ValueError:
                #     pass
            processed = 0
            pool = BMConnectionPool()
            # Python 2: values() returns lists, so "+" builds a snapshot.
            for i in pool.inboundConnections.values() + pool.outboundConnections.values():
                if self._stopped:
                    break
                try:
                    # Non-blocking get: skip connections with nothing queued.
                    command, args = i.receiveQueue.get(False)
                except Queue.Empty:
                    continue
                processed += 1
                try:
                    getattr(self, "command_" + str(command))(i, args)
                finally:
                    # Always balance the get() above, whatever the handler
                    # did, so the queue's unfinished-task count stays correct
                    # even when an exception propagates.
                    i.receiveQueue.task_done()
            if processed == 0:
                self.stop.wait(2)

    def command_object(self, connection, objHash):
        """Queue an 'object' packet carrying the payload stored for objHash.

        If the inventory lookup raises KeyError, no packet is queued;
        ``connection.antiIntersectionDelay()`` is called and a warning is
        logged instead.
        """
        try:
            connection.writeQueue.put(protocol.CreatePacket('object', Inventory()[objHash].payload))
        except KeyError:
            connection.antiIntersectionDelay()
            logger.warning(
                '%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.',
                connection.destination)

    def command_biginv(self, connection, dummy):
        """Send all unexpired hashes of the connection's streams as 'inv' packets.

        Every hash is also recorded in ``connection.objectsNewToThem``.  The
        hashes are sent in chunks of at most ``BMProto.maxObjectCount``
        entries; nothing is sent when there are no hashes.  ``dummy`` is
        unused and exists only to match the handler signature.
        """
        def sendChunk(hashes):
            # Queue one 'inv' packet for the given hashes; no-op when empty.
            if not hashes:
                return
            logger.debug('Sending huge inv message with %i objects to just this one peer', len(hashes))
            connection.writeQueue.put(
                protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b''.join(hashes)))

        # Select all hashes for objects in the connection's streams; the set
        # removes duplicates across streams.
        bigInvList = set()
        for stream in connection.streams:
            # may lock for a long time, but I think it's better than thousands of small locks
            with connection.objectsNewToThemLock:
                for objHash in Inventory().unexpired_hashes_by_stream(stream):
                    bigInvList.add(objHash)
                    connection.objectsNewToThem[objHash] = True

        # Now let us start appending all of these hashes together. They will be
        # sent out in a big inv message to our new peer.
        chunk = []
        for objHash in bigInvList:
            chunk.append(objHash)
            if len(chunk) >= BMProto.maxObjectCount:
                # sendChunk is a local helper, not a method: calling it via
                # self would raise AttributeError.
                sendChunk(chunk)
                chunk = []

        # flush
        sendChunk(chunk)

    def command_inv(self, connection, hashId):
        """Pass a received inventory hash on to the connection."""
        connection.handleReceivedInventory(hashId)

    def stopThread(self):
        """Delegate to the parent classes' ``stopThread``."""
        super(ReceiveQueueThread, self).stopThread()