fix some parts of inventory syncing
Remained works: * Objects are not saved. * Decryption does not work. * Can not shutdown.
This commit is contained in:
parent
e646377f22
commit
741f8dd461
|
@ -1347,7 +1347,7 @@ class BMRPCDispatcher(object):
|
||||||
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
||||||
' command): %s', hexlify(inventoryHash))
|
' command): %s', hexlify(inventoryHash))
|
||||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
queues.invQueue.put((toStreamNumber, inventoryHash))
|
||||||
return hexlify(inventoryHash).decode()
|
return hexlify(inventoryHash).decode('ascii')
|
||||||
|
|
||||||
@command('trashSentMessageByAckData')
|
@command('trashSentMessageByAckData')
|
||||||
def HandleTrashSentMessageByAckDAta(self, ackdata):
|
def HandleTrashSentMessageByAckDAta(self, ackdata):
|
||||||
|
|
|
@ -2683,7 +2683,7 @@ class MyForm(settingsmixin.SMainWindow):
|
||||||
" %n object(s) to be downloaded. If you quit now,"
|
" %n object(s) to be downloaded. If you quit now,"
|
||||||
" it may cause delivery delays. Wait until the"
|
" it may cause delivery delays. Wait until the"
|
||||||
" synchronisation finishes?", None,
|
" synchronisation finishes?", None,
|
||||||
QtCore.QCoreApplication.CodecForTr, pendingDownload()
|
pendingDownload()
|
||||||
),
|
),
|
||||||
QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No
|
QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No
|
||||||
| QtWidgets.QMessageBox.StandardButton.Cancel, QtWidgets.QMessageBox.StandardButton.Cancel)
|
| QtWidgets.QMessageBox.StandardButton.Cancel, QtWidgets.QMessageBox.StandardButton.Cancel)
|
||||||
|
|
|
@ -164,7 +164,7 @@ class NetworkStatus(QtWidgets.QWidget, RetranslateMixin):
|
||||||
)
|
)
|
||||||
self.tableWidgetConnectionCount.setItem(
|
self.tableWidgetConnectionCount.setItem(
|
||||||
0, 2,
|
0, 2,
|
||||||
QtWidgets.QTableWidgetItem("%s" % (c.userAgent.decode()))
|
QtWidgets.QTableWidgetItem("%s" % (c.userAgent.decode('utf-8', 'backslashreplace')))
|
||||||
)
|
)
|
||||||
self.tableWidgetConnectionCount.setItem(
|
self.tableWidgetConnectionCount.setItem(
|
||||||
0, 3,
|
0, 3,
|
||||||
|
|
|
@ -172,10 +172,10 @@ class addressGenerator(StoppableThread):
|
||||||
config.set(address, 'payloadlengthextrabytes', str(
|
config.set(address, 'payloadlengthextrabytes', str(
|
||||||
payloadLengthExtraBytes))
|
payloadLengthExtraBytes))
|
||||||
config.set(
|
config.set(
|
||||||
address, 'privsigningkey', privSigningKeyWIF.decode())
|
address, 'privsigningkey', privSigningKeyWIF.decode('ascii'))
|
||||||
config.set(
|
config.set(
|
||||||
address, 'privencryptionkey',
|
address, 'privencryptionkey',
|
||||||
privEncryptionKeyWIF.decode())
|
privEncryptionKeyWIF.decode('ascii'))
|
||||||
config.save()
|
config.save()
|
||||||
|
|
||||||
# The API and the join and create Chan functionality
|
# The API and the join and create Chan functionality
|
||||||
|
@ -325,10 +325,10 @@ class addressGenerator(StoppableThread):
|
||||||
str(payloadLengthExtraBytes))
|
str(payloadLengthExtraBytes))
|
||||||
config.set(
|
config.set(
|
||||||
address, 'privsigningkey',
|
address, 'privsigningkey',
|
||||||
privSigningKeyWIF.decode())
|
privSigningKeyWIF.decode('ascii'))
|
||||||
config.set(
|
config.set(
|
||||||
address, 'privencryptionkey',
|
address, 'privencryptionkey',
|
||||||
privEncryptionKeyWIF.decode())
|
privEncryptionKeyWIF.decode('ascii'))
|
||||||
config.save()
|
config.save()
|
||||||
|
|
||||||
queues.UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
|
@ -340,12 +340,14 @@ class addressGenerator(StoppableThread):
|
||||||
shared.myECCryptorObjects[ripe] = \
|
shared.myECCryptorObjects[ripe] = \
|
||||||
highlevelcrypto.makeCryptor(
|
highlevelcrypto.makeCryptor(
|
||||||
hexlify(potentialPrivEncryptionKey))
|
hexlify(potentialPrivEncryptionKey))
|
||||||
shared.myAddressesByHash[ripe] = address
|
hex_ripe = hexlify(ripe).decode('ascii')
|
||||||
|
shared.myAddressesByHash[hex_ripe] = address
|
||||||
tag = highlevelcrypto.double_sha512(
|
tag = highlevelcrypto.double_sha512(
|
||||||
encodeVarint(addressVersionNumber)
|
encodeVarint(addressVersionNumber)
|
||||||
+ encodeVarint(streamNumber) + ripe
|
+ encodeVarint(streamNumber) + ripe
|
||||||
)[32:]
|
)[32:]
|
||||||
shared.myAddressesByTag[tag] = address
|
hex_tag = hexlify(tag).decode('ascii')
|
||||||
|
shared.myAddressesByTag[hex_tag] = address
|
||||||
if addressVersionNumber == 3:
|
if addressVersionNumber == 3:
|
||||||
# If this is a chan address,
|
# If this is a chan address,
|
||||||
# the worker thread won't send out
|
# the worker thread won't send out
|
||||||
|
|
|
@ -141,9 +141,10 @@ class objectProcessor(threading.Thread):
|
||||||
# bypass nonce and time, retain object type/version/stream + body
|
# bypass nonce and time, retain object type/version/stream + body
|
||||||
readPosition = 16
|
readPosition = 16
|
||||||
|
|
||||||
if data[readPosition:] in state.ackdataForWhichImWatching:
|
hex_data = hexlify(data[readPosition:]).decode('ascii')
|
||||||
|
if hex_data in state.ackdataForWhichImWatching:
|
||||||
logger.info('This object is an acknowledgement bound for me.')
|
logger.info('This object is an acknowledgement bound for me.')
|
||||||
del state.ackdataForWhichImWatching[data[readPosition:]]
|
del state.ackdataForWhichImWatching[hex_data]
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
"UPDATE sent SET status='ackreceived', lastactiontime=?"
|
"UPDATE sent SET status='ackreceived', lastactiontime=?"
|
||||||
" WHERE ackdata=?", int(time.time()), data[readPosition:])
|
" WHERE ackdata=?", int(time.time()), data[readPosition:])
|
||||||
|
@ -217,19 +218,20 @@ class objectProcessor(threading.Thread):
|
||||||
'the hash requested in this getpubkey request is: %s',
|
'the hash requested in this getpubkey request is: %s',
|
||||||
hexlify(requestedHash))
|
hexlify(requestedHash))
|
||||||
# if this address hash is one of mine
|
# if this address hash is one of mine
|
||||||
if requestedHash in shared.myAddressesByHash:
|
hex_hash = hexlify(requestedHash).decode('ascii')
|
||||||
myAddress = shared.myAddressesByHash[requestedHash]
|
if hex_hash in shared.myAddressesByHash:
|
||||||
|
myAddress = shared.myAddressesByHash[hex_hash]
|
||||||
elif requestedAddressVersionNumber >= 4:
|
elif requestedAddressVersionNumber >= 4:
|
||||||
requestedTag = data[readPosition:readPosition + 32]
|
requestedTag = data[readPosition:readPosition + 32]
|
||||||
if len(requestedTag) != 32:
|
if len(requestedTag) != 32:
|
||||||
return logger.debug(
|
return logger.debug(
|
||||||
'The length of the requested tag is not 32 bytes.'
|
'The length of the requested tag is not 32 bytes.'
|
||||||
' Something is wrong. Ignoring.')
|
' Something is wrong. Ignoring.')
|
||||||
|
hex_tag = hexlify(requestedTag).decode('ascii')
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'the tag requested in this getpubkey request is: %s',
|
'the tag requested in this getpubkey request is: %s', hex_tag)
|
||||||
hexlify(requestedTag))
|
if hex_tag in shared.myAddressesByTag:
|
||||||
if requestedTag in shared.myAddressesByTag:
|
myAddress = shared.myAddressesByTag[hex_tag]
|
||||||
myAddress = shared.myAddressesByTag[requestedTag]
|
|
||||||
|
|
||||||
if myAddress == '':
|
if myAddress == '':
|
||||||
logger.info('This getpubkey request is not for any of my keys.')
|
logger.info('This getpubkey request is not for any of my keys.')
|
||||||
|
@ -419,12 +421,13 @@ class objectProcessor(threading.Thread):
|
||||||
' Sanity check failed.')
|
' Sanity check failed.')
|
||||||
|
|
||||||
tag = data[readPosition:readPosition + 32]
|
tag = data[readPosition:readPosition + 32]
|
||||||
if tag not in state.neededPubkeys:
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
|
if hex_tag not in state.neededPubkeys:
|
||||||
return logger.info(
|
return logger.info(
|
||||||
'We don\'t need this v4 pubkey. We didn\'t ask for it.')
|
'We don\'t need this v4 pubkey. We didn\'t ask for it.')
|
||||||
|
|
||||||
# Let us try to decrypt the pubkey
|
# Let us try to decrypt the pubkey
|
||||||
toAddress = state.neededPubkeys[tag][0]
|
toAddress = state.neededPubkeys[hex_tag][0]
|
||||||
if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \
|
if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \
|
||||||
'successful':
|
'successful':
|
||||||
# At this point we know that we have been waiting on this
|
# At this point we know that we have been waiting on this
|
||||||
|
@ -489,7 +492,8 @@ class objectProcessor(threading.Thread):
|
||||||
|
|
||||||
# This is a message bound for me.
|
# This is a message bound for me.
|
||||||
# Look up my address based on the RIPE hash.
|
# Look up my address based on the RIPE hash.
|
||||||
toAddress = shared.myAddressesByHash[toRipe]
|
hex_ripe = hexlify(toRipe).decode('ascii')
|
||||||
|
toAddress = shared.myAddressesByHash[hex_ripe]
|
||||||
readPosition = 0
|
readPosition = 0
|
||||||
sendersAddressVersionNumber, sendersAddressVersionNumberLength = \
|
sendersAddressVersionNumber, sendersAddressVersionNumberLength = \
|
||||||
decodeVarint(decryptedData[readPosition:readPosition + 10])
|
decodeVarint(decryptedData[readPosition:readPosition + 10])
|
||||||
|
@ -1006,8 +1010,9 @@ class objectProcessor(threading.Thread):
|
||||||
encodeVarint(addressVersion) + encodeVarint(streamNumber)
|
encodeVarint(addressVersion) + encodeVarint(streamNumber)
|
||||||
+ ripe
|
+ ripe
|
||||||
)[32:]
|
)[32:]
|
||||||
if tag in state.neededPubkeys:
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
del state.neededPubkeys[tag]
|
if hex_tag in state.neededPubkeys:
|
||||||
|
del state.neededPubkeys[hex_tag]
|
||||||
self.sendMessages(address)
|
self.sendMessages(address)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
@ -87,7 +87,8 @@ class singleWorker(StoppableThread):
|
||||||
tag = doubleHashOfAddressData[32:]
|
tag = doubleHashOfAddressData[32:]
|
||||||
# We'll need this for when we receive a pubkey reply:
|
# We'll need this for when we receive a pubkey reply:
|
||||||
# it will be encrypted and we'll need to decrypt it.
|
# it will be encrypted and we'll need to decrypt it.
|
||||||
state.neededPubkeys[tag] = (
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
|
state.neededPubkeys[hex_tag] = (
|
||||||
toAddress,
|
toAddress,
|
||||||
highlevelcrypto.makeCryptor(
|
highlevelcrypto.makeCryptor(
|
||||||
hexlify(privEncryptionKey))
|
hexlify(privEncryptionKey))
|
||||||
|
@ -99,19 +100,22 @@ class singleWorker(StoppableThread):
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
ackdata, = row
|
ackdata, = row
|
||||||
self.logger.info('Watching for ackdata %s', hexlify(ackdata))
|
self.logger.info('Watching for ackdata %s', hexlify(ackdata))
|
||||||
state.ackdataForWhichImWatching[ackdata] = 0
|
hex_ackdata = hexlify(ackdata).decode('ascii')
|
||||||
|
state.ackdataForWhichImWatching[hex_ackdata] = 0
|
||||||
|
|
||||||
# Fix legacy (headerless) watched ackdata to include header
|
# Fix legacy (headerless) watched ackdata to include header
|
||||||
for oldack in state.ackdataForWhichImWatching:
|
for hex_oldack in state.ackdataForWhichImWatching:
|
||||||
|
oldack = unhexlify(hex_oldack)
|
||||||
if len(oldack) == 32:
|
if len(oldack) == 32:
|
||||||
# attach legacy header, always constant (msg/1/1)
|
# attach legacy header, always constant (msg/1/1)
|
||||||
newack = '\x00\x00\x00\x02\x01\x01' + oldack
|
newack = b'\x00\x00\x00\x02\x01\x01' + oldack
|
||||||
state.ackdataForWhichImWatching[newack] = 0
|
hex_newack = hexlify(newack).decode('ascii')
|
||||||
|
state.ackdataForWhichImWatching[hex_newack] = 0
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
'''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''',
|
'''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''',
|
||||||
newack, oldack
|
newack, oldack
|
||||||
)
|
)
|
||||||
del state.ackdataForWhichImWatching[oldack]
|
del state.ackdataForWhichImWatching[hex_oldack]
|
||||||
|
|
||||||
# For the case if user deleted knownnodes
|
# For the case if user deleted knownnodes
|
||||||
# but is still having onionpeer objects in inventory
|
# but is still having onionpeer objects in inventory
|
||||||
|
@ -516,8 +520,8 @@ class singleWorker(StoppableThread):
|
||||||
|
|
||||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||||
state.Inventory[inventoryHash] = (
|
state.Inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, buffer(payload), # noqa: F821
|
objectType, streamNumber, memoryview(payload), # noqa: F821
|
||||||
embeddedTime, buffer(tag) # noqa: F821
|
embeddedTime, memoryview(tag) # noqa: F821
|
||||||
)
|
)
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'sending inv (within sendOnionPeerObj function) for object: %s',
|
'sending inv (within sendOnionPeerObj function) for object: %s',
|
||||||
|
@ -794,8 +798,9 @@ class singleWorker(StoppableThread):
|
||||||
encodeVarint(toAddressVersionNumber)
|
encodeVarint(toAddressVersionNumber)
|
||||||
+ encodeVarint(toStreamNumber) + toRipe
|
+ encodeVarint(toStreamNumber) + toRipe
|
||||||
)[32:]
|
)[32:]
|
||||||
|
hex_tag = 'tag-' + hexlify(toTag).decode('ascii')
|
||||||
if toaddress in state.neededPubkeys or \
|
if toaddress in state.neededPubkeys or \
|
||||||
toTag in state.neededPubkeys:
|
hex_tag in state.neededPubkeys:
|
||||||
# We already sent a request for the pubkey
|
# We already sent a request for the pubkey
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
'''UPDATE sent SET status='awaitingpubkey', '''
|
'''UPDATE sent SET status='awaitingpubkey', '''
|
||||||
|
@ -836,7 +841,8 @@ class singleWorker(StoppableThread):
|
||||||
privEncryptionKey = doubleHashOfToAddressData[:32]
|
privEncryptionKey = doubleHashOfToAddressData[:32]
|
||||||
# The second half of the sha512 hash.
|
# The second half of the sha512 hash.
|
||||||
tag = doubleHashOfToAddressData[32:]
|
tag = doubleHashOfToAddressData[32:]
|
||||||
state.neededPubkeys[tag] = (
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
|
state.neededPubkeys[hex_tag] = (
|
||||||
toaddress,
|
toaddress,
|
||||||
highlevelcrypto.makeCryptor(
|
highlevelcrypto.makeCryptor(
|
||||||
hexlify(privEncryptionKey))
|
hexlify(privEncryptionKey))
|
||||||
|
@ -859,7 +865,7 @@ class singleWorker(StoppableThread):
|
||||||
''' status='doingpubkeypow') AND '''
|
''' status='doingpubkeypow') AND '''
|
||||||
''' folder='sent' ''',
|
''' folder='sent' ''',
|
||||||
toaddress)
|
toaddress)
|
||||||
del state.neededPubkeys[tag]
|
del state.neededPubkeys[hex_tag]
|
||||||
break
|
break
|
||||||
# else:
|
# else:
|
||||||
# There was something wrong with this
|
# There was something wrong with this
|
||||||
|
@ -901,7 +907,8 @@ class singleWorker(StoppableThread):
|
||||||
|
|
||||||
# if we aren't sending this to ourselves or a chan
|
# if we aren't sending this to ourselves or a chan
|
||||||
if not config.has_section(toaddress):
|
if not config.has_section(toaddress):
|
||||||
state.ackdataForWhichImWatching[ackdata] = 0
|
hex_ackdata = hexlify(ackdata).decode('ascii')
|
||||||
|
state.ackdataForWhichImWatching[hex_ackdata] = 0
|
||||||
queues.UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'updateSentItemStatusByAckdata', (
|
'updateSentItemStatusByAckdata', (
|
||||||
ackdata,
|
ackdata,
|
||||||
|
@ -1412,10 +1419,11 @@ class singleWorker(StoppableThread):
|
||||||
privEncryptionKey = doubleHashOfAddressData[:32]
|
privEncryptionKey = doubleHashOfAddressData[:32]
|
||||||
# Note that this is the second half of the sha512 hash.
|
# Note that this is the second half of the sha512 hash.
|
||||||
tag = doubleHashOfAddressData[32:]
|
tag = doubleHashOfAddressData[32:]
|
||||||
if tag not in state.neededPubkeys:
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
|
if hex_tag not in state.neededPubkeys:
|
||||||
# We'll need this for when we receive a pubkey reply:
|
# We'll need this for when we receive a pubkey reply:
|
||||||
# it will be encrypted and we'll need to decrypt it.
|
# it will be encrypted and we'll need to decrypt it.
|
||||||
state.neededPubkeys[tag] = (
|
state.neededPubkeys[hex_tag] = (
|
||||||
toAddress,
|
toAddress,
|
||||||
highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
|
highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
|
||||||
)
|
)
|
||||||
|
|
|
@ -28,8 +28,6 @@ class Inventory:
|
||||||
|
|
||||||
# cheap inheritance copied from asyncore
|
# cheap inheritance copied from asyncore
|
||||||
def __getattr__(self, attr):
|
def __getattr__(self, attr):
|
||||||
if attr == "__contains__":
|
|
||||||
self.numberOfInventoryLookupsPerformed += 1
|
|
||||||
try:
|
try:
|
||||||
realRet = getattr(self._realInventory, attr)
|
realRet = getattr(self._realInventory, attr)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
|
@ -40,6 +38,10 @@ class Inventory:
|
||||||
else:
|
else:
|
||||||
return realRet
|
return realRet
|
||||||
|
|
||||||
|
def __contains__(self, key):
|
||||||
|
self.numberOfInventoryLookupsPerformed += 1
|
||||||
|
return key in self._realInventory
|
||||||
|
|
||||||
# hint for pylint: this is dictionary like object
|
# hint for pylint: this is dictionary like object
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
return self._realInventory[key]
|
return self._realInventory[key]
|
||||||
|
|
|
@ -9,6 +9,7 @@ import re
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import addresses
|
import addresses
|
||||||
|
@ -110,15 +111,16 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
b"error", b"version", b"verack"):
|
b"error", b"version", b"verack"):
|
||||||
logger.error(
|
logger.error(
|
||||||
'Received command %s before connection was fully'
|
'Received command %s before connection was fully'
|
||||||
' established, ignoring', self.command.decode())
|
' established, ignoring', self.command.decode('ascii', 'backslashreplace'))
|
||||||
self.invalid = True
|
self.invalid = True
|
||||||
if not self.invalid:
|
if not self.invalid:
|
||||||
try:
|
try:
|
||||||
retval = getattr(
|
retval = getattr(
|
||||||
self, "bm_command_" + self.command.decode().lower())()
|
self, "bm_command_" + self.command.decode('ascii', 'backslashreplace').lower())()
|
||||||
except AttributeError:
|
except AttributeError as err:
|
||||||
|
logger.debug('command = {}, err = {}'.format(self.command, err))
|
||||||
# unimplemented command
|
# unimplemented command
|
||||||
logger.debug('unimplemented command %s', self.command.decode())
|
logger.debug('unimplemented command %s', self.command.decode('ascii', 'backslashreplace'))
|
||||||
except BMProtoInsufficientDataError:
|
except BMProtoInsufficientDataError:
|
||||||
logger.debug('packet length too short, skipping')
|
logger.debug('packet length too short, skipping')
|
||||||
except BMProtoExcessiveDataError:
|
except BMProtoExcessiveDataError:
|
||||||
|
@ -141,8 +143,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
# broken read, ignore
|
# broken read, ignore
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
logger.debug('Closing due to invalid command %s', self.command.decode())
|
logger.debug('Closing due to invalid command %s', self.command.decode('ascii', 'backslashreplace'))
|
||||||
self.close_reason = "Invalid command %s" % self.command.decode()
|
self.close_reason = "Invalid command %s" % self.command.decode('ascii', 'backslashreplace')
|
||||||
self.set_state("close")
|
self.set_state("close")
|
||||||
return False
|
return False
|
||||||
if retval:
|
if retval:
|
||||||
|
@ -353,7 +355,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
if dandelion and not state.dandelion_enabled:
|
if dandelion and not state.dandelion_enabled:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
for i in map(str, items):
|
for i in items:
|
||||||
if i in state.Inventory and not state.Dandelion.hasHash(i):
|
if i in state.Inventory and not state.Dandelion.hasHash(i):
|
||||||
continue
|
continue
|
||||||
if dandelion and not state.Dandelion.hasHash(i):
|
if dandelion and not state.Dandelion.hasHash(i):
|
||||||
|
@ -410,12 +412,13 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
try:
|
try:
|
||||||
self.object.checkObjectByType()
|
self.object.checkObjectByType()
|
||||||
objectProcessorQueue.put((
|
objectProcessorQueue.put((
|
||||||
self.object.objectType, buffer(self.object.data))) # noqa: F821
|
self.object.objectType, memoryview(self.object.data))) # noqa: F821
|
||||||
except BMObjectInvalidError:
|
except BMObjectInvalidError:
|
||||||
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
del missingObjects[self.object.inventoryHash]
|
hex_hash = hexlify(self.object.inventoryHash).decode('ascii')
|
||||||
|
del missingObjects[hex_hash]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -426,8 +429,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
state.Inventory[self.object.inventoryHash] = (
|
state.Inventory[self.object.inventoryHash] = (
|
||||||
self.object.objectType, self.object.streamNumber,
|
self.object.objectType, self.object.streamNumber,
|
||||||
buffer(self.payload[objectOffset:]), self.object.expiresTime, # noqa: F821
|
memoryview(self.payload[objectOffset:]), self.object.expiresTime, # noqa: F821
|
||||||
buffer(self.object.tag) # noqa: F821
|
memoryview(self.object.tag) # noqa: F821
|
||||||
)
|
)
|
||||||
self.handleReceivedObject(
|
self.handleReceivedObject(
|
||||||
self.object.streamNumber, self.object.inventoryHash)
|
self.object.streamNumber, self.object.inventoryHash)
|
||||||
|
@ -448,7 +451,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
try:
|
try:
|
||||||
if (
|
if (
|
||||||
# FIXME: should check against complete list
|
# FIXME: should check against complete list
|
||||||
ip.decode().startswith('bootstrap')
|
ip.decode('ascii', 'backslashreplace').startswith('bootstrap')
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
|
@ -529,7 +532,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'remote node incoming address: %s:%i',
|
'remote node incoming address: %s:%i',
|
||||||
self.destination.host, self.peerNode.port)
|
self.destination.host, self.peerNode.port)
|
||||||
logger.debug('user agent: %s', self.userAgent)
|
logger.debug('user agent: %s', self.userAgent.decode('utf-8', 'backslashreplace'))
|
||||||
logger.debug('streams: [%s]', ','.join(map(str, self.streams)))
|
logger.debug('streams: [%s]', ','.join(map(str, self.streams)))
|
||||||
if not self.peerValidityChecks():
|
if not self.peerValidityChecks():
|
||||||
# ABORT afterwards
|
# ABORT afterwards
|
||||||
|
@ -537,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.append_write_buf(protocol.CreatePacket(b'verack'))
|
self.append_write_buf(protocol.CreatePacket(b'verack'))
|
||||||
self.verackSent = True
|
self.verackSent = True
|
||||||
ua_valid = re.match(
|
ua_valid = re.match(
|
||||||
r'^/[a-zA-Z]+:[0-9]+\.?[\w\s\(\)\./:;-]*/$', self.userAgent.decode())
|
r'^/[a-zA-Z]+:[0-9]+\.?[\w\s\(\)\./:;-]*/$', self.userAgent.decode('utf-8', 'backslashreplace'))
|
||||||
if not ua_valid:
|
if not ua_valid:
|
||||||
self.userAgent = b'/INVALID:0/'
|
self.userAgent = b'/INVALID:0/'
|
||||||
if not self.isOutbound:
|
if not self.isOutbound:
|
||||||
|
@ -656,7 +659,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
del missingObjects[hashId]
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
|
del missingObjects[hex_hash]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ from collections import namedtuple
|
||||||
from random import choice, expovariate, sample
|
from random import choice, expovariate, sample
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
from time import time
|
from time import time
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
import network.connectionpool as connectionpool
|
import network.connectionpool as connectionpool
|
||||||
import state
|
import state
|
||||||
|
@ -52,7 +53,8 @@ class Dandelion: # pylint: disable=old-style-class
|
||||||
if not state.dandelion_enabled:
|
if not state.dandelion_enabled:
|
||||||
return
|
return
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.hashMap[hashId] = Stem(
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
|
self.hashMap[hex_hash] = Stem(
|
||||||
self.getNodeStem(source),
|
self.getNodeStem(source),
|
||||||
stream,
|
stream,
|
||||||
self.poissonTimeout())
|
self.poissonTimeout())
|
||||||
|
@ -63,9 +65,10 @@ class Dandelion: # pylint: disable=old-style-class
|
||||||
include streams, we only learn this after receiving the object)
|
include streams, we only learn this after receiving the object)
|
||||||
"""
|
"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if hashId in self.hashMap:
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
self.hashMap[hashId] = Stem(
|
if hex_hash in self.hashMap:
|
||||||
self.hashMap[hashId].child,
|
self.hashMap[hex_hash] = Stem(
|
||||||
|
self.hashMap[hex_hash].child,
|
||||||
stream,
|
stream,
|
||||||
self.poissonTimeout())
|
self.poissonTimeout())
|
||||||
|
|
||||||
|
@ -77,17 +80,20 @@ class Dandelion: # pylint: disable=old-style-class
|
||||||
''.join('%02x' % ord(i) for i in hashId), reason)
|
''.join('%02x' % ord(i) for i in hashId), reason)
|
||||||
with self.lock:
|
with self.lock:
|
||||||
try:
|
try:
|
||||||
del self.hashMap[hashId]
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
|
del self.hashMap[hex_hash]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def hasHash(self, hashId):
|
def hasHash(self, hashId):
|
||||||
"""Is inventory vector in stem mode?"""
|
"""Is inventory vector in stem mode?"""
|
||||||
return hashId in self.hashMap
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
|
return hex_hash in self.hashMap
|
||||||
|
|
||||||
def objectChildStem(self, hashId):
|
def objectChildStem(self, hashId):
|
||||||
"""Child (i.e. next) node for an inventory vector during stem mode"""
|
"""Child (i.e. next) node for an inventory vector during stem mode"""
|
||||||
return self.hashMap[hashId].child
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
|
return self.hashMap[hex_hash].child
|
||||||
|
|
||||||
def maybeAddStem(self, connection):
|
def maybeAddStem(self, connection):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -9,6 +9,7 @@ import protocol
|
||||||
import network.connectionpool as connectionpool
|
import network.connectionpool as connectionpool
|
||||||
from .objectracker import missingObjects
|
from .objectracker import missingObjects
|
||||||
from .threads import StoppableThread
|
from .threads import StoppableThread
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
|
|
||||||
class DownloadThread(StoppableThread):
|
class DownloadThread(StoppableThread):
|
||||||
|
@ -67,7 +68,8 @@ class DownloadThread(StoppableThread):
|
||||||
continue
|
continue
|
||||||
payload.extend(chunk)
|
payload.extend(chunk)
|
||||||
chunkCount += 1
|
chunkCount += 1
|
||||||
missingObjects[chunk] = now
|
hex_chunk = hexlify(chunk).decode('ascii')
|
||||||
|
missingObjects[hex_chunk] = now
|
||||||
if not chunkCount:
|
if not chunkCount:
|
||||||
continue
|
continue
|
||||||
payload[0:0] = addresses.encodeVarint(chunkCount)
|
payload[0:0] = addresses.encodeVarint(chunkCount)
|
||||||
|
|
|
@ -109,7 +109,11 @@ def addKnownNode(stream, peer, lastseen=None, is_self=False):
|
||||||
"""
|
"""
|
||||||
# pylint: disable=too-many-branches
|
# pylint: disable=too-many-branches
|
||||||
if not isinstance(peer.host, str):
|
if not isinstance(peer.host, str):
|
||||||
peer = Peer(peer.host.decode(), peer.port)
|
try:
|
||||||
|
peer = Peer(peer.host.decode('ascii'), peer.port)
|
||||||
|
except UnicodeDecodeError as err:
|
||||||
|
logger.warning("Invalid host: {}".format(peer.host.decode('ascii', 'backslashreplace')))
|
||||||
|
return
|
||||||
if isinstance(stream, Iterable):
|
if isinstance(stream, Iterable):
|
||||||
with knownNodesLock:
|
with knownNodesLock:
|
||||||
for s in stream:
|
for s in stream:
|
||||||
|
|
|
@ -3,6 +3,7 @@ Module for tracking objects
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
import state
|
import state
|
||||||
import network.connectionpool as connectionpool
|
import network.connectionpool as connectionpool
|
||||||
|
@ -87,19 +88,21 @@ class ObjectTracker(object):
|
||||||
|
|
||||||
def handleReceivedInventory(self, hashId):
|
def handleReceivedInventory(self, hashId):
|
||||||
"""Handling received inventory"""
|
"""Handling received inventory"""
|
||||||
|
hex_hash = hexlify(hashId).decode('ascii')
|
||||||
if haveBloom:
|
if haveBloom:
|
||||||
self.invBloom.add(hashId)
|
self.invBloom.add(hex_hash)
|
||||||
try:
|
try:
|
||||||
with self.objectsNewToThemLock:
|
with self.objectsNewToThemLock:
|
||||||
del self.objectsNewToThem[hashId]
|
del self.objectsNewToThem[hex_hash]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
if hashId not in missingObjects:
|
if hex_hash not in missingObjects:
|
||||||
missingObjects[hashId] = time.time()
|
missingObjects[hex_hash] = time.time()
|
||||||
self.objectsNewToMe[hashId] = True
|
self.objectsNewToMe[hashId] = True
|
||||||
|
|
||||||
def handleReceivedObject(self, streamNumber, hashid):
|
def handleReceivedObject(self, streamNumber, hashid):
|
||||||
"""Handling received object"""
|
"""Handling received object"""
|
||||||
|
hex_hash = hexlify(hashid).decode('ascii')
|
||||||
for i in connectionpool.pool.connections():
|
for i in connectionpool.pool.connections():
|
||||||
if not i.fullyEstablished:
|
if not i.fullyEstablished:
|
||||||
continue
|
continue
|
||||||
|
@ -110,7 +113,7 @@ class ObjectTracker(object):
|
||||||
not state.Dandelion.hasHash(hashid)
|
not state.Dandelion.hasHash(hashid)
|
||||||
or state.Dandelion.objectChildStem(hashid) == i):
|
or state.Dandelion.objectChildStem(hashid) == i):
|
||||||
with i.objectsNewToThemLock:
|
with i.objectsNewToThemLock:
|
||||||
i.objectsNewToThem[hashid] = time.time()
|
i.objectsNewToThem[hex_hash] = time.time()
|
||||||
# update stream number,
|
# update stream number,
|
||||||
# which we didn't have when we just received the dinv
|
# which we didn't have when we just received the dinv
|
||||||
# also resets expiration of the stem mode
|
# also resets expiration of the stem mode
|
||||||
|
@ -119,7 +122,7 @@ class ObjectTracker(object):
|
||||||
if i == self:
|
if i == self:
|
||||||
try:
|
try:
|
||||||
with i.objectsNewToThemLock:
|
with i.objectsNewToThemLock:
|
||||||
del i.objectsNewToThem[hashid]
|
del i.objectsNewToThem[hex_hash]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
self.objectsNewToMe.setLastObject()
|
self.objectsNewToMe.setLastObject()
|
||||||
|
|
|
@ -185,7 +185,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
return s.endswith(tail)
|
return s.endswith(tail)
|
||||||
except:
|
except:
|
||||||
try:
|
try:
|
||||||
return s.decode().endswith(tail)
|
return s.decode('ascii').endswith(tail)
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -487,7 +487,8 @@ def decryptAndCheckPubkeyPayload(data, address):
|
||||||
encryptedData = data[readPosition:]
|
encryptedData = data[readPosition:]
|
||||||
|
|
||||||
# Let us try to decrypt the pubkey
|
# Let us try to decrypt the pubkey
|
||||||
toAddress, cryptorObject = state.neededPubkeys[tag]
|
hex_tag = 'tag-' + hexlify(tag).decode('ascii')
|
||||||
|
toAddress, cryptorObject = state.neededPubkeys[hex_tag]
|
||||||
if toAddress != address:
|
if toAddress != address:
|
||||||
logger.critical(
|
logger.critical(
|
||||||
'decryptAndCheckPubkeyPayload failed due to toAddress'
|
'decryptAndCheckPubkeyPayload failed due to toAddress'
|
||||||
|
|
|
@ -3,6 +3,7 @@ Track randomize ordered dict
|
||||||
"""
|
"""
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
from time import time
|
from time import time
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import helper_random
|
import helper_random
|
||||||
|
@ -38,10 +39,12 @@ class RandomTrackingDict(object):
|
||||||
return self.len
|
return self.len
|
||||||
|
|
||||||
def __contains__(self, key):
|
def __contains__(self, key):
|
||||||
return key in self.dictionary
|
hex_key = hexlify(key).decode('ascii')
|
||||||
|
return hex_key in self.dictionary
|
||||||
|
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
return self.dictionary[key][1]
|
hex_key = hexlify(key).decode('ascii')
|
||||||
|
return self.dictionary[hex_key][1]
|
||||||
|
|
||||||
def _swap(self, i1, i2):
|
def _swap(self, i1, i2):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
@ -49,26 +52,30 @@ class RandomTrackingDict(object):
|
||||||
key2 = self.indexDict[i2]
|
key2 = self.indexDict[i2]
|
||||||
self.indexDict[i1] = key2
|
self.indexDict[i1] = key2
|
||||||
self.indexDict[i2] = key1
|
self.indexDict[i2] = key1
|
||||||
self.dictionary[key1][0] = i2
|
hex_key1 = hexlify(key1).decode('ascii')
|
||||||
self.dictionary[key2][0] = i1
|
hex_key2 = hexlify(key2).decode('ascii')
|
||||||
|
self.dictionary[hex_key1][0] = i2
|
||||||
|
self.dictionary[hex_key2][0] = i1
|
||||||
# for quick reassignment
|
# for quick reassignment
|
||||||
return i2
|
return i2
|
||||||
|
|
||||||
def __setitem__(self, key, value):
|
def __setitem__(self, key, value):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if key in self.dictionary:
|
hex_key = hexlify(key).decode('ascii')
|
||||||
self.dictionary[key][1] = value
|
if hex_key in self.dictionary:
|
||||||
|
self.dictionary[hex_key][1] = value
|
||||||
else:
|
else:
|
||||||
self.indexDict.append(key)
|
self.indexDict.append(key)
|
||||||
self.dictionary[key] = [self.len, value]
|
self.dictionary[hex_key] = [self.len, value]
|
||||||
self._swap(self.len, self.len - self.pendingLen)
|
self._swap(self.len, self.len - self.pendingLen)
|
||||||
self.len += 1
|
self.len += 1
|
||||||
|
|
||||||
def __delitem__(self, key):
|
def __delitem__(self, key):
|
||||||
if key not in self.dictionary:
|
hex_key = hexlify(key).decode('ascii')
|
||||||
|
if hex_key not in self.dictionary:
|
||||||
raise KeyError
|
raise KeyError
|
||||||
with self.lock:
|
with self.lock:
|
||||||
index = self.dictionary[key][0]
|
index = self.dictionary[hex_key][0]
|
||||||
# not pending
|
# not pending
|
||||||
if index < self.len - self.pendingLen:
|
if index < self.len - self.pendingLen:
|
||||||
# left of pending part
|
# left of pending part
|
||||||
|
@ -82,7 +89,7 @@ class RandomTrackingDict(object):
|
||||||
# operation can improve 4x, but it's already very fast so we'll
|
# operation can improve 4x, but it's already very fast so we'll
|
||||||
# ignore it for the time being
|
# ignore it for the time being
|
||||||
del self.indexDict[-1]
|
del self.indexDict[-1]
|
||||||
del self.dictionary[key]
|
del self.dictionary[hex_key]
|
||||||
self.len -= 1
|
self.len -= 1
|
||||||
|
|
||||||
def setMaxPending(self, maxPending):
|
def setMaxPending(self, maxPending):
|
||||||
|
|
|
@ -114,11 +114,13 @@ def reloadMyAddressHashes():
|
||||||
if len(privEncryptionKey) == 64:
|
if len(privEncryptionKey) == 64:
|
||||||
myECCryptorObjects[hashobj] = \
|
myECCryptorObjects[hashobj] = \
|
||||||
highlevelcrypto.makeCryptor(privEncryptionKey)
|
highlevelcrypto.makeCryptor(privEncryptionKey)
|
||||||
myAddressesByHash[hashobj] = addressInKeysFile
|
hex_hash = hexlify(hashobj).decode('ascii')
|
||||||
|
myAddressesByHash[hex_hash] = addressInKeysFile
|
||||||
tag = highlevelcrypto.double_sha512(
|
tag = highlevelcrypto.double_sha512(
|
||||||
encodeVarint(addressVersionNumber)
|
encodeVarint(addressVersionNumber)
|
||||||
+ encodeVarint(streamNumber) + hashobj)[32:]
|
+ encodeVarint(streamNumber) + hashobj)[32:]
|
||||||
myAddressesByTag[tag] = addressInKeysFile
|
hex_tag = hexlify(tag).decode('ascii')
|
||||||
|
myAddressesByTag[hex_tag] = addressInKeysFile
|
||||||
|
|
||||||
if not keyfileSecure:
|
if not keyfileSecure:
|
||||||
fixSensitiveFilePermissions(os.path.join(
|
fixSensitiveFilePermissions(os.path.join(
|
||||||
|
|
|
@ -70,7 +70,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.makedirs(os.path.join(
|
os.makedirs(os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode()))
|
hexlify(hashval).decode('ascii')))
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
|
@ -78,7 +78,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode(),
|
hexlify(hashval).decode('ascii'),
|
||||||
FilesystemInventory.metadataFilename,
|
FilesystemInventory.metadataFilename,
|
||||||
),
|
),
|
||||||
"w",
|
"w",
|
||||||
|
@ -87,12 +87,12 @@ class FilesystemInventory(InventoryStorage):
|
||||||
value.type,
|
value.type,
|
||||||
value.stream,
|
value.stream,
|
||||||
value.expires,
|
value.expires,
|
||||||
hexlify(value.tag).decode()))
|
hexlify(value.tag).decode('ascii')))
|
||||||
with open(
|
with open(
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode(),
|
hexlify(hashval).decode('ascii'),
|
||||||
FilesystemInventory.dataFilename,
|
FilesystemInventory.dataFilename,
|
||||||
),
|
),
|
||||||
"wb",
|
"wb",
|
||||||
|
@ -119,7 +119,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode(),
|
hexlify(hashval).decode('ascii'),
|
||||||
FilesystemInventory.metadataFilename))
|
FilesystemInventory.metadataFilename))
|
||||||
except IOError:
|
except IOError:
|
||||||
pass
|
pass
|
||||||
|
@ -128,7 +128,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode(),
|
hexlify(hashval).decode('ascii'),
|
||||||
FilesystemInventory.dataFilename))
|
FilesystemInventory.dataFilename))
|
||||||
except IOError:
|
except IOError:
|
||||||
pass
|
pass
|
||||||
|
@ -136,7 +136,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.rmdir(os.path.join(
|
os.rmdir(os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashval).decode()))
|
hexlify(hashval).decode('ascii')))
|
||||||
except IOError:
|
except IOError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -186,7 +186,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashId).decode(),
|
hexlify(hashId).decode('ascii'),
|
||||||
FilesystemInventory.dataFilename,
|
FilesystemInventory.dataFilename,
|
||||||
),
|
),
|
||||||
"r",
|
"r",
|
||||||
|
@ -202,7 +202,7 @@ class FilesystemInventory(InventoryStorage):
|
||||||
os.path.join(
|
os.path.join(
|
||||||
self.baseDir,
|
self.baseDir,
|
||||||
FilesystemInventory.objectDir,
|
FilesystemInventory.objectDir,
|
||||||
hexlify(hashId).decode(),
|
hexlify(hashId).decode('ascii'),
|
||||||
FilesystemInventory.metadataFilename,
|
FilesystemInventory.metadataFilename,
|
||||||
),
|
),
|
||||||
"r",
|
"r",
|
||||||
|
|
|
@ -4,6 +4,7 @@ Sqlite Inventory
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import time
|
import time
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
|
from binascii import hexlify, unhexlify
|
||||||
|
|
||||||
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
|
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
|
||||||
from .storage import InventoryItem, InventoryStorage
|
from .storage import InventoryItem, InventoryStorage
|
||||||
|
@ -29,23 +30,24 @@ class SqliteInventory(InventoryStorage):
|
||||||
|
|
||||||
def __contains__(self, hash_):
|
def __contains__(self, hash_):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if hash_ in self._objects:
|
hex_hash = hexlify(hash_).decode('ascii')
|
||||||
|
if hex_hash in self._objects:
|
||||||
return True
|
return True
|
||||||
rows = sqlQuery(
|
rows = sqlQuery(
|
||||||
'SELECT streamnumber FROM inventory WHERE hash=?',
|
'SELECT streamnumber FROM inventory WHERE hash=?', hash_)
|
||||||
sqlite3.Binary(hash_.encode()))
|
|
||||||
if not rows:
|
if not rows:
|
||||||
return False
|
return False
|
||||||
self._objects[hash_] = rows[0][0]
|
self._objects[hex_hash] = rows[0][0]
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def __getitem__(self, hash_):
|
def __getitem__(self, hash_):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if hash_ in self._inventory:
|
hex_hash = hexlify(hash_).decode('ascii')
|
||||||
return self._inventory[hash_]
|
if hex_hash in self._inventory:
|
||||||
|
return self._inventory[hex_hash]
|
||||||
rows = sqlQuery(
|
rows = sqlQuery(
|
||||||
b'SELECT objecttype, streamnumber, payload, expirestime, tag'
|
'SELECT objecttype, streamnumber, payload, expirestime, tag'
|
||||||
b' FROM inventory WHERE hash=?', sqlite3.Binary(hash_.encode()))
|
' FROM inventory WHERE hash=?', hash_)
|
||||||
if not rows:
|
if not rows:
|
||||||
raise KeyError(hash_)
|
raise KeyError(hash_)
|
||||||
return InventoryItem(*rows[0])
|
return InventoryItem(*rows[0])
|
||||||
|
@ -53,16 +55,17 @@ class SqliteInventory(InventoryStorage):
|
||||||
def __setitem__(self, hash_, value):
|
def __setitem__(self, hash_, value):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
value = InventoryItem(*value)
|
value = InventoryItem(*value)
|
||||||
self._inventory[hash_] = value
|
hex_hash = hexlify(hash_).decode('ascii')
|
||||||
self._objects[hash_] = value.stream
|
self._inventory[hex_hash] = value
|
||||||
|
self._objects[hex_hash] = value.stream
|
||||||
|
|
||||||
def __delitem__(self, hash_):
|
def __delitem__(self, hash_):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
hashes = self._inventory.keys()[:]
|
hashes = map(unhexlify, self._inventory.keys()[:])
|
||||||
hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
|
hashes += (unhexlify(x) for x, in sqlQuery('SELECT hash FROM inventory'))
|
||||||
return hashes.__iter__()
|
return hashes.__iter__()
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
|
@ -80,7 +83,7 @@ class SqliteInventory(InventoryStorage):
|
||||||
' FROM inventory WHERE objecttype=?', objectType]
|
' FROM inventory WHERE objecttype=?', objectType]
|
||||||
if tag:
|
if tag:
|
||||||
query[0] += ' AND tag=?'
|
query[0] += ' AND tag=?'
|
||||||
query.append(sqlite3.Binary(tag))
|
query.append(tag)
|
||||||
with self.lock:
|
with self.lock:
|
||||||
values = [
|
values = [
|
||||||
value for value in self._inventory.values()
|
value for value in self._inventory.values()
|
||||||
|
@ -93,9 +96,9 @@ class SqliteInventory(InventoryStorage):
|
||||||
"""Return unexpired inventory vectors filtered by stream"""
|
"""Return unexpired inventory vectors filtered by stream"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
t = int(time.time())
|
t = int(time.time())
|
||||||
hashes = [x for x, value in self._inventory.items()
|
hashes = [unhexlify(x) for x, value in self._inventory.items()
|
||||||
if value.stream == stream and value.expires > t]
|
if value.stream == stream and value.expires > t]
|
||||||
hashes += (str(payload) for payload, in sqlQuery(
|
hashes += (payload for payload, in sqlQuery(
|
||||||
'SELECT hash FROM inventory WHERE streamnumber=?'
|
'SELECT hash FROM inventory WHERE streamnumber=?'
|
||||||
' AND expirestime>?', stream, t))
|
' AND expirestime>?', stream, t))
|
||||||
return hashes
|
return hashes
|
||||||
|
@ -109,7 +112,7 @@ class SqliteInventory(InventoryStorage):
|
||||||
for objectHash, value in self._inventory.items():
|
for objectHash, value in self._inventory.items():
|
||||||
sql.execute(
|
sql.execute(
|
||||||
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
|
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
|
||||||
sqlite3.Binary(objectHash), *value)
|
unhexlify(objectHash), *value)
|
||||||
self._inventory.clear()
|
self._inventory.clear()
|
||||||
|
|
||||||
def clean(self):
|
def clean(self):
|
||||||
|
|
|
@ -82,12 +82,12 @@ class TestAPIThread(TestPartialRun):
|
||||||
proofofwork.init()
|
proofofwork.init()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
unhexlify(self.api.disseminatePreparedObject(
|
unhexlify(self.api.disseminatePreparedObject(
|
||||||
hexlify(sample_object_data).decode())),
|
hexlify(sample_object_data).decode('ascii'))),
|
||||||
calculateInventoryHash(sample_object_data))
|
calculateInventoryHash(sample_object_data))
|
||||||
update_object = b'\x00' * 8 + pack(
|
update_object = b'\x00' * 8 + pack(
|
||||||
'>Q', int(time.time() + 7200)) + sample_object_data[16:]
|
'>Q', int(time.time() + 7200)) + sample_object_data[16:]
|
||||||
invhash = unhexlify(self.api.disseminatePreEncryptedMsg(
|
invhash = unhexlify(self.api.disseminatePreEncryptedMsg(
|
||||||
hexlify(update_object).decode()
|
hexlify(update_object).decode('ascii')
|
||||||
))
|
))
|
||||||
obj_type, obj_stream, obj_data = state.Inventory[invhash][:3]
|
obj_type, obj_stream, obj_data = state.Inventory[invhash][:3]
|
||||||
self.assertEqual(obj_type, 42)
|
self.assertEqual(obj_type, 42)
|
||||||
|
|
Reference in New Issue
Block a user