Added API for raw objects
This commit is contained in:
parent
373157db45
commit
00c4ee881f
72
src/api.py
72
src/api.py
|
@ -17,6 +17,7 @@ from binascii import hexlify, unhexlify
|
||||||
from random import randint
|
from random import randint
|
||||||
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
|
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
|
||||||
from struct import pack
|
from struct import pack
|
||||||
|
import errno
|
||||||
|
|
||||||
import shared
|
import shared
|
||||||
from addresses import (
|
from addresses import (
|
||||||
|
@ -32,6 +33,7 @@ import state
|
||||||
import queues
|
import queues
|
||||||
import shutdown
|
import shutdown
|
||||||
import network.stats
|
import network.stats
|
||||||
|
import protocol
|
||||||
|
|
||||||
# Classes
|
# Classes
|
||||||
from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute, sqlStoredProcedure
|
from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute, sqlStoredProcedure
|
||||||
|
@ -1165,6 +1167,73 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
data += ']}'
|
data += ']}'
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def HandleDisseminateRawObject(self, arguments):
|
||||||
|
if len(arguments) != 1:
|
||||||
|
raise APIError(0, "1 argument needed")
|
||||||
|
|
||||||
|
payload = self._decode(arguments[0], "hex")
|
||||||
|
|
||||||
|
inventoryHash = protocol.checkAndShareObjectWithPeers(payload)
|
||||||
|
|
||||||
|
if inventoryHash is None:
|
||||||
|
raise APIError(30, "Invalid object or insufficient POW")
|
||||||
|
else:
|
||||||
|
return hexlify(inventoryHash)
|
||||||
|
|
||||||
|
def HandleGetRawObject(self, arguments):
|
||||||
|
if len(arguments) != 1:
|
||||||
|
raise APIError(0, "1 argument needed")
|
||||||
|
|
||||||
|
inventoryHash, = arguments
|
||||||
|
|
||||||
|
if len(inventoryHash) != 64:
|
||||||
|
raise APIError(19, "The length of hash should be 32 bytes (encoded in hex thus 64 characters)")
|
||||||
|
|
||||||
|
inventoryHash = self._decode(inventoryHash, "hex")
|
||||||
|
|
||||||
|
try:
|
||||||
|
inventoryItem = Inventory()[inventoryHash]
|
||||||
|
except KeyError:
|
||||||
|
raise APIError(31, "Object not found")
|
||||||
|
|
||||||
|
return json.dumps({
|
||||||
|
"hash": hexlify(inventoryHash),
|
||||||
|
"expiryTime": inventoryItem.expires,
|
||||||
|
"objectType": inventoryItem.type,
|
||||||
|
"stream": inventoryItem.stream,
|
||||||
|
"tag": hexlify(inventoryItem.tag),
|
||||||
|
"payload": hexlify(inventoryItem.payload)
|
||||||
|
}, indent = 4, separators = (",", ": "))
|
||||||
|
|
||||||
|
def HandleListRawObjects(self, arguments):
|
||||||
|
if len(arguments) != 3:
|
||||||
|
raise APIError(0, "3 arguments needed")
|
||||||
|
|
||||||
|
objectType, stream, tag = arguments
|
||||||
|
|
||||||
|
if tag is not None:
|
||||||
|
tag = buffer(self._decode(tag, "hex"))
|
||||||
|
|
||||||
|
result = []
|
||||||
|
|
||||||
|
inventory = Inventory()
|
||||||
|
|
||||||
|
for i in inventory:
|
||||||
|
inventoryItem = inventory[str(i)]
|
||||||
|
|
||||||
|
if objectType is not None and inventoryItem.type != objectType:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if stream is not None and inventoryItem.stream != stream:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if tag is not None and inventoryItem.tag != tag:
|
||||||
|
continue
|
||||||
|
|
||||||
|
result.append(hexlify(i))
|
||||||
|
|
||||||
|
return json.dumps(result, indent = 4, separators = (",", ": "))
|
||||||
|
|
||||||
def HandleClientStatus(self, params):
|
def HandleClientStatus(self, params):
|
||||||
if len(network.stats.connectedHostsList()) == 0:
|
if len(network.stats.connectedHostsList()) == 0:
|
||||||
networkStatus = 'notConnected'
|
networkStatus = 'notConnected'
|
||||||
|
@ -1272,6 +1341,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
HandleGetMessageDataByDestinationHash
|
HandleGetMessageDataByDestinationHash
|
||||||
handlers['getMessageDataByDestinationTag'] = \
|
handlers['getMessageDataByDestinationTag'] = \
|
||||||
HandleGetMessageDataByDestinationHash
|
HandleGetMessageDataByDestinationHash
|
||||||
|
handlers["disseminateRawObject"] = HandleDisseminateRawObject
|
||||||
|
handlers["getRawObject"] = HandleGetRawObject
|
||||||
|
handlers["listRawObjects"] = HandleListRawObjects
|
||||||
handlers['clientStatus'] = HandleClientStatus
|
handlers['clientStatus'] = HandleClientStatus
|
||||||
handlers['decodeAddress'] = HandleDecodeAddress
|
handlers['decodeAddress'] = HandleDecodeAddress
|
||||||
handlers['deleteAndVacuum'] = HandleDeleteAndVacuum
|
handlers['deleteAndVacuum'] = HandleDeleteAndVacuum
|
||||||
|
|
|
@ -410,7 +410,6 @@ if __name__ == "__main__":
|
||||||
import signal
|
import signal
|
||||||
# The next 3 are used for the API
|
# The next 3 are used for the API
|
||||||
from singleinstance import singleinstance
|
from singleinstance import singleinstance
|
||||||
import errno
|
|
||||||
import socket
|
import socket
|
||||||
import ctypes
|
import ctypes
|
||||||
from struct import pack
|
from struct import pack
|
||||||
|
|
|
@ -1884,9 +1884,9 @@ class MyForm(settingsmixin.SMainWindow):
|
||||||
_translate("MainWindow", "%1 kiH / s").arg("{:.1f}".format(status.speed / 1024))
|
_translate("MainWindow", "%1 kiH / s").arg("{:.1f}".format(status.speed / 1024))
|
||||||
)
|
)
|
||||||
|
|
||||||
self.ui.workProverSpeed.setToolTip("Difficulty: {}, 80 % completion time: {:.1f} s".format(
|
self.ui.workProverSpeed.setToolTip("Difficulty: {}, 95 % completion time: {:.1f} s".format(
|
||||||
status.difficulty,
|
status.difficulty,
|
||||||
workprover.utils.estimateMaximumIterationsCount(status.difficulty, .8) / status.speed
|
workprover.utils.estimateMaximumIterationsCount(status.difficulty, .95) / status.speed
|
||||||
))
|
))
|
||||||
|
|
||||||
def rerenderMessagelistFromLabels(self):
|
def rerenderMessagelistFromLabels(self):
|
||||||
|
@ -4597,11 +4597,14 @@ class settingsDialog(QtGui.QDialog):
|
||||||
self.ui.spinBoxForkingSolverParallelism.setValue(forkingSolverParallelism)
|
self.ui.spinBoxForkingSolverParallelism.setValue(forkingSolverParallelism)
|
||||||
self.ui.spinBoxFastSolverParallelism.setValue(fastSolverParallelism)
|
self.ui.spinBoxFastSolverParallelism.setValue(fastSolverParallelism)
|
||||||
|
|
||||||
vendors = set(singleworker.workProver.availableSolvers["gpu"].vendors)
|
vendors = set()
|
||||||
|
|
||||||
if GPUVendor is not None:
|
if GPUVendor is not None:
|
||||||
vendors.add(GPUVendor)
|
vendors.add(GPUVendor)
|
||||||
|
|
||||||
|
if "gpu" in singleworker.workProver.availableSolvers:
|
||||||
|
vendors |= set(singleworker.workProver.availableSolvers["gpu"].vendors)
|
||||||
|
|
||||||
self.ui.comboBoxGPUVendor.clear()
|
self.ui.comboBoxGPUVendor.clear()
|
||||||
self.ui.comboBoxGPUVendor.addItems(list(vendors))
|
self.ui.comboBoxGPUVendor.addItems(list(vendors))
|
||||||
self.ui.comboBoxGPUVendor.setCurrentIndex(0)
|
self.ui.comboBoxGPUVendor.setCurrentIndex(0)
|
||||||
|
|
|
@ -747,7 +747,7 @@ class objectProcessor(threading.Thread):
|
||||||
not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack')
|
not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack')
|
||||||
and not BMConfigParser().safeGetBoolean(toAddress, 'chan')
|
and not BMConfigParser().safeGetBoolean(toAddress, 'chan')
|
||||||
):
|
):
|
||||||
shared.checkAndShareObjectWithPeers(ackData[24:])
|
protocol.checkAndShareObjectWithPeers(ackData[24:])
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
timeRequiredToAttemptToDecryptMessage = time.time(
|
timeRequiredToAttemptToDecryptMessage = time.time(
|
||||||
|
@ -1075,7 +1075,7 @@ class objectProcessor(threading.Thread):
|
||||||
# The largest message should be either an inv or a getdata
|
# The largest message should be either an inv or a getdata
|
||||||
# message at 1.6 MB in size.
|
# message at 1.6 MB in size.
|
||||||
# That doesn't mean that the object may be that big. The
|
# That doesn't mean that the object may be that big. The
|
||||||
# shared.checkAndShareObjectWithPeers function will verify
|
# protocol.checkAndShareObjectWithPeers function will verify
|
||||||
# that it is no larger than 2^18 bytes.
|
# that it is no larger than 2^18 bytes.
|
||||||
return False
|
return False
|
||||||
# test the checksum in the message.
|
# test the checksum in the message.
|
||||||
|
|
270
src/protocol.py
270
src/protocol.py
|
@ -27,9 +27,9 @@ from addresses import calculateInventoryHash, encodeVarint, decodeVarint, decode
|
||||||
from bmconfigparser import BMConfigParser
|
from bmconfigparser import BMConfigParser
|
||||||
from debug import logger
|
from debug import logger
|
||||||
from helper_sql import sqlExecute
|
from helper_sql import sqlExecute
|
||||||
from inventory import Inventory
|
|
||||||
from queues import objectProcessorQueue
|
|
||||||
from version import softwareVersion
|
from version import softwareVersion
|
||||||
|
import inventory
|
||||||
|
import queues
|
||||||
|
|
||||||
|
|
||||||
# Service flags
|
# Service flags
|
||||||
|
@ -400,227 +400,89 @@ def decryptAndCheckV4Pubkey(payload, address, cryptor):
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def checkAndShareObjectWithPeers(data):
|
def checkAndShareObjectWithPeers(payload):
|
||||||
"""
|
if len(payload) > 2 ** 18:
|
||||||
This function is called after either receiving an object off of the wire
|
logger.info("The payload length of this object is too large (%i bytes)", len(payload))
|
||||||
or after receiving one as ackdata.
|
|
||||||
Returns the length of time that we should reserve to process this message
|
return None
|
||||||
if we are receiving it off of the wire.
|
|
||||||
"""
|
if not isProofOfWorkSufficient(payload):
|
||||||
if len(data) > 2 ** 18:
|
logger.info("Proof of work is insufficient")
|
||||||
logger.info('The payload length of this object is too large (%s bytes). Ignoring it.', len(data))
|
|
||||||
return 0
|
return None
|
||||||
# Let us check to make sure that the proof of work is sufficient.
|
|
||||||
if not isProofOfWorkSufficient(data):
|
readPosition = 8
|
||||||
logger.info('Proof of work is insufficient.')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
endOfLifeTime, = unpack('>Q', data[8:16])
|
|
||||||
# The TTL may not be larger than 28 days + 3 hours of wiggle room
|
|
||||||
if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800:
|
|
||||||
logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s', endOfLifeTime)
|
|
||||||
return 0
|
|
||||||
if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much.
|
|
||||||
logger.info(
|
|
||||||
'This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s',
|
|
||||||
endOfLifeTime)
|
|
||||||
return 0
|
|
||||||
intObjectType, = unpack('>I', data[16:20])
|
|
||||||
try:
|
try:
|
||||||
if intObjectType == 0:
|
expiryTime, objectType = unpack(">QI", payload[readPosition: readPosition + 12])
|
||||||
_checkAndShareGetpubkeyWithPeers(data)
|
readPosition += 12
|
||||||
return 0.1
|
|
||||||
elif intObjectType == 1:
|
|
||||||
_checkAndSharePubkeyWithPeers(data)
|
|
||||||
return 0.1
|
|
||||||
elif intObjectType == 2:
|
|
||||||
_checkAndShareMsgWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
elif intObjectType == 3:
|
|
||||||
_checkAndShareBroadcastWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
_checkAndShareUndefinedObjectWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
except varintDecodeError as err:
|
|
||||||
logger.debug(
|
|
||||||
"There was a problem with a varint while checking to see whether it was appropriate to share an object"
|
|
||||||
" with peers. Some details: %s", err
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger.critical(
|
|
||||||
'There was a problem while checking to see whether it was appropriate to share an object with peers.'
|
|
||||||
' This is definitely a bug! %s%s' % os.linesep, traceback.format_exc()
|
|
||||||
)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
version, readLength = decodeVarint(payload[readPosition: readPosition + 8])
|
||||||
|
readPosition += readLength
|
||||||
|
|
||||||
def _checkAndShareUndefinedObjectWithPeers(data):
|
stream, readLength = decodeVarint(payload[readPosition: readPosition + 8])
|
||||||
# pylint: disable=unused-variable
|
readPosition += readLength
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
except:
|
||||||
readPosition = 20 # bypass nonce, time, and object type
|
logger.info("Error parsing object header")
|
||||||
objectVersion, objectVersionLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
readPosition += objectVersionLength
|
|
||||||
streamNumber, streamNumberLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber)
|
|
||||||
return
|
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
return None
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this undefined object. Ignoring.')
|
|
||||||
return
|
|
||||||
objectType, = unpack('>I', data[16:20])
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
|
tag = payload[readPosition: readPosition + 32]
|
||||||
|
|
||||||
def _checkAndShareMsgWithPeers(data):
|
TTL = expiryTime - int(time.time())
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass nonce, time, and object type
|
|
||||||
objectVersion, objectVersionLength = decodeVarint( # pylint: disable=unused-variable
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
readPosition += objectVersionLength
|
|
||||||
streamNumber, streamNumberLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber)
|
|
||||||
return
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this msg message. Ignoring.')
|
|
||||||
return
|
|
||||||
# This msg message is valid. Let's let our peers know about it.
|
|
||||||
objectType = 2
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's enqueue it to be processed ourselves.
|
# TTL may not be lesser than -1 hour or larger than 28 days + 3 hours of wiggle room
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
if TTL < -3600:
|
||||||
|
logger.info("This object\'s expiry time was more than an hour ago: %s", expiryTime)
|
||||||
|
|
||||||
def _checkAndShareGetpubkeyWithPeers(data):
|
return None
|
||||||
# pylint: disable=unused-variable
|
elif TTL > 28 * 24 * 60 * 60 + 10800:
|
||||||
if len(data) < 42:
|
logger.info("This object\'s expiry time is too far in the future: %s", expiryTime)
|
||||||
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
|
|
||||||
return
|
|
||||||
if len(data) > 200:
|
|
||||||
logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.')
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
requestedAddressVersionNumber, addressVersionLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 10])
|
|
||||||
readPosition += addressVersionLength
|
|
||||||
streamNumber, streamNumberLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 10])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber)
|
|
||||||
return
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
return None
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this getpubkey request. Ignoring it.')
|
|
||||||
return
|
|
||||||
|
|
||||||
objectType = 0
|
if stream not in state.streamsInWhichIAmParticipating:
|
||||||
Inventory()[inventoryHash] = (
|
logger.info("The stream number %i isn\'t one we are interested in", stream)
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
# This getpubkey request is valid. Forward to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
return None
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
if objectType == 0:
|
||||||
|
if len(payload) < 42:
|
||||||
|
logger.info("Too short \"getpubkey\" message")
|
||||||
|
|
||||||
def _checkAndSharePubkeyWithPeers(data):
|
return None
|
||||||
if len(data) < 146 or len(data) > 440: # sanity check
|
elif objectType == 1:
|
||||||
return
|
if len(payload) < 146 or len(payload) > 440:
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
logger.info("Invalid length \"pubkey\"")
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
addressVersion, varintLength = decodeVarint(
|
return None
|
||||||
data[readPosition:readPosition + 10])
|
elif objectType == 3:
|
||||||
readPosition += varintLength
|
if len(payload) < 180:
|
||||||
streamNumber, varintLength = decodeVarint(
|
logger.info("Too short \"broadcast\" message")
|
||||||
data[readPosition:readPosition + 10])
|
|
||||||
readPosition += varintLength
|
return None
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber)
|
if version == 1:
|
||||||
return
|
logger.info("Obsolete \"broadcast\" message version")
|
||||||
if addressVersion >= 4:
|
|
||||||
tag = data[readPosition:readPosition + 32]
|
return None
|
||||||
logger.debug('tag in received pubkey is: %s', hexlify(tag))
|
|
||||||
|
inventoryHash = calculateDoubleHash(payload)[: 32]
|
||||||
|
|
||||||
|
if inventoryHash in inventory.Inventory():
|
||||||
|
logger.info("We already have this object")
|
||||||
|
|
||||||
|
return inventoryHash
|
||||||
else:
|
else:
|
||||||
tag = ''
|
inventory.Inventory()[inventoryHash] = objectType, stream, payload, expiryTime, buffer(tag)
|
||||||
|
queues.invQueue.put((stream, inventoryHash))
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
logger.info("Broadcasting inventory object with hash: %s", hexlify(inventoryHash))
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this pubkey. Ignoring it.')
|
|
||||||
return
|
|
||||||
objectType = 1
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
|
||||||
# This object is valid. Forward it to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
queues.objectProcessorQueue.put((objectType, payload))
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndShareBroadcastWithPeers(data):
|
|
||||||
if len(data) < 180:
|
|
||||||
logger.debug(
|
|
||||||
'The payload length of this broadcast packet is unreasonably low. '
|
|
||||||
'Someone is probably trying funny business. Ignoring message.')
|
|
||||||
return
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
broadcastVersion, broadcastVersionLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 10])
|
|
||||||
readPosition += broadcastVersionLength
|
|
||||||
if broadcastVersion >= 2:
|
|
||||||
streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber)
|
|
||||||
return
|
|
||||||
if broadcastVersion >= 3:
|
|
||||||
tag = data[readPosition:readPosition + 32]
|
|
||||||
else:
|
|
||||||
tag = ''
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this broadcast object. Ignoring.')
|
|
||||||
return
|
|
||||||
# It is valid. Let's let our peers know about it.
|
|
||||||
objectType = 3
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
|
||||||
# This object is valid. Forward it to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def broadcastToSendDataQueues(data):
|
|
||||||
"""
|
|
||||||
If you want to command all of the sendDataThreads to do something, like shutdown or send some data, this
|
|
||||||
function puts your data into the queues for each of the sendDataThreads. The sendDataThreads are
|
|
||||||
responsible for putting their queue into (and out of) the sendDataQueues list.
|
|
||||||
"""
|
|
||||||
for q in state.sendDataQueues:
|
|
||||||
q.put(data)
|
|
||||||
|
|
||||||
|
return inventoryHash
|
||||||
|
|
||||||
# sslProtocolVersion
|
# sslProtocolVersion
|
||||||
if sys.version_info >= (2, 7, 13):
|
if sys.version_info >= (2, 7, 13):
|
||||||
|
|
244
src/shared.py
244
src/shared.py
|
@ -24,8 +24,6 @@ from addresses import (
|
||||||
calculateInventoryHash
|
calculateInventoryHash
|
||||||
)
|
)
|
||||||
from helper_sql import sqlQuery, sqlExecute
|
from helper_sql import sqlQuery, sqlExecute
|
||||||
from inventory import Inventory
|
|
||||||
from queues import objectProcessorQueue
|
|
||||||
|
|
||||||
|
|
||||||
verbose = 1
|
verbose = 1
|
||||||
|
@ -286,248 +284,6 @@ def fixSensitiveFilePermissions(filename, hasEnabledKeys):
|
||||||
logger.exception('Keyfile permissions could not be fixed.')
|
logger.exception('Keyfile permissions could not be fixed.')
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def checkAndShareObjectWithPeers(data):
|
|
||||||
"""
|
|
||||||
This function is called after either receiving an object
|
|
||||||
off of the wire or after receiving one as ackdata.
|
|
||||||
Returns the length of time that we should reserve to process
|
|
||||||
this message if we are receiving it off of the wire.
|
|
||||||
"""
|
|
||||||
if len(data) > 2 ** 18:
|
|
||||||
logger.info(
|
|
||||||
'The payload length of this object is too large (%i bytes).'
|
|
||||||
' Ignoring it.', len(data)
|
|
||||||
)
|
|
||||||
return 0
|
|
||||||
# Let us check to make sure that the proof of work is sufficient.
|
|
||||||
if not protocol.isProofOfWorkSufficient(data):
|
|
||||||
logger.info('Proof of work is insufficient.')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
endOfLifeTime, = unpack('>Q', data[8:16])
|
|
||||||
# The TTL may not be larger than 28 days + 3 hours of wiggle room
|
|
||||||
if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800:
|
|
||||||
logger.info(
|
|
||||||
'This object\'s End of Life time is too far in the future.'
|
|
||||||
' Ignoring it. Time is %s', endOfLifeTime
|
|
||||||
)
|
|
||||||
return 0
|
|
||||||
# The EOL time was more than an hour ago. That's too much.
|
|
||||||
if endOfLifeTime - int(time.time()) < -3600:
|
|
||||||
logger.info(
|
|
||||||
'This object\'s End of Life time was more than an hour ago.'
|
|
||||||
' Ignoring the object. Time is %s' % endOfLifeTime
|
|
||||||
)
|
|
||||||
return 0
|
|
||||||
intObjectType, = unpack('>I', data[16:20])
|
|
||||||
try:
|
|
||||||
if intObjectType == 0:
|
|
||||||
_checkAndShareGetpubkeyWithPeers(data)
|
|
||||||
return 0.1
|
|
||||||
elif intObjectType == 1:
|
|
||||||
_checkAndSharePubkeyWithPeers(data)
|
|
||||||
return 0.1
|
|
||||||
elif intObjectType == 2:
|
|
||||||
_checkAndShareMsgWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
elif intObjectType == 3:
|
|
||||||
_checkAndShareBroadcastWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
else:
|
|
||||||
_checkAndShareUndefinedObjectWithPeers(data)
|
|
||||||
return 0.6
|
|
||||||
except varintDecodeError as e:
|
|
||||||
logger.debug(
|
|
||||||
'There was a problem with a varint while checking'
|
|
||||||
' to see whether it was appropriate to share an object'
|
|
||||||
' with peers. Some details: %s' % e)
|
|
||||||
except Exception:
|
|
||||||
logger.critical(
|
|
||||||
'There was a problem while checking to see whether it was'
|
|
||||||
' appropriate to share an object with peers. This is'
|
|
||||||
' definitely a bug! \n%s' % traceback.format_exc())
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndShareUndefinedObjectWithPeers(data):
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass nonce, time, and object type
|
|
||||||
objectVersion, objectVersionLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
readPosition += objectVersionLength
|
|
||||||
streamNumber, streamNumberLength = decodeVarint(
|
|
||||||
data[readPosition:readPosition + 9])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug(
|
|
||||||
'The streamNumber %i isn\'t one we are interested in.',
|
|
||||||
streamNumber
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug(
|
|
||||||
'We have already received this undefined object. Ignoring.')
|
|
||||||
return
|
|
||||||
objectType, = unpack('>I', data[16:20])
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
protocol.broadcastToSendDataQueues(
|
|
||||||
(streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndShareMsgWithPeers(data):
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass nonce, time, and object type
|
|
||||||
objectVersion, objectVersionLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 9])
|
|
||||||
readPosition += objectVersionLength
|
|
||||||
streamNumber, streamNumberLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 9])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug(
|
|
||||||
'The streamNumber %i isn\'t one we are interested in.',
|
|
||||||
streamNumber
|
|
||||||
)
|
|
||||||
return
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this msg message. Ignoring.')
|
|
||||||
return
|
|
||||||
# This msg message is valid. Let's let our peers know about it.
|
|
||||||
objectType = 2
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
protocol.broadcastToSendDataQueues(
|
|
||||||
(streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's enqueue it to be processed ourselves.
|
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndShareGetpubkeyWithPeers(data):
|
|
||||||
if len(data) < 42:
|
|
||||||
logger.info(
|
|
||||||
'getpubkey message doesn\'t contain enough data. Ignoring.')
|
|
||||||
return
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
requestedAddressVersionNumber, addressVersionLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += addressVersionLength
|
|
||||||
streamNumber, streamNumberLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug(
|
|
||||||
'The streamNumber %i isn\'t one we are interested in.',
|
|
||||||
streamNumber
|
|
||||||
)
|
|
||||||
return
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug(
|
|
||||||
'We have already received this getpubkey request. Ignoring it.')
|
|
||||||
return
|
|
||||||
|
|
||||||
objectType = 0
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, '')
|
|
||||||
# This getpubkey request is valid. Forward to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
protocol.broadcastToSendDataQueues(
|
|
||||||
(streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndSharePubkeyWithPeers(data):
|
|
||||||
if len(data) < 146 or len(data) > 440: # sanity check
|
|
||||||
return
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
addressVersion, varintLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += varintLength
|
|
||||||
streamNumber, varintLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += varintLength
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug(
|
|
||||||
'The streamNumber %i isn\'t one we are interested in.',
|
|
||||||
streamNumber
|
|
||||||
)
|
|
||||||
return
|
|
||||||
if addressVersion >= 4:
|
|
||||||
tag = data[readPosition:readPosition + 32]
|
|
||||||
logger.debug('tag in received pubkey is: %s', hexlify(tag))
|
|
||||||
else:
|
|
||||||
tag = ''
|
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug('We have already received this pubkey. Ignoring it.')
|
|
||||||
return
|
|
||||||
objectType = 1
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
|
||||||
# This object is valid. Forward it to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
protocol.broadcastToSendDataQueues(
|
|
||||||
(streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def _checkAndShareBroadcastWithPeers(data):
|
|
||||||
if len(data) < 180:
|
|
||||||
logger.debug(
|
|
||||||
'The payload length of this broadcast packet is unreasonably low.'
|
|
||||||
' Someone is probably trying funny business. Ignoring message.')
|
|
||||||
return
|
|
||||||
embeddedTime, = unpack('>Q', data[8:16])
|
|
||||||
readPosition = 20 # bypass the nonce, time, and object type
|
|
||||||
broadcastVersion, broadcastVersionLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += broadcastVersionLength
|
|
||||||
if broadcastVersion >= 2:
|
|
||||||
streamNumber, streamNumberLength = \
|
|
||||||
decodeVarint(data[readPosition:readPosition + 10])
|
|
||||||
readPosition += streamNumberLength
|
|
||||||
if streamNumber not in state.streamsInWhichIAmParticipating:
|
|
||||||
logger.debug(
|
|
||||||
'The streamNumber %i isn\'t one we are interested in.',
|
|
||||||
streamNumber
|
|
||||||
)
|
|
||||||
return
|
|
||||||
if broadcastVersion >= 3:
|
|
||||||
tag = data[readPosition:readPosition+32]
|
|
||||||
else:
|
|
||||||
tag = ''
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
|
||||||
if inventoryHash in Inventory():
|
|
||||||
logger.debug(
|
|
||||||
'We have already received this broadcast object. Ignoring.')
|
|
||||||
return
|
|
||||||
# It is valid. Let's let our peers know about it.
|
|
||||||
objectType = 3
|
|
||||||
Inventory()[inventoryHash] = (
|
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
|
||||||
# This object is valid. Forward it to peers.
|
|
||||||
logger.debug('advertising inv with hash: %s', hexlify(inventoryHash))
|
|
||||||
protocol.broadcastToSendDataQueues(
|
|
||||||
(streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
|
|
||||||
# Now let's queue it to be processed ourselves.
|
|
||||||
objectProcessorQueue.put((objectType, data))
|
|
||||||
|
|
||||||
|
|
||||||
def openKeysFile():
|
def openKeysFile():
|
||||||
if 'linux' in sys.platform:
|
if 'linux' in sys.platform:
|
||||||
subprocess.call(["xdg-open", state.appdata + 'keys.dat'])
|
subprocess.call(["xdg-open", state.appdata + 'keys.dat'])
|
||||||
|
|
|
@ -210,17 +210,6 @@ def getDestinationAddressProperties(address):
|
||||||
def randomizeTTL(TTL):
|
def randomizeTTL(TTL):
|
||||||
return TTL + helper_random.randomrandrange(-300, 300)
|
return TTL + helper_random.randomrandrange(-300, 300)
|
||||||
|
|
||||||
def disseminateObject(nonce, expiryTime, headlessPayload, objectType, stream, tag):
|
|
||||||
payload = nonce + struct.pack(">Q", expiryTime) + headlessPayload
|
|
||||||
inventoryHash = protocol.calculateDoubleHash(payload)[: 32]
|
|
||||||
|
|
||||||
inventory.Inventory()[inventoryHash] = objectType, stream, payload, expiryTime, buffer(tag)
|
|
||||||
queues.invQueue.put((stream, inventoryHash))
|
|
||||||
|
|
||||||
debug.logger.info("Broadcasting inventory object with hash: %s", binascii.hexlify(inventoryHash))
|
|
||||||
|
|
||||||
return inventoryHash, payload
|
|
||||||
|
|
||||||
workProver = workprover.WorkProver(
|
workProver = workprover.WorkProver(
|
||||||
os.path.join(paths.codePath(), "workprover"),
|
os.path.join(paths.codePath(), "workprover"),
|
||||||
helper_random.randomBytes(32),
|
helper_random.randomBytes(32),
|
||||||
|
@ -370,7 +359,7 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
debug.logger.info("Found proof of work %s", ID)
|
debug.logger.info("Found proof of work %s", ID)
|
||||||
|
|
||||||
if ID in self.startedWorks:
|
if ID in self.startedWorks:
|
||||||
self.startedWorks[ID](nonce, expiryTime)
|
self.startedWorks[ID](nonce + struct.pack(">Q", expiryTime))
|
||||||
|
|
||||||
del self.startedWorks[ID]
|
del self.startedWorks[ID]
|
||||||
|
|
||||||
|
@ -412,8 +401,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
headlessPayload += addresses.encodeVarint(addressProperties.version)
|
headlessPayload += addresses.encodeVarint(addressProperties.version)
|
||||||
headlessPayload += addresses.encodeVarint(addressProperties.stream)
|
headlessPayload += addresses.encodeVarint(addressProperties.stream)
|
||||||
|
|
||||||
inventoryTagPosition = len(headlessPayload)
|
|
||||||
|
|
||||||
headlessPayload += tag
|
headlessPayload += tag
|
||||||
|
|
||||||
if addressProperties.version == 4:
|
if addressProperties.version == 4:
|
||||||
|
@ -449,10 +436,8 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
headlessPayload += addresses.encodeVarint(len(signature))
|
headlessPayload += addresses.encodeVarint(len(signature))
|
||||||
headlessPayload += signature
|
headlessPayload += signature
|
||||||
|
|
||||||
def workDone(nonce, expiryTime):
|
def workDone(head):
|
||||||
inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32]
|
protocol.checkAndShareObjectWithPeers(head + headlessPayload)
|
||||||
|
|
||||||
disseminateObject(nonce, expiryTime, headlessPayload, 1, addressProperties.stream, inventoryTag)
|
|
||||||
|
|
||||||
# TODO: not atomic with the addition to the inventory, the "lastpubkeysendtime" property should be removed
|
# TODO: not atomic with the addition to the inventory, the "lastpubkeysendtime" property should be removed
|
||||||
# Instead check if the pubkey is present in the inventory
|
# Instead check if the pubkey is present in the inventory
|
||||||
|
@ -528,8 +513,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
|
|
||||||
headlessPayload += addresses.encodeVarint(addressProperties.stream)
|
headlessPayload += addresses.encodeVarint(addressProperties.stream)
|
||||||
|
|
||||||
inventoryTagPosition = len(headlessPayload)
|
|
||||||
|
|
||||||
headlessPayload += tag
|
headlessPayload += tag
|
||||||
|
|
||||||
plaintext = addresses.encodeVarint(addressProperties.version)
|
plaintext = addresses.encodeVarint(addressProperties.version)
|
||||||
|
@ -566,15 +549,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def workDone(nonce, expiryTime):
|
def workDone(head):
|
||||||
inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32]
|
|
||||||
|
|
||||||
# TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction
|
# TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction
|
||||||
|
|
||||||
inventoryHash, payload = disseminateObject(
|
inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload)
|
||||||
nonce, expiryTime, headlessPayload,
|
|
||||||
3, addressProperties.stream, inventoryTag
|
|
||||||
)
|
|
||||||
|
|
||||||
helper_sql.sqlExecute("""
|
helper_sql.sqlExecute("""
|
||||||
UPDATE "sent" SET "msgid" = ?, "status" = 'broadcastsent', "lastactiontime" = ?
|
UPDATE "sent" SET "msgid" = ?, "status" = 'broadcastsent', "lastactiontime" = ?
|
||||||
|
@ -587,15 +565,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp())
|
tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp())
|
||||||
)))
|
)))
|
||||||
|
|
||||||
# Add to own inbox
|
|
||||||
|
|
||||||
if addressProperties.version == 4:
|
|
||||||
if tag in shared.MyECSubscriptionCryptorObjects:
|
|
||||||
queues.objectProcessorQueue.put((3, payload))
|
|
||||||
else:
|
|
||||||
if addressProperties.ripe in shared.MyECSubscriptionCryptorObjects:
|
|
||||||
queues.objectProcessorQueue.put((3, payload))
|
|
||||||
|
|
||||||
helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'doingbroadcastpow' WHERE "ackdata" == ?;""", ackData)
|
helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'doingbroadcastpow' WHERE "ackdata" == ?;""", ackData)
|
||||||
|
|
||||||
queues.UISignalQueue.put(("updateSentItemStatusByAckdata", (
|
queues.UISignalQueue.put(("updateSentItemStatusByAckdata", (
|
||||||
|
@ -679,10 +648,8 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
TTL = randomizeTTL(TTL)
|
TTL = randomizeTTL(TTL)
|
||||||
expiryTime = int(time.time() + TTL)
|
expiryTime = int(time.time() + TTL)
|
||||||
|
|
||||||
def workDone(nonce, expiryTime):
|
def workDone(head):
|
||||||
payload = nonce + struct.pack(">Q", expiryTime) + ackData
|
callback(protocol.CreatePacket("object", head + ackData))
|
||||||
|
|
||||||
callback(protocol.CreatePacket("object", payload))
|
|
||||||
|
|
||||||
self.startWork(
|
self.startWork(
|
||||||
ID, ackData, TTL, expiryTime,
|
ID, ackData, TTL, expiryTime,
|
||||||
|
@ -834,7 +801,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
return
|
return
|
||||||
|
|
||||||
headlessPayload += ciphertext
|
headlessPayload += ciphertext
|
||||||
inventoryTag = ciphertext[: 32]
|
|
||||||
|
|
||||||
if len(headlessPayload) > 2 ** 18 - (8 + 8): # 256 kiB
|
if len(headlessPayload) > 2 ** 18 - (8 + 8): # 256 kiB
|
||||||
debug.logger.critical(
|
debug.logger.critical(
|
||||||
|
@ -844,16 +810,13 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def workDone(nonce, expiryTime):
|
def workDone(head):
|
||||||
if ackMessage is not None:
|
if ackMessage is not None:
|
||||||
state.watchedAckData.add(ackData)
|
state.watchedAckData.add(ackData)
|
||||||
|
|
||||||
#TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction
|
#TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction
|
||||||
|
|
||||||
inventoryHash, payload = disseminateObject(
|
inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload)
|
||||||
nonce, expiryTime, headlessPayload,
|
|
||||||
2, destinationProperties.stream, inventoryTag
|
|
||||||
)
|
|
||||||
|
|
||||||
if ackMessage is None:
|
if ackMessage is None:
|
||||||
newStatus = "msgsentnoackexpected"
|
newStatus = "msgsentnoackexpected"
|
||||||
|
@ -868,11 +831,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
WHERE "status" == 'doingmsgpow' AND "ackdata" == ?;
|
WHERE "status" == 'doingmsgpow' AND "ackdata" == ?;
|
||||||
""", inventoryHash, newStatus, retryNumber + 1, sleepTill, int(time.time()), ackData)
|
""", inventoryHash, newStatus, retryNumber + 1, sleepTill, int(time.time()), ackData)
|
||||||
|
|
||||||
# Add to own inbox
|
|
||||||
|
|
||||||
if destinationProperties.own:
|
|
||||||
queues.objectProcessorQueue.put((2, payload))
|
|
||||||
|
|
||||||
if ackMessage is None:
|
if ackMessage is None:
|
||||||
queues.UISignalQueue.put(("updateSentItemStatusByAckdata", (
|
queues.UISignalQueue.put(("updateSentItemStatusByAckdata", (
|
||||||
"msgsentnoackexpected",
|
"msgsentnoackexpected",
|
||||||
|
@ -1061,10 +1019,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
|
||||||
|
|
||||||
headlessPayload += tag
|
headlessPayload += tag
|
||||||
|
|
||||||
def workDone(nonce, expiryTime):
|
def workDone(head):
|
||||||
# TODO: adding to the inventory and setting the sent status should be within a single SQL transaction
|
# TODO: adding to the inventory and setting the sent status should be within a single SQL transaction
|
||||||
|
|
||||||
disseminateObject(nonce, expiryTime, headlessPayload, 0, stream, tag)
|
protocol.checkAndShareObjectWithPeers(head + headlessPayload)
|
||||||
|
|
||||||
sleepTill = int(time.time() + TTL * 1.1)
|
sleepTill = int(time.time() + TTL * 1.1)
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ neededPubkeys = {}
|
||||||
watchedAckData = set()
|
watchedAckData = set()
|
||||||
|
|
||||||
streamsInWhichIAmParticipating = []
|
streamsInWhichIAmParticipating = []
|
||||||
sendDataQueues = [] # each sendData thread puts its queue in this list.
|
|
||||||
|
|
||||||
# For UPnP
|
# For UPnP
|
||||||
extPort = None
|
extPort = None
|
||||||
|
|
Loading…
Reference in New Issue
Block a user