Merge pull request #38 from jaicis/py3convert

Solved sending message issues
This commit is contained in:
jaicis 2020-01-27 21:44:06 +05:30 committed by GitHub
commit fef7dddb6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 83 additions and 135 deletions

View File

@ -22,12 +22,16 @@ def search_sql(
sqlStatementBase = '''SELECT label, address From addressbook '''
else:
sqlStatementBase = (
'''SELECT folder, msgid, toaddress, message, fromaddress,'''
'''SELECT folder, toaddress, message, fromaddress,'''
''' subject, received, read FROM inbox '''
)
sqlStatementParts = []
sqlArguments = []
if account is not None:
#xAddress = 'toaddress'
#where = ['subject', 'message']
#what = None
#unreadOnly = False
if xAddress == 'both':
sqlStatementParts.append("(fromaddress = ? OR toaddress = ?)")
sqlArguments.append(account)

View File

@ -48,7 +48,7 @@ class objectProcessor(threading.Thread):
def __init__(self):
threading.Thread.__init__(self, name="objectProcessor")
random.seed()
# It may be the case that the last time Bitmessage was running,
# It may be the case that the last time Bitmes0sage was running,
# the user closed it before it finished processing everything in the
# objectProcessorQueue. Assuming that Bitmessage wasn't closed
# forcefully, it should have saved the data in the queue into the
@ -69,9 +69,7 @@ class objectProcessor(threading.Thread):
"""Process the objects from `.queues.objectProcessorQueue`"""
while True:
objectType, data = queues.objectProcessorQueue.get()
self.checkackdata(data)
try:
if objectType == protocol.OBJECT_GETPUBKEY:
self.processgetpubkey(data)
@ -143,7 +141,7 @@ class objectProcessor(threading.Thread):
if bytes(data[readPosition:]) in shared.ackdataForWhichImWatching:
logger.info('This object is an acknowledgement bound for me.')
del shared.ackdataForWhichImWatching[data[readPosition:]]
del shared.ackdataForWhichImWatching[bytes(data[readPosition:])]
sqlExecute(
'UPDATE sent SET status=?, lastactiontime=?'
' WHERE ackdata=?',
@ -237,7 +235,7 @@ class objectProcessor(threading.Thread):
'the tag requested in this getpubkey request is: %s',
hexlify(requestedTag))
if bytes(requestedTag) in shared.myAddressesByTag:
myAddress = shared.myAddressesByTag[requestedTag]
myAddress = shared.myAddressesByTag[bytes(requestedTag)]
if myAddress == '':
logger.info('This getpubkey request is not for any of my keys.')
@ -436,14 +434,14 @@ class objectProcessor(threading.Thread):
return
tag = data[readPosition:readPosition + 32]
if tag not in bytes(state.neededPubkeys):
if bytes(tag) not in state.neededPubkeys:
logger.info(
'We don\'t need this v4 pubkey. We didn\'t ask for it.')
return
# Let us try to decrypt the pubkey
toAddress, _ = state.neededPubkeys[tag]
if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \
toAddress, _ = state.neededPubkeys[bytes(tag)] #check with py2
if protocol.decryptAndCheckPubkeyPayload(bytes(data), toAddress) == \
'successful':
# At this point we know that we have been waiting on this
# pubkey. This function will command the workerThread

View File

@ -224,6 +224,7 @@ class singleWorker(StoppableThread):
if log_time:
start_time = time.time()
trialValue, nonce = proofofwork.run(target, initialHash)
print("nonce calculated value#############################", nonce)
self.logger.info(
'%s Found proof of work %s Nonce: %s',
log_prefix, trialValue, nonce
@ -804,7 +805,6 @@ class singleWorker(StoppableThread):
highlevelcrypto.makeCryptor(
hexlify(privEncryptionKey))
)
for value in Inventory().by_type_and_tag(1, toTag):
# if valid, this function also puts it
# in the pubkeys table.
@ -850,17 +850,14 @@ class singleWorker(StoppableThread):
self.requestPubKey(toaddress)
# on with the next msg on which we can do some work
continue
# At this point we know that we have the necessary pubkey
# in the pubkeys table.
TTL *= 2**retryNumber
if TTL > 28 * 24 * 60 * 60:
TTL = 28 * 24 * 60 * 60
# add some randomness to the TTL
TTL = int(TTL + helper_random.randomrandrange(-300, 300))
embeddedTime = int(time.time() + TTL)
# if we aren't sending this to ourselves or a chan
if not BMConfigParser().has_section(toaddress):
shared.ackdataForWhichImWatching[ackdata] = 0
@ -872,14 +869,15 @@ class singleWorker(StoppableThread):
"Looking up the receiver\'s public key"))
))
self.logger.info('Sending a message.')
self.logger.debug(
'First 150 characters of message: %s',
repr(message[:150])
)
# self.logger.debug(
# 'First 150 characters of message: %s',
# repr(message[:150])
# )
# Let us fetch the recipient's public key out of
# our database. If the required proof of work difficulty
# is too hard then we'll abort.
queryreturn = sqlQuery(
'SELECT transmitdata FROM pubkeys WHERE address=?',
toaddress)
@ -946,7 +944,6 @@ class singleWorker(StoppableThread):
pubEncryptionKeyBase256 = pubkeyPayload[
readPosition:readPosition + 64]
readPosition += 64
# Let us fetch the amount of work required by the recipient.
if toAddressVersionNumber == 2:
requiredAverageProofOfWorkNonceTrialsPerByte = \
@ -962,6 +959,7 @@ class singleWorker(StoppableThread):
"There is no required difficulty for"
" version 2 addresses like this."))
))
elif toAddressVersionNumber >= 3:
requiredAverageProofOfWorkNonceTrialsPerByte, \
varintLength = decodeVarint(
@ -988,7 +986,6 @@ class singleWorker(StoppableThread):
requiredAverageProofOfWorkNonceTrialsPerByte,
requiredPayloadLengthExtraBytes
)
queues.UISignalQueue.put(
(
'updateSentItemStatusByAckdata',
@ -1013,17 +1010,15 @@ class singleWorker(StoppableThread):
)
)
)
if status != 'forcepow':
maxacceptablenoncetrialsperbyte = BMConfigParser().getint(
'bitmessagesettings', 'maxacceptablenoncetrialsperbyte')
maxacceptablepayloadlengthextrabytes = BMConfigParser().getint(
'bitmessagesettings', 'maxacceptablepayloadlengthextrabytes')
maxacceptablenoncetrialsperbyte = int(BMConfigParser().get(
'bitmessagesettings', 'maxacceptablenoncetrialsperbyte'))
maxacceptablepayloadlengthextrabytes = int(BMConfigParser().get(
'bitmessagesettings', 'maxacceptablepayloadlengthextrabytes'))
cond1 = maxacceptablenoncetrialsperbyte and \
requiredAverageProofOfWorkNonceTrialsPerByte > maxacceptablenoncetrialsperbyte
cond2 = maxacceptablepayloadlengthextrabytes and \
requiredPayloadLengthExtraBytes > maxacceptablepayloadlengthextrabytes
if cond1 or cond2:
# The demanded difficulty is more than
# we are willing to do.
@ -1051,7 +1046,6 @@ class singleWorker(StoppableThread):
self.logger.debug(
'First 150 characters of message: %r', message[:150])
behaviorBitfield = protocol.getBitfield(fromaddress)
try:
privEncryptionKeyBase58 = BMConfigParser().get(
toaddress, 'privencryptionkey')
@ -1088,7 +1082,6 @@ class singleWorker(StoppableThread):
"MainWindow",
"Doing work necessary to send message."))
))
# Now we can start to assemble our message.
payload = encodeVarint(fromAddressVersionNumber)
payload += encodeVarint(fromStreamNumber)
@ -1096,7 +1089,6 @@ class singleWorker(StoppableThread):
# that can be expected from me. (See
# https://bitmessage.org/wiki/Protocol_specification#Pubkey_bitfield_features)
payload += protocol.getBitfield(fromaddress)
# We need to convert our private keys to public keys in order
# to include them.
try:
@ -1113,9 +1105,7 @@ class singleWorker(StoppableThread):
" (your address) in the keys.dat file."))
))
continue
payload += pubSigningKey + pubEncryptionKey
if fromAddressVersionNumber >= 3:
# If the receiver of our message is in our address book,
# subscriptions list, or whitelist then we will allow them to
@ -1128,11 +1118,10 @@ class singleWorker(StoppableThread):
payload += encodeVarint(
defaults.networkDefaultPayloadLengthExtraBytes)
else:
payload += encodeVarint(BMConfigParser().getint(
fromaddress, 'noncetrialsperbyte'))
payload += encodeVarint(BMConfigParser().getint(
fromaddress, 'payloadlengthextrabytes'))
payload += encodeVarint(int(BMConfigParser().get(
fromaddress, 'noncetrialsperbyte')))
payload += encodeVarint(int(BMConfigParser().get(
fromaddress, 'payloadlengthextrabytes')))
# This hash will be checked by the receiver of the message
# to verify that toRipe belongs to them. This prevents
# a Surreptitious Forwarding Attack.
@ -1163,8 +1152,8 @@ class singleWorker(StoppableThread):
fullAckPayload = self.generateFullAckMessage(
ackdata, toStreamNumber, TTL)
payload += encodeVarint(len(fullAckPayload))
payload += fullAckPayload
dataToSign = pack('>Q', embeddedTime) + '\x00\x00\x00\x02' + \
payload += fullAckPayload if isinstance(fullAckPayload,bytes) else fullAckPayload.encode()
dataToSign = pack('>Q', embeddedTime) + '\x00\x00\x00\x02'.encode() + \
encodeVarint(1) + encodeVarint(toStreamNumber) + payload
signature = highlevelcrypto.sign(dataToSign, privSigningKeyHex)
payload += encodeVarint(len(signature))
@ -1173,7 +1162,7 @@ class singleWorker(StoppableThread):
# We have assembled the data that will be encrypted.
try:
encrypted = highlevelcrypto.encrypt(
payload, "04" + hexlify(pubEncryptionKeyBase256)
payload, "04".encode() + hexlify(pubEncryptionKeyBase256)
)
except:
sqlExecute(
@ -1190,9 +1179,8 @@ class singleWorker(StoppableThread):
).arg(l10n.formatTimestamp()))
))
continue
encryptedPayload = pack('>Q', embeddedTime)
encryptedPayload += '\x00\x00\x00\x02' # object type: msg
encryptedPayload += '\x00\x00\x00\x02'.encode() # object type: msg
encryptedPayload += encodeVarint(1) # msg version
encryptedPayload += encodeVarint(toStreamNumber) + encrypted
target = 2 ** 64 / (
@ -1206,17 +1194,16 @@ class singleWorker(StoppableThread):
))
self.logger.info(
'(For msg message) Doing proof of work. Total required'
' difficulty: %f. Required small message difficulty: %f.',
float(requiredAverageProofOfWorkNonceTrialsPerByte) /
' difficulty: {}. Required small message difficulty: {}.'.format
(float(requiredAverageProofOfWorkNonceTrialsPerByte) /
defaults.networkDefaultProofOfWorkNonceTrialsPerByte,
float(requiredPayloadLengthExtraBytes) /
defaults.networkDefaultPayloadLengthExtraBytes
defaults.networkDefaultPayloadLengthExtraBytes)
)
powStartTime = time.time()
initialHash = hashlib.sha512(encryptedPayload).digest()
trialValue, nonce = proofofwork.run(target, initialHash)
print("nonce calculated value#############################", nonce)
self.logger.info(
'(For msg message) Found proof of work %s Nonce: %s',
trialValue, nonce
@ -1229,7 +1216,6 @@ class singleWorker(StoppableThread):
)
except:
pass
encryptedPayload = pack('>Q', nonce) + encryptedPayload
# Sanity check. The encryptedPayload size should never be
@ -1243,10 +1229,11 @@ class singleWorker(StoppableThread):
len(encryptedPayload)
)
continue
inventoryHash = calculateInventoryHash(encryptedPayload)
objectType = 2
Inventory()[inventoryHash] = (
inventoryHashlist = (
objectType, toStreamNumber,encryptedPayload, embeddedTime, '')
Inventory()._realInventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
if BMConfigParser().has_section(toaddress) or \
not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
@ -1255,7 +1242,7 @@ class singleWorker(StoppableThread):
ackdata,
tr._translate(
"MainWindow",
"Message sent. Sent at %1"
"Mobileessage sent. Sent at %1"
).arg(l10n.formatTimestamp()))))
else:
# not sending to a chan or one of my addresses
@ -1273,7 +1260,6 @@ class singleWorker(StoppableThread):
hexlify(inventoryHash)
)
queues.invQueue.put((toStreamNumber, inventoryHash))
# Update the sent message in the sent table with the
# necessary information.
if BMConfigParser().has_section(toaddress) or \
@ -1289,7 +1275,6 @@ class singleWorker(StoppableThread):
inventoryHash, newStatus, retryNumber + 1,
sleepTill, int(time.time()), ackdata
)
# If we are sending to ourselves or a chan, let's put
# the message in our own inbox.
if BMConfigParser().has_section(toaddress):
@ -1327,7 +1312,6 @@ class singleWorker(StoppableThread):
toAddress
)
return
queryReturn = sqlQuery(
'''SELECT retrynumber FROM sent WHERE toaddress=? '''
''' AND (status='doingpubkeypow' OR status='awaitingpubkey') '''
@ -1342,7 +1326,6 @@ class singleWorker(StoppableThread):
)
return
retryNumber = queryReturn[0][0]
if addressVersionNumber <= 3:
state.neededPubkeys[toAddress] = 0
elif addressVersionNumber >= 4:
@ -1378,7 +1361,7 @@ class singleWorker(StoppableThread):
TTL = TTL + helper_random.randomrandrange(-300, 300)
embeddedTime = int(time.time() + TTL)
payload = pack('>Q', embeddedTime)
payload += '\x00\x00\x00\x00' # object type: getpubkey
payload += '\x00\x00\x00\x00'.encode() # object type: getpubkey
payload += encodeVarint(addressVersionNumber)
payload += encodeVarint(streamNumber)
if addressVersionNumber <= 3:
@ -1401,16 +1384,14 @@ class singleWorker(StoppableThread):
"MainWindow",
"Doing work necessary to request encryption key."))
))
payload = self._doPOWDefaults(payload, TTL)
inventoryHash = calculateInventoryHash(payload)
objectType = 1
Inventory()[inventoryHash] = (
Inventory()._realInventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '')
# Inventory()._realInventory[inventoryHashlist]
self.logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash))
# wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1)
sqlExecute(
@ -1419,7 +1400,6 @@ class singleWorker(StoppableThread):
''' WHERE toaddress=? AND (status='doingpubkeypow' OR '''
''' status='awaitingpubkey') ''',
int(time.time()), retryNumber + 1, sleeptill, toAddress)
queues.UISignalQueue.put((
'updateStatusBar',
tr._translate(

View File

@ -573,6 +573,7 @@ class sqlThread(threading.Thread):
rowcount = 0
# print 'item', item
# print 'parameters', parameters
# if 'inbox' in item:
try:
self.cur.execute(item, parameters)
rowcount = self.cur.rowcount

View File

@ -54,6 +54,7 @@ def encrypt(msg, hexPubkey):
def decrypt(msg, hexPrivkey):
print("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS#################################################")
"""Decrypts message with hex private key"""
return makeCryptor(hexPrivkey).decrypt(msg)

View File

@ -123,7 +123,8 @@ def formatTimestamp(timestamp=None, as_unicode=True):
timestring = time.strftime(time_format)
if as_unicode:
return unicode(timestring, encoding)
return (timestring.encode('utf-8'))
# return unicode(timestring, encoding)
return timestring

View File

@ -116,7 +116,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
# if it's a stem duplicate, pretend we don't have it
if Dandelion().hasHash(self.inventoryHash):
return
if self.inventoryHash in Inventory():
if self.inventoryHash in Inventory()._realInventory:
raise BMObjectAlreadyHaveError()
def checkObjectByType(self):

View File

@ -34,20 +34,6 @@ from network.node import Node, Peer
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
from network.randomtrackingdict import RandomTrackingDict
global addr_count
addr_count = 0
global addr_verack
addr_verack = 0
global addr_version
addr_version = 0
# global addr_count
# addr_count = 0
count = 0
logger = logging.getLogger('default')
@ -100,14 +86,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# its shoule be in string
self.command = self.command.rstrip('\x00'.encode('utf-8'))
# pylint: disable=global-statement
global count, addr_version, addr_count, addr_verack
count += 1
if self.command == 'verack'.encode():
addr_verack += 1
if self.command == 'version'.encode():
addr_version += 1
if self.command == 'addr'.encode():
addr_count += 1
if self.magic != 0xE9BEB4D9:
self.set_state("bm_header", length=1)
self.bm_proto_reset()
@ -364,7 +342,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
if now < self.skipUntil:
return True
for i in items:
self.pendingUpload[str(i)] = now
self.pendingUpload[bytes(i)] = now
return True
def _command_inv(self, dandelion=False):
@ -378,9 +356,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# ignore dinv if dandelion turned off
if dandelion and not state.dandelion:
return True
for i in map(bytes, items):
if i in Inventory() and not Dandelion().hasHash(i):
if i in Inventory()._realInventory and not Dandelion().hasHash(i):
continue
if dandelion and not Dandelion().hasHash(i):
Dandelion().addHash(i, self)
@ -441,7 +418,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError:
pass
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
if self.object.inventoryHash in Inventory()._realInventory and Dandelion().hasHash(self.object.inventoryHash):
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
[self.object.inventoryHash] = (

View File

@ -61,7 +61,7 @@ class DownloadThread(StoppableThread):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in Inventory() and not Dandelion().hasHash(chunk):
if chunk in Inventory()._realInventory and not Dandelion().hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:

View File

@ -87,19 +87,18 @@ class InvThread(StoppableThread):
fluffs.append(inv[1])
except KeyError:
fluffs.append(inv[1])
if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
'inv',
addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
len(fluffs)) + ('').encode().join([x for x in fluffs]))) #compare result with python2
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
'dinv',
addresses.encodeVarint(
len(stems)) + ''.join(stems)))
len(stems)) + ('').encode().join([x for x in stems]))) #compare result with python2
invQueue.iterate()
for _ in range(len(chunk)):

View File

@ -211,7 +211,7 @@ class TCPConnection(BMProto, TLSDispatcher):
# may lock for a long time, but I think it's better than
# thousands of small locks
with self.objectsNewToThemLock:
for objHash in Inventory().unexpired_hashes_by_stream(stream):
for objHash in Inventory()._realInventory.unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv
if Dandelion().hasHash(objHash):
continue

View File

@ -45,6 +45,7 @@ class UploadThread(StoppableThread):
if Dandelion().hasHash(chunk) and \
i != Dandelion().objectChildStem(chunk):
i.antiIntersectionDelay()
print
self.logger.info(
'%s asked for a stem object we didn\'t offer to it.',
i.destination)

View File

@ -110,11 +110,10 @@ def _doFastPoW(target, initialHash):
time.sleep(0.2)
def _doCPoW(target, initialHash):
h = initialHash
m = target
out_h = ctypes.pointer(ctypes.create_string_buffer(h, 64))
out_m = ctypes.c_ulonglong(m)
out_h = ctypes.pointer(ctypes.create_string_buffer(initialHash, 64))
out_m = ctypes.c_ulonglong(target)
logger.debug("C PoW start")
nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q', hashlib.sha512(hashlib.sha512(pack('>Q', nonce) + initialHash).digest()).digest()[0:8])
@ -241,7 +240,6 @@ def buildCPoW():
def run(target, initialHash):
"""Run the proof of work thread"""
if state.shutdown != 0:
raise # pylint: disable=misplaced-bare-raise
target = int(target)
@ -332,7 +330,7 @@ def init():
import glob
try:
bso = ctypes.CDLL(glob.glob(os.path.join(
paths.codePath(), "bitmsghash", "bitmsghash*.so"
paths.codePath(), " ", "bitmsghash*.so"
))[0])
except (OSError, IndexError):
bso = None

View File

@ -431,7 +431,7 @@ def decryptAndCheckPubkeyPayload(data, address):
encryptedData = data[readPosition:]
# Let us try to decrypt the pubkey
toAddress, cryptorObject = state.neededPubkeys[tag]
toAddress, cryptorObject = state.neededPubkeys[bytes(tag)]
if toAddress != address:
logger.critical(
'decryptAndCheckPubkeyPayload failed due to toAddress'
@ -444,6 +444,7 @@ def decryptAndCheckPubkeyPayload(data, address):
# That sort of address-malleability should have been caught
# by the UI or API and an error given to the user.
return 'failed'
print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW#################################################")
try:
decryptedData = cryptorObject.decrypt(encryptedData)
except:
@ -451,13 +452,13 @@ def decryptAndCheckPubkeyPayload(data, address):
# but tagged it with a tag for which we are watching.
logger.info('Pubkey decryption was unsuccessful.')
return 'failed'
readPosition = 0
# bitfieldBehaviors = decryptedData[readPosition:readPosition + 4]
readPosition += 4
publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
print("working fine till here#################################################################")
publicSigningKey = '\x04'.encode() + decryptedData[readPosition:readPosition + 64]
readPosition += 64
publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
publicEncryptionKey = '\x04'.encode() + decryptedData[readPosition:readPosition + 64]
readPosition += 64
specifiedNonceTrialsPerByteLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])[1]
@ -471,7 +472,6 @@ def decryptAndCheckPubkeyPayload(data, address):
decryptedData[readPosition:readPosition + 10])
readPosition += signatureLengthLength
signature = decryptedData[readPosition:readPosition + signatureLength]
if not highlevelcrypto.verify(
signedData, signature, hexlify(publicSigningKey)):
logger.info(
@ -480,11 +480,9 @@ def decryptAndCheckPubkeyPayload(data, address):
logger.info(
'ECDSA verify passed (within decryptAndCheckPubkeyPayload)')
sha = hashlib.new('sha512')
sha.update(publicSigningKey + publicEncryptionKey)
embeddedRipe = RIPEMD160Hash(sha.digest()).digest()
if embeddedRipe != ripe:
# Although this pubkey object had the tag were were looking for
# and was encrypted with the correct encryption key,
@ -503,9 +501,9 @@ def decryptAndCheckPubkeyPayload(data, address):
addressVersion, streamNumber, hexlify(ripe),
hexlify(publicSigningKey), hexlify(publicEncryptionKey)
)
t = (address, addressVersion, storedData, int(time.time()), 'yes')
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
print("successful Insertion of pubkey hurray#################################################")
return 'successful'
except varintDecodeError:
logger.info(

View File

@ -29,7 +29,6 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
self.lock = RLock()
def __contains__(self, hash_):
print('----------contains------------------')
with self.lock:
if hash_ in self._objects:
return True
@ -42,42 +41,32 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
return True
def __getitem__(self, hash_):
if hash_ == 0:
hash_ = bytes()
with self.lock:
try:
if hash_ in self._inventory:
return self._inventory[hash_]
rows = sqlQuery(
'SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?',
sqlite3.Binary(hash_))
if not rows:
pass
# raise KeyError(hash_)
except:
pass
if hash_ in self._inventory:
return self._inventory[hash_]
rows = sqlQuery(
'SELECT objecttype, streamnumber, payload, expirestime, tag'
' FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
if not rows:
raise KeyError(hash_)
return InventoryItem(*rows[0])
def __setitem__(self, hash_, value):
print('----------__setitem__------------------')
with self.lock:
value = InventoryItem(*value)
self._inventory[hash_] = value
self._objects[hash_] = value.stream
def __delitem__(self, hash_):
print('----------__delitem__------------------')
raise NotImplementedError
def __iter__(self):
print('----------__iter__------------------')
with self.lock:
hashes = self._inventory.keys()[:]
hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()
def __len__(self):
print('----------__len__------------------')
with self.lock:
return len(self._inventory) + sqlQuery(
'SELECT count(*) FROM inventory')[0][0]
@ -99,9 +88,10 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
t = int(time.time())
hashes = [x for x, value in self._inventory.items()
if value.stream == stream and value.expires > t]
hashes += (str(payload) for payload, in sqlQuery(
hashes += (payload for payload, in sqlQuery(
'SELECT hash FROM inventory WHERE streamnumber=?'
' AND expirestime>?', stream, t))
# print('sqlllllllllllllllllllllllllllllllllll',hashes)
return hashes
def flush(self):