some initial objectProcessorThread work
This commit is contained in:
parent
34ca45160d
commit
2a1b6dd86d
|
@ -158,7 +158,7 @@ class receiveDataThread(threading.Thread):
|
||||||
elif remoteCommand == 'getdata\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
elif remoteCommand == 'getdata\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
||||||
self.recgetdata(self.data[24:self.payloadLength + 24])
|
self.recgetdata(self.data[24:self.payloadLength + 24])
|
||||||
elif remoteCommand == 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
elif remoteCommand == 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
||||||
self.recmsg(self.data[24:self.payloadLength + 24])
|
self.recmsg(self.data[:self.payloadLength + 24])
|
||||||
elif remoteCommand == 'broadcast\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
elif remoteCommand == 'broadcast\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
||||||
self.recbroadcast(self.data[24:self.payloadLength + 24])
|
self.recbroadcast(self.data[24:self.payloadLength + 24])
|
||||||
elif remoteCommand == 'ping\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
elif remoteCommand == 'ping\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
||||||
|
@ -221,19 +221,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
||||||
self.processData()
|
self.processData()
|
||||||
|
|
||||||
def isProofOfWorkSufficient(
|
|
||||||
self,
|
|
||||||
data,
|
|
||||||
nonceTrialsPerByte=0,
|
|
||||||
payloadLengthExtraBytes=0):
|
|
||||||
if nonceTrialsPerByte < shared.networkDefaultProofOfWorkNonceTrialsPerByte:
|
|
||||||
nonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte
|
|
||||||
if payloadLengthExtraBytes < shared.networkDefaultPayloadLengthExtraBytes:
|
|
||||||
payloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes
|
|
||||||
POW, = unpack('>Q', hashlib.sha512(hashlib.sha512(data[
|
|
||||||
:8] + hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8])
|
|
||||||
# print 'POW:', POW
|
|
||||||
return POW <= 2 ** 64 / ((len(data) + payloadLengthExtraBytes) * (nonceTrialsPerByte))
|
|
||||||
|
|
||||||
def sendpong(self):
|
def sendpong(self):
|
||||||
print 'Sending pong'
|
print 'Sending pong'
|
||||||
|
@ -342,7 +329,7 @@ class receiveDataThread(threading.Thread):
|
||||||
def recbroadcast(self, data):
|
def recbroadcast(self, data):
|
||||||
self.messageProcessingStartTime = time.time()
|
self.messageProcessingStartTime = time.time()
|
||||||
# First we must check to make sure the proof of work is sufficient.
|
# First we must check to make sure the proof of work is sufficient.
|
||||||
if not self.isProofOfWorkSufficient(data):
|
if not shared.isProofOfWorkSufficient(data):
|
||||||
print 'Proof of work in broadcast message insufficient.'
|
print 'Proof of work in broadcast message insufficient.'
|
||||||
return
|
return
|
||||||
readPosition = 8 # bypass the nonce
|
readPosition = 8 # bypass the nonce
|
||||||
|
@ -858,13 +845,14 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
# We have received a msg message.
|
# We have received a msg message.
|
||||||
def recmsg(self, data):
|
def recmsg(self, data):
|
||||||
|
readPosition = 24 # bypass the network header
|
||||||
self.messageProcessingStartTime = time.time()
|
self.messageProcessingStartTime = time.time()
|
||||||
# First we must check to make sure the proof of work is sufficient.
|
# First we must check to make sure the proof of work is sufficient.
|
||||||
if not self.isProofOfWorkSufficient(data):
|
if not shared.isProofOfWorkSufficient(data[readPosition:readPosition+10]):
|
||||||
print 'Proof of work in msg message insufficient.'
|
print 'Proof of work in msg message insufficient.'
|
||||||
return
|
return
|
||||||
|
|
||||||
readPosition = 8
|
readPosition += 8 # bypass the POW nonce
|
||||||
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
|
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
|
||||||
|
|
||||||
# This section is used for the transition from 32 bit time to 64 bit
|
# This section is used for the transition from 32 bit time to 64 bit
|
||||||
|
@ -905,12 +893,15 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
self.broadcastinv(self.inventoryHash)
|
self.broadcastinv(self.inventoryHash)
|
||||||
shared.numberOfMessagesProcessed += 1
|
#shared.numberOfMessagesProcessed += 1
|
||||||
shared.UISignalQueue.put((
|
#shared.UISignalQueue.put((
|
||||||
'updateNumberOfMessagesProcessed', 'no data'))
|
# 'updateNumberOfMessagesProcessed', 'no data'))
|
||||||
|
|
||||||
self.processmsg(
|
#self.processmsg(
|
||||||
readPosition, data) # When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it.
|
# readPosition, data) # When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it.
|
||||||
|
shared.objectProcessorQueue.put(data)
|
||||||
|
with shard.printLock:
|
||||||
|
print 'Size of objectProcessorQueue is:', sys.getsizeof(shared.objectProcessorQueue)
|
||||||
|
|
||||||
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we
|
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we
|
||||||
# haven't used the specified amount of time, we shall sleep. These
|
# haven't used the specified amount of time, we shall sleep. These
|
||||||
|
@ -1104,7 +1095,7 @@ class receiveDataThread(threading.Thread):
|
||||||
toAddress, 'noncetrialsperbyte')
|
toAddress, 'noncetrialsperbyte')
|
||||||
requiredPayloadLengthExtraBytes = shared.config.getint(
|
requiredPayloadLengthExtraBytes = shared.config.getint(
|
||||||
toAddress, 'payloadlengthextrabytes')
|
toAddress, 'payloadlengthextrabytes')
|
||||||
if not self.isProofOfWorkSufficient(encryptedData, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes):
|
if not shared.isProofOfWorkSufficient(encryptedData, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes):
|
||||||
print 'Proof of work in msg message insufficient only because it does not meet our higher requirement.'
|
print 'Proof of work in msg message insufficient only because it does not meet our higher requirement.'
|
||||||
return
|
return
|
||||||
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
|
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
|
||||||
|
@ -1283,7 +1274,7 @@ class receiveDataThread(threading.Thread):
|
||||||
if len(data) < 146 or len(data) > 420: # sanity check
|
if len(data) < 146 or len(data) > 420: # sanity check
|
||||||
return
|
return
|
||||||
# We must check to make sure the proof of work is sufficient.
|
# We must check to make sure the proof of work is sufficient.
|
||||||
if not self.isProofOfWorkSufficient(data):
|
if not shared.isProofOfWorkSufficient(data):
|
||||||
print 'Proof of work in pubkey message insufficient.'
|
print 'Proof of work in pubkey message insufficient.'
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1567,10 +1558,9 @@ class receiveDataThread(threading.Thread):
|
||||||
# the messages that require it.
|
# the messages that require it.
|
||||||
self.possibleNewPubkey(address = fromAddress)
|
self.possibleNewPubkey(address = fromAddress)
|
||||||
|
|
||||||
|
|
||||||
# We have received a getpubkey message
|
# We have received a getpubkey message
|
||||||
def recgetpubkey(self, data):
|
def recgetpubkey(self, data):
|
||||||
if not self.isProofOfWorkSufficient(data):
|
if not shared.isProofOfWorkSufficient(data):
|
||||||
print 'Proof of work in getpubkey message insufficient.'
|
print 'Proof of work in getpubkey message insufficient.'
|
||||||
return
|
return
|
||||||
if len(data) < 34:
|
if len(data) < 34:
|
||||||
|
|
|
@ -72,6 +72,9 @@ daemon = False
|
||||||
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
||||||
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
||||||
maximumLengthOfTimeToBotherResendingMessages = 0
|
maximumLengthOfTimeToBotherResendingMessages = 0
|
||||||
|
objectProcessorQueue = Queue.Queue(
|
||||||
|
) # receiveDataThreads dump objects they hear on the network into this queue to be processed.
|
||||||
|
streamsInWhichIAmParticipating = {}
|
||||||
|
|
||||||
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||||
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||||
|
@ -277,6 +280,20 @@ def reloadBroadcastSendersForWhichImWatching():
|
||||||
privEncryptionKey = doubleHashOfAddressData[:32]
|
privEncryptionKey = doubleHashOfAddressData[:32]
|
||||||
MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))
|
MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))
|
||||||
|
|
||||||
|
def isProofOfWorkSufficient(
|
||||||
|
self,
|
||||||
|
data,
|
||||||
|
nonceTrialsPerByte=0,
|
||||||
|
payloadLengthExtraBytes=0):
|
||||||
|
if nonceTrialsPerByte < shared.networkDefaultProofOfWorkNonceTrialsPerByte:
|
||||||
|
nonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte
|
||||||
|
if payloadLengthExtraBytes < shared.networkDefaultPayloadLengthExtraBytes:
|
||||||
|
payloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes
|
||||||
|
POW, = unpack('>Q', hashlib.sha512(hashlib.sha512(data[
|
||||||
|
:8] + hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8])
|
||||||
|
# print 'POW:', POW
|
||||||
|
return POW <= 2 ** 64 / ((len(data) + payloadLengthExtraBytes) * (nonceTrialsPerByte))
|
||||||
|
|
||||||
def doCleanShutdown():
|
def doCleanShutdown():
|
||||||
global shutdown
|
global shutdown
|
||||||
shutdown = 1 #Used to tell proof of work worker threads to exit.
|
shutdown = 1 #Used to tell proof of work worker threads to exit.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user