From 741f8dd4617eeed76f2d59172d25ed32613f9bb6 Mon Sep 17 00:00:00 2001 From: Kashiko Koibumi Date: Tue, 14 May 2024 23:28:16 +0900 Subject: [PATCH] fix some parts of inventory syncing Remained works: * Objects are not saved. * Decryption does not work. * Can not shutdown. --- src/api.py | 2 +- src/bitmessageqt/__init__.py | 2 +- src/bitmessageqt/networkstatus.py | 2 +- src/class_addressGenerator.py | 14 ++++++------ src/class_objectProcessor.py | 31 +++++++++++++++----------- src/class_singleWorker.py | 36 +++++++++++++++++++------------ src/inventory.py | 6 ++++-- src/network/bmproto.py | 34 ++++++++++++++++------------- src/network/dandelion.py | 20 +++++++++++------ src/network/downloadthread.py | 4 +++- src/network/knownnodes.py | 6 +++++- src/network/objectracker.py | 15 +++++++------ src/network/tcp.py | 2 +- src/protocol.py | 3 ++- src/randomtrackingdict.py | 27 ++++++++++++++--------- src/shared.py | 6 ++++-- src/storage/filesystem.py | 18 ++++++++-------- src/storage/sqlite.py | 35 ++++++++++++++++-------------- src/tests/test_api_thread.py | 4 ++-- 19 files changed, 158 insertions(+), 109 deletions(-) diff --git a/src/api.py b/src/api.py index a4445569..656564c1 100644 --- a/src/api.py +++ b/src/api.py @@ -1347,7 +1347,7 @@ class BMRPCDispatcher(object): 'Broadcasting inv for msg(API disseminatePreEncryptedMsg' ' command): %s', hexlify(inventoryHash)) queues.invQueue.put((toStreamNumber, inventoryHash)) - return hexlify(inventoryHash).decode() + return hexlify(inventoryHash).decode('ascii') @command('trashSentMessageByAckData') def HandleTrashSentMessageByAckDAta(self, ackdata): diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index b7c19bfd..f612e48a 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2683,7 +2683,7 @@ class MyForm(settingsmixin.SMainWindow): " %n object(s) to be downloaded. If you quit now," " it may cause delivery delays. Wait until the" " synchronisation finishes?", None, - QtCore.QCoreApplication.CodecForTr, pendingDownload() + pendingDownload() ), QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No | QtWidgets.QMessageBox.StandardButton.Cancel, QtWidgets.QMessageBox.StandardButton.Cancel) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index e89e0fd4..ac84f524 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -164,7 +164,7 @@ class NetworkStatus(QtWidgets.QWidget, RetranslateMixin): ) self.tableWidgetConnectionCount.setItem( 0, 2, - QtWidgets.QTableWidgetItem("%s" % (c.userAgent.decode())) + QtWidgets.QTableWidgetItem("%s" % (c.userAgent.decode('utf-8', 'backslashreplace'))) ) self.tableWidgetConnectionCount.setItem( 0, 3, diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index 4e3112a5..58e33bad 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -172,10 +172,10 @@ class addressGenerator(StoppableThread): config.set(address, 'payloadlengthextrabytes', str( payloadLengthExtraBytes)) config.set( - address, 'privsigningkey', privSigningKeyWIF.decode()) + address, 'privsigningkey', privSigningKeyWIF.decode('ascii')) config.set( address, 'privencryptionkey', - privEncryptionKeyWIF.decode()) + privEncryptionKeyWIF.decode('ascii')) config.save() # The API and the join and create Chan functionality @@ -325,10 +325,10 @@ class addressGenerator(StoppableThread): str(payloadLengthExtraBytes)) config.set( address, 'privsigningkey', - privSigningKeyWIF.decode()) + privSigningKeyWIF.decode('ascii')) config.set( address, 'privencryptionkey', - privEncryptionKeyWIF.decode()) + privEncryptionKeyWIF.decode('ascii')) config.save() queues.UISignalQueue.put(( @@ -340,12 +340,14 @@ class addressGenerator(StoppableThread): shared.myECCryptorObjects[ripe] = \ highlevelcrypto.makeCryptor( hexlify(potentialPrivEncryptionKey)) - shared.myAddressesByHash[ripe] = address + hex_ripe = hexlify(ripe).decode('ascii') + shared.myAddressesByHash[hex_ripe] = address tag = highlevelcrypto.double_sha512( encodeVarint(addressVersionNumber) + encodeVarint(streamNumber) + ripe )[32:] - shared.myAddressesByTag[tag] = address + hex_tag = hexlify(tag).decode('ascii') + shared.myAddressesByTag[hex_tag] = address if addressVersionNumber == 3: # If this is a chan address, # the worker thread won't send out diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 50c23e2c..95c1f7e2 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -141,9 +141,10 @@ class objectProcessor(threading.Thread): # bypass nonce and time, retain object type/version/stream + body readPosition = 16 - if data[readPosition:] in state.ackdataForWhichImWatching: + hex_data = hexlify(data[readPosition:]).decode('ascii') + if hex_data in state.ackdataForWhichImWatching: logger.info('This object is an acknowledgement bound for me.') - del state.ackdataForWhichImWatching[data[readPosition:]] + del state.ackdataForWhichImWatching[hex_data] sqlExecute( "UPDATE sent SET status='ackreceived', lastactiontime=?" " WHERE ackdata=?", int(time.time()), data[readPosition:]) @@ -217,19 +218,20 @@ class objectProcessor(threading.Thread): 'the hash requested in this getpubkey request is: %s', hexlify(requestedHash)) # if this address hash is one of mine - if requestedHash in shared.myAddressesByHash: - myAddress = shared.myAddressesByHash[requestedHash] + hex_hash = hexlify(requestedHash).decode('ascii') + if hex_hash in shared.myAddressesByHash: + myAddress = shared.myAddressesByHash[hex_hash] elif requestedAddressVersionNumber >= 4: requestedTag = data[readPosition:readPosition + 32] if len(requestedTag) != 32: return logger.debug( 'The length of the requested tag is not 32 bytes.' ' Something is wrong. Ignoring.') + hex_tag = hexlify(requestedTag).decode('ascii') logger.debug( - 'the tag requested in this getpubkey request is: %s', - hexlify(requestedTag)) - if requestedTag in shared.myAddressesByTag: - myAddress = shared.myAddressesByTag[requestedTag] + 'the tag requested in this getpubkey request is: %s', hex_tag) + if hex_tag in shared.myAddressesByTag: + myAddress = shared.myAddressesByTag[hex_tag] if myAddress == '': logger.info('This getpubkey request is not for any of my keys.') @@ -419,12 +421,13 @@ class objectProcessor(threading.Thread): ' Sanity check failed.') tag = data[readPosition:readPosition + 32] - if tag not in state.neededPubkeys: + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + if hex_tag not in state.neededPubkeys: return logger.info( 'We don\'t need this v4 pubkey. We didn\'t ask for it.') # Let us try to decrypt the pubkey - toAddress = state.neededPubkeys[tag][0] + toAddress = state.neededPubkeys[hex_tag][0] if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \ 'successful': # At this point we know that we have been waiting on this @@ -489,7 +492,8 @@ class objectProcessor(threading.Thread): # This is a message bound for me. # Look up my address based on the RIPE hash. - toAddress = shared.myAddressesByHash[toRipe] + hex_ripe = hexlify(toRipe).decode('ascii') + toAddress = shared.myAddressesByHash[hex_ripe] readPosition = 0 sendersAddressVersionNumber, sendersAddressVersionNumberLength = \ decodeVarint(decryptedData[readPosition:readPosition + 10]) @@ -1006,8 +1010,9 @@ class objectProcessor(threading.Thread): encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe )[32:] - if tag in state.neededPubkeys: - del state.neededPubkeys[tag] + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + if hex_tag in state.neededPubkeys: + del state.neededPubkeys[hex_tag] self.sendMessages(address) @staticmethod diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index e7537f6b..6172aa1d 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -87,7 +87,8 @@ class singleWorker(StoppableThread): tag = doubleHashOfAddressData[32:] # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + state.neededPubkeys[hex_tag] = ( toAddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -99,19 +100,22 @@ class singleWorker(StoppableThread): for row in queryreturn: ackdata, = row self.logger.info('Watching for ackdata %s', hexlify(ackdata)) - state.ackdataForWhichImWatching[ackdata] = 0 + hex_ackdata = hexlify(ackdata).decode('ascii') + state.ackdataForWhichImWatching[hex_ackdata] = 0 # Fix legacy (headerless) watched ackdata to include header - for oldack in state.ackdataForWhichImWatching: + for hex_oldack in state.ackdataForWhichImWatching: + oldack = unhexlify(hex_oldack) if len(oldack) == 32: # attach legacy header, always constant (msg/1/1) - newack = '\x00\x00\x00\x02\x01\x01' + oldack - state.ackdataForWhichImWatching[newack] = 0 + newack = b'\x00\x00\x00\x02\x01\x01' + oldack + hex_newack = hexlify(newack).decode('ascii') + state.ackdataForWhichImWatching[hex_newack] = 0 sqlExecute( '''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''', newack, oldack ) - del state.ackdataForWhichImWatching[oldack] + del state.ackdataForWhichImWatching[hex_oldack] # For the case if user deleted knownnodes # but is still having onionpeer objects in inventory @@ -516,8 +520,8 @@ class singleWorker(StoppableThread): inventoryHash = highlevelcrypto.calculateInventoryHash(payload) state.Inventory[inventoryHash] = ( - objectType, streamNumber, buffer(payload), # noqa: F821 - embeddedTime, buffer(tag) # noqa: F821 + objectType, streamNumber, memoryview(payload), # noqa: F821 + embeddedTime, memoryview(tag) # noqa: F821 ) self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', @@ -794,8 +798,9 @@ class singleWorker(StoppableThread): encodeVarint(toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe )[32:] + hex_tag = 'tag-' + hexlify(toTag).decode('ascii') if toaddress in state.neededPubkeys or \ - toTag in state.neededPubkeys: + hex_tag in state.neededPubkeys: # We already sent a request for the pubkey sqlExecute( '''UPDATE sent SET status='awaitingpubkey', ''' @@ -836,7 +841,8 @@ class singleWorker(StoppableThread): privEncryptionKey = doubleHashOfToAddressData[:32] # The second half of the sha512 hash. tag = doubleHashOfToAddressData[32:] - state.neededPubkeys[tag] = ( + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + state.neededPubkeys[hex_tag] = ( toaddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -859,7 +865,7 @@ class singleWorker(StoppableThread): ''' status='doingpubkeypow') AND ''' ''' folder='sent' ''', toaddress) - del state.neededPubkeys[tag] + del state.neededPubkeys[hex_tag] break # else: # There was something wrong with this @@ -901,7 +907,8 @@ class singleWorker(StoppableThread): # if we aren't sending this to ourselves or a chan if not config.has_section(toaddress): - state.ackdataForWhichImWatching[ackdata] = 0 + hex_ackdata = hexlify(ackdata).decode('ascii') + state.ackdataForWhichImWatching[hex_ackdata] = 0 queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( ackdata, @@ -1412,10 +1419,11 @@ class singleWorker(StoppableThread): privEncryptionKey = doubleHashOfAddressData[:32] # Note that this is the second half of the sha512 hash. tag = doubleHashOfAddressData[32:] - if tag not in state.neededPubkeys: + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + if hex_tag not in state.neededPubkeys: # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( + state.neededPubkeys[hex_tag] = ( toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)) ) diff --git a/src/inventory.py b/src/inventory.py index 5b739e84..8356262c 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -28,8 +28,6 @@ class Inventory: # cheap inheritance copied from asyncore def __getattr__(self, attr): - if attr == "__contains__": - self.numberOfInventoryLookupsPerformed += 1 try: realRet = getattr(self._realInventory, attr) except AttributeError: @@ -40,6 +38,10 @@ class Inventory: else: return realRet + def __contains__(self, key): + self.numberOfInventoryLookupsPerformed += 1 + return key in self._realInventory + # hint for pylint: this is dictionary like object def __getitem__(self, key): return self._realInventory[key] diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 00967d1a..980b3022 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -9,6 +9,7 @@ import re import socket import struct import time +from binascii import hexlify # magic imports! import addresses @@ -110,15 +111,16 @@ class BMProto(AdvancedDispatcher, ObjectTracker): b"error", b"version", b"verack"): logger.error( 'Received command %s before connection was fully' - ' established, ignoring', self.command.decode()) + ' established, ignoring', self.command.decode('ascii', 'backslashreplace')) self.invalid = True if not self.invalid: try: retval = getattr( - self, "bm_command_" + self.command.decode().lower())() - except AttributeError: + self, "bm_command_" + self.command.decode('ascii', 'backslashreplace').lower())() + except AttributeError as err: + logger.debug('command = {}, err = {}'.format(self.command, err)) # unimplemented command - logger.debug('unimplemented command %s', self.command.decode()) + logger.debug('unimplemented command %s', self.command.decode('ascii', 'backslashreplace')) except BMProtoInsufficientDataError: logger.debug('packet length too short, skipping') except BMProtoExcessiveDataError: @@ -141,8 +143,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # broken read, ignore pass else: - logger.debug('Closing due to invalid command %s', self.command.decode()) - self.close_reason = "Invalid command %s" % self.command.decode() + logger.debug('Closing due to invalid command %s', self.command.decode('ascii', 'backslashreplace')) + self.close_reason = "Invalid command %s" % self.command.decode('ascii', 'backslashreplace') self.set_state("close") return False if retval: @@ -353,7 +355,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if dandelion and not state.dandelion_enabled: return True - for i in map(str, items): + for i in items: if i in state.Inventory and not state.Dandelion.hasHash(i): continue if dandelion and not state.Dandelion.hasHash(i): @@ -410,12 +412,13 @@ class BMProto(AdvancedDispatcher, ObjectTracker): try: self.object.checkObjectByType() objectProcessorQueue.put(( - self.object.objectType, buffer(self.object.data))) # noqa: F821 + self.object.objectType, memoryview(self.object.data))) # noqa: F821 except BMObjectInvalidError: BMProto.stopDownloadingObject(self.object.inventoryHash, True) else: try: - del missingObjects[self.object.inventoryHash] + hex_hash = hexlify(self.object.inventoryHash).decode('ascii') + del missingObjects[hex_hash] except KeyError: pass @@ -426,8 +429,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): state.Inventory[self.object.inventoryHash] = ( self.object.objectType, self.object.streamNumber, - buffer(self.payload[objectOffset:]), self.object.expiresTime, # noqa: F821 - buffer(self.object.tag) # noqa: F821 + memoryview(self.payload[objectOffset:]), self.object.expiresTime, # noqa: F821 + memoryview(self.object.tag) # noqa: F821 ) self.handleReceivedObject( self.object.streamNumber, self.object.inventoryHash) @@ -448,7 +451,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): try: if ( # FIXME: should check against complete list - ip.decode().startswith('bootstrap') + ip.decode('ascii', 'backslashreplace').startswith('bootstrap') ): continue except UnicodeDecodeError: @@ -529,7 +532,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): logger.debug( 'remote node incoming address: %s:%i', self.destination.host, self.peerNode.port) - logger.debug('user agent: %s', self.userAgent) + logger.debug('user agent: %s', self.userAgent.decode('utf-8', 'backslashreplace')) logger.debug('streams: [%s]', ','.join(map(str, self.streams))) if not self.peerValidityChecks(): # ABORT afterwards @@ -537,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.append_write_buf(protocol.CreatePacket(b'verack')) self.verackSent = True ua_valid = re.match( - r'^/[a-zA-Z]+:[0-9]+\.?[\w\s\(\)\./:;-]*/$', self.userAgent.decode()) + r'^/[a-zA-Z]+:[0-9]+\.?[\w\s\(\)\./:;-]*/$', self.userAgent.decode('utf-8', 'backslashreplace')) if not ua_valid: self.userAgent = b'/INVALID:0/' if not self.isOutbound: @@ -656,7 +659,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass try: - del missingObjects[hashId] + hex_hash = hexlify(hashId).decode('ascii') + del missingObjects[hex_hash] except KeyError: pass diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 02b34024..b5e6e495 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -6,6 +6,7 @@ from collections import namedtuple from random import choice, expovariate, sample from threading import RLock from time import time +from binascii import hexlify import network.connectionpool as connectionpool import state @@ -52,7 +53,8 @@ class Dandelion: # pylint: disable=old-style-class if not state.dandelion_enabled: return with self.lock: - self.hashMap[hashId] = Stem( + hex_hash = hexlify(hashId).decode('ascii') + self.hashMap[hex_hash] = Stem( self.getNodeStem(source), stream, self.poissonTimeout()) @@ -63,9 +65,10 @@ class Dandelion: # pylint: disable=old-style-class include streams, we only learn this after receiving the object) """ with self.lock: - if hashId in self.hashMap: - self.hashMap[hashId] = Stem( - self.hashMap[hashId].child, + hex_hash = hexlify(hashId).decode('ascii') + if hex_hash in self.hashMap: + self.hashMap[hex_hash] = Stem( + self.hashMap[hex_hash].child, stream, self.poissonTimeout()) @@ -77,17 +80,20 @@ class Dandelion: # pylint: disable=old-style-class ''.join('%02x' % ord(i) for i in hashId), reason) with self.lock: try: - del self.hashMap[hashId] + hex_hash = hexlify(hashId).decode('ascii') + del self.hashMap[hex_hash] except KeyError: pass def hasHash(self, hashId): """Is inventory vector in stem mode?""" - return hashId in self.hashMap + hex_hash = hexlify(hashId).decode('ascii') + return hex_hash in self.hashMap def objectChildStem(self, hashId): """Child (i.e. next) node for an inventory vector during stem mode""" - return self.hashMap[hashId].child + hex_hash = hexlify(hashId).decode('ascii') + return self.hashMap[hex_hash].child def maybeAddStem(self, connection): """ diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index a2e343f5..9900cc5a 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -9,6 +9,7 @@ import protocol import network.connectionpool as connectionpool from .objectracker import missingObjects from .threads import StoppableThread +from binascii import hexlify class DownloadThread(StoppableThread): @@ -67,7 +68,8 @@ class DownloadThread(StoppableThread): continue payload.extend(chunk) chunkCount += 1 - missingObjects[chunk] = now + hex_chunk = hexlify(chunk).decode('ascii') + missingObjects[hex_chunk] = now if not chunkCount: continue payload[0:0] = addresses.encodeVarint(chunkCount) diff --git a/src/network/knownnodes.py b/src/network/knownnodes.py index 7d214001..93ce0b9e 100644 --- a/src/network/knownnodes.py +++ b/src/network/knownnodes.py @@ -109,7 +109,11 @@ def addKnownNode(stream, peer, lastseen=None, is_self=False): """ # pylint: disable=too-many-branches if not isinstance(peer.host, str): - peer = Peer(peer.host.decode(), peer.port) + try: + peer = Peer(peer.host.decode('ascii'), peer.port) + except UnicodeDecodeError as err: + logger.warning("Invalid host: {}".format(peer.host.decode('ascii', 'backslashreplace'))) + return if isinstance(stream, Iterable): with knownNodesLock: for s in stream: diff --git a/src/network/objectracker.py b/src/network/objectracker.py index abaefad7..dcea8d23 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -3,6 +3,7 @@ Module for tracking objects """ import time from threading import RLock +from binascii import hexlify import state import network.connectionpool as connectionpool @@ -87,19 +88,21 @@ class ObjectTracker(object): def handleReceivedInventory(self, hashId): """Handling received inventory""" + hex_hash = hexlify(hashId).decode('ascii') if haveBloom: - self.invBloom.add(hashId) + self.invBloom.add(hex_hash) try: with self.objectsNewToThemLock: - del self.objectsNewToThem[hashId] + del self.objectsNewToThem[hex_hash] except KeyError: pass - if hashId not in missingObjects: - missingObjects[hashId] = time.time() + if hex_hash not in missingObjects: + missingObjects[hex_hash] = time.time() self.objectsNewToMe[hashId] = True def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" + hex_hash = hexlify(hashid).decode('ascii') for i in connectionpool.pool.connections(): if not i.fullyEstablished: continue @@ -110,7 +113,7 @@ class ObjectTracker(object): not state.Dandelion.hasHash(hashid) or state.Dandelion.objectChildStem(hashid) == i): with i.objectsNewToThemLock: - i.objectsNewToThem[hashid] = time.time() + i.objectsNewToThem[hex_hash] = time.time() # update stream number, # which we didn't have when we just received the dinv # also resets expiration of the stem mode @@ -119,7 +122,7 @@ class ObjectTracker(object): if i == self: try: with i.objectsNewToThemLock: - del i.objectsNewToThem[hashid] + del i.objectsNewToThem[hex_hash] except KeyError: pass self.objectsNewToMe.setLastObject() diff --git a/src/network/tcp.py b/src/network/tcp.py index 23866c9e..75768f22 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -185,7 +185,7 @@ class TCPConnection(BMProto, TLSDispatcher): return s.endswith(tail) except: try: - return s.decode().endswith(tail) + return s.decode('ascii').endswith(tail) except UnicodeDecodeError: return False diff --git a/src/protocol.py b/src/protocol.py index d1d2771f..eb2cc86f 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -487,7 +487,8 @@ def decryptAndCheckPubkeyPayload(data, address): encryptedData = data[readPosition:] # Let us try to decrypt the pubkey - toAddress, cryptorObject = state.neededPubkeys[tag] + hex_tag = 'tag-' + hexlify(tag).decode('ascii') + toAddress, cryptorObject = state.neededPubkeys[hex_tag] if toAddress != address: logger.critical( 'decryptAndCheckPubkeyPayload failed due to toAddress' diff --git a/src/randomtrackingdict.py b/src/randomtrackingdict.py index 5bf19181..26a40e36 100644 --- a/src/randomtrackingdict.py +++ b/src/randomtrackingdict.py @@ -3,6 +3,7 @@ Track randomize ordered dict """ from threading import RLock from time import time +from binascii import hexlify try: import helper_random @@ -38,10 +39,12 @@ class RandomTrackingDict(object): return self.len def __contains__(self, key): - return key in self.dictionary + hex_key = hexlify(key).decode('ascii') + return hex_key in self.dictionary def __getitem__(self, key): - return self.dictionary[key][1] + hex_key = hexlify(key).decode('ascii') + return self.dictionary[hex_key][1] def _swap(self, i1, i2): with self.lock: @@ -49,26 +52,30 @@ class RandomTrackingDict(object): key2 = self.indexDict[i2] self.indexDict[i1] = key2 self.indexDict[i2] = key1 - self.dictionary[key1][0] = i2 - self.dictionary[key2][0] = i1 + hex_key1 = hexlify(key1).decode('ascii') + hex_key2 = hexlify(key2).decode('ascii') + self.dictionary[hex_key1][0] = i2 + self.dictionary[hex_key2][0] = i1 # for quick reassignment return i2 def __setitem__(self, key, value): with self.lock: - if key in self.dictionary: - self.dictionary[key][1] = value + hex_key = hexlify(key).decode('ascii') + if hex_key in self.dictionary: + self.dictionary[hex_key][1] = value else: self.indexDict.append(key) - self.dictionary[key] = [self.len, value] + self.dictionary[hex_key] = [self.len, value] self._swap(self.len, self.len - self.pendingLen) self.len += 1 def __delitem__(self, key): - if key not in self.dictionary: + hex_key = hexlify(key).decode('ascii') + if hex_key not in self.dictionary: raise KeyError with self.lock: - index = self.dictionary[key][0] + index = self.dictionary[hex_key][0] # not pending if index < self.len - self.pendingLen: # left of pending part @@ -82,7 +89,7 @@ class RandomTrackingDict(object): # operation can improve 4x, but it's already very fast so we'll # ignore it for the time being del self.indexDict[-1] - del self.dictionary[key] + del self.dictionary[hex_key] self.len -= 1 def setMaxPending(self, maxPending): diff --git a/src/shared.py b/src/shared.py index b85ddb20..c1099c00 100644 --- a/src/shared.py +++ b/src/shared.py @@ -114,11 +114,13 @@ def reloadMyAddressHashes(): if len(privEncryptionKey) == 64: myECCryptorObjects[hashobj] = \ highlevelcrypto.makeCryptor(privEncryptionKey) - myAddressesByHash[hashobj] = addressInKeysFile + hex_hash = hexlify(hashobj).decode('ascii') + myAddressesByHash[hex_hash] = addressInKeysFile tag = highlevelcrypto.double_sha512( encodeVarint(addressVersionNumber) + encodeVarint(streamNumber) + hashobj)[32:] - myAddressesByTag[tag] = addressInKeysFile + hex_tag = hexlify(tag).decode('ascii') + myAddressesByTag[hex_tag] = addressInKeysFile if not keyfileSecure: fixSensitiveFilePermissions(os.path.join( diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py index e756a820..531c80f5 100644 --- a/src/storage/filesystem.py +++ b/src/storage/filesystem.py @@ -70,7 +70,7 @@ class FilesystemInventory(InventoryStorage): os.makedirs(os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode())) + hexlify(hashval).decode('ascii'))) except OSError: pass try: @@ -78,7 +78,7 @@ class FilesystemInventory(InventoryStorage): os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode(), + hexlify(hashval).decode('ascii'), FilesystemInventory.metadataFilename, ), "w", @@ -87,12 +87,12 @@ class FilesystemInventory(InventoryStorage): value.type, value.stream, value.expires, - hexlify(value.tag).decode())) + hexlify(value.tag).decode('ascii'))) with open( os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode(), + hexlify(hashval).decode('ascii'), FilesystemInventory.dataFilename, ), "wb", @@ -119,7 +119,7 @@ class FilesystemInventory(InventoryStorage): os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode(), + hexlify(hashval).decode('ascii'), FilesystemInventory.metadataFilename)) except IOError: pass @@ -128,7 +128,7 @@ class FilesystemInventory(InventoryStorage): os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode(), + hexlify(hashval).decode('ascii'), FilesystemInventory.dataFilename)) except IOError: pass @@ -136,7 +136,7 @@ class FilesystemInventory(InventoryStorage): os.rmdir(os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashval).decode())) + hexlify(hashval).decode('ascii'))) except IOError: pass @@ -186,7 +186,7 @@ class FilesystemInventory(InventoryStorage): os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashId).decode(), + hexlify(hashId).decode('ascii'), FilesystemInventory.dataFilename, ), "r", @@ -202,7 +202,7 @@ class FilesystemInventory(InventoryStorage): os.path.join( self.baseDir, FilesystemInventory.objectDir, - hexlify(hashId).decode(), + hexlify(hashId).decode('ascii'), FilesystemInventory.metadataFilename, ), "r", diff --git a/src/storage/sqlite.py b/src/storage/sqlite.py index 385434e5..59700b60 100644 --- a/src/storage/sqlite.py +++ b/src/storage/sqlite.py @@ -4,6 +4,7 @@ Sqlite Inventory import sqlite3 import time from threading import RLock +from binascii import hexlify, unhexlify from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery from .storage import InventoryItem, InventoryStorage @@ -29,23 +30,24 @@ class SqliteInventory(InventoryStorage): def __contains__(self, hash_): with self.lock: - if hash_ in self._objects: + hex_hash = hexlify(hash_).decode('ascii') + if hex_hash in self._objects: return True rows = sqlQuery( - 'SELECT streamnumber FROM inventory WHERE hash=?', - sqlite3.Binary(hash_.encode())) + 'SELECT streamnumber FROM inventory WHERE hash=?', hash_) if not rows: return False - self._objects[hash_] = rows[0][0] + self._objects[hex_hash] = rows[0][0] return True def __getitem__(self, hash_): with self.lock: - if hash_ in self._inventory: - return self._inventory[hash_] + hex_hash = hexlify(hash_).decode('ascii') + if hex_hash in self._inventory: + return self._inventory[hex_hash] rows = sqlQuery( - b'SELECT objecttype, streamnumber, payload, expirestime, tag' - b' FROM inventory WHERE hash=?', sqlite3.Binary(hash_.encode())) + 'SELECT objecttype, streamnumber, payload, expirestime, tag' + ' FROM inventory WHERE hash=?', hash_) if not rows: raise KeyError(hash_) return InventoryItem(*rows[0]) @@ -53,16 +55,17 @@ class SqliteInventory(InventoryStorage): def __setitem__(self, hash_, value): with self.lock: value = InventoryItem(*value) - self._inventory[hash_] = value - self._objects[hash_] = value.stream + hex_hash = hexlify(hash_).decode('ascii') + self._inventory[hex_hash] = value + self._objects[hex_hash] = value.stream def __delitem__(self, hash_): raise NotImplementedError def __iter__(self): with self.lock: - hashes = self._inventory.keys()[:] - hashes += (x for x, in sqlQuery('SELECT hash FROM inventory')) + hashes = map(unhexlify, self._inventory.keys()[:]) + hashes += (unhexlify(x) for x, in sqlQuery('SELECT hash FROM inventory')) return hashes.__iter__() def __len__(self): @@ -80,7 +83,7 @@ class SqliteInventory(InventoryStorage): ' FROM inventory WHERE objecttype=?', objectType] if tag: query[0] += ' AND tag=?' - query.append(sqlite3.Binary(tag)) + query.append(tag) with self.lock: values = [ value for value in self._inventory.values() @@ -93,9 +96,9 @@ class SqliteInventory(InventoryStorage): """Return unexpired inventory vectors filtered by stream""" with self.lock: t = int(time.time()) - hashes = [x for x, value in self._inventory.items() + hashes = [unhexlify(x) for x, value in self._inventory.items() if value.stream == stream and value.expires > t] - hashes += (str(payload) for payload, in sqlQuery( + hashes += (payload for payload, in sqlQuery( 'SELECT hash FROM inventory WHERE streamnumber=?' ' AND expirestime>?', stream, t)) return hashes @@ -109,7 +112,7 @@ class SqliteInventory(InventoryStorage): for objectHash, value in self._inventory.items(): sql.execute( 'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', - sqlite3.Binary(objectHash), *value) + unhexlify(objectHash), *value) self._inventory.clear() def clean(self): diff --git a/src/tests/test_api_thread.py b/src/tests/test_api_thread.py index 6e453b19..beb81876 100644 --- a/src/tests/test_api_thread.py +++ b/src/tests/test_api_thread.py @@ -82,12 +82,12 @@ class TestAPIThread(TestPartialRun): proofofwork.init() self.assertEqual( unhexlify(self.api.disseminatePreparedObject( - hexlify(sample_object_data).decode())), + hexlify(sample_object_data).decode('ascii'))), calculateInventoryHash(sample_object_data)) update_object = b'\x00' * 8 + pack( '>Q', int(time.time() + 7200)) + sample_object_data[16:] invhash = unhexlify(self.api.disseminatePreEncryptedMsg( - hexlify(update_object).decode() + hexlify(update_object).decode('ascii') )) obj_type, obj_stream, obj_data = state.Inventory[invhash][:3] self.assertEqual(obj_type, 42)