Merge pull request from Atheros1/master

smarter commits Vastly improves disk performance. Also added knownNodesLock
This commit is contained in:
Jonathan Warren 2013-04-04 11:56:17 -07:00
commit 3f959d5c77
2 changed files with 72 additions and 111 deletions

View File

@ -148,7 +148,9 @@ class outgoingSynSender(QThread):
printLock.release() printLock.release()
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST] PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
knownNodesLock.acquire()
del knownNodes[self.streamNumber][HOST] del knownNodes[self.streamNumber][HOST]
knownNodesLock.release()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
except socks.Socks5AuthError, err: except socks.Socks5AuthError, err:
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"SOCKS5 Authentication problem: "+str(err)) self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"SOCKS5 Authentication problem: "+str(err))
@ -169,7 +171,9 @@ class outgoingSynSender(QThread):
printLock.release() printLock.release()
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST] PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
knownNodesLock.acquire()
del knownNodes[self.streamNumber][HOST] del knownNodes[self.streamNumber][HOST]
knownNodesLock.release()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
except Exception, err: except Exception, err:
print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err
@ -323,7 +327,9 @@ class receiveDataThread(QThread):
#print 'message checksum is correct' #print 'message checksum is correct'
#The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance. #The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance.
if self.initiatedConnection: #The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port). if self.initiatedConnection: #The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
knownNodesLock.acquire()
knownNodes[self.streamNumber][self.HOST] = (self.PORT,int(time.time())) knownNodes[self.streamNumber][self.HOST] = (self.PORT,int(time.time()))
knownNodesLock.release()
if self.payloadLength <= 180000000: #If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.) if self.payloadLength <= 180000000: #If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
remoteCommand = self.data[4:16] remoteCommand = self.data[4:16]
printLock.acquire() printLock.acquire()
@ -623,6 +629,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it.
@ -654,6 +661,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body) self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body)
@ -749,6 +757,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('UPDATE sent SET status=? WHERE ackdata=?') sqlSubmitQueue.put('UPDATE sent SET status=? WHERE ackdata=?')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.') self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.')
return return
@ -850,6 +859,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it.
blockMessage = False #Gets set to True if the user shouldn't see the message according to black or white lists. blockMessage = False #Gets set to True if the user shouldn't see the message according to black or white lists.
@ -923,6 +933,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body) self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body)
@ -954,6 +965,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.emit(SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),toAddress,'[Broadcast subscribers]',fromAddress,subject,message,ackdata) self.emit(SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),toAddress,'[Broadcast subscribers]',fromAddress,subject,message,ackdata)
@ -1112,6 +1124,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
printLock.acquire() printLock.acquire()
printLock.release() printLock.release()
@ -1123,6 +1136,7 @@ class receiveDataThread(QThread):
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
printLock.acquire() printLock.acquire()
printLock.release() printLock.release()
@ -1394,10 +1408,14 @@ class receiveDataThread(QThread):
continue continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message. timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
knownNodesLock.acquire()
knownNodes[recaddrStream] = {} knownNodes[recaddrStream] = {}
knownNodesLock.release()
if hostFromAddrMessage not in knownNodes[recaddrStream]: if hostFromAddrMessage not in knownNodes[recaddrStream]:
if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
knownNodesLock.acquire()
knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode) knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
knownNodesLock.release()
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
needToWriteKnownNodesToDisk = True needToWriteKnownNodesToDisk = True
hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
@ -1405,12 +1423,16 @@ class receiveDataThread(QThread):
else: else:
PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
knownNodesLock.acquire()
knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode) knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode)
knownNodesLock.release()
if PORT != recaddrPort: if PORT != recaddrPort:
print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.' print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.'
if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers. if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers.
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
knownNodesLock.acquire()
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
knownNodesLock.release()
output.close() output.close()
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
printLock.acquire() printLock.acquire()
@ -1552,9 +1574,11 @@ class receiveDataThread(QThread):
printLock.release() printLock.release()
return return
knownNodesLock.acquire()
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time())) knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
knownNodesLock.release()
output.close() output.close()
self.sendverack() self.sendverack()
@ -1770,6 +1794,7 @@ def flushInventory():
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
del inventory[hash] del inventory[hash]
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
def isInSqlInventory(hash): def isInSqlInventory(hash):
@ -1974,13 +1999,16 @@ class sqlThread(QThread):
while True: while True:
item = sqlSubmitQueue.get() item = sqlSubmitQueue.get()
parameters = sqlSubmitQueue.get() if item == 'commit':
#print 'item', item self.conn.commit()
#print 'parameters', parameters else:
self.cur.execute(item, parameters) parameters = sqlSubmitQueue.get()
sqlReturnQueue.put(self.cur.fetchall()) #print 'item', item
sqlSubmitQueue.task_done() #print 'parameters', parameters
self.conn.commit() self.cur.execute(item, parameters)
sqlReturnQueue.put(self.cur.fetchall())
#sqlSubmitQueue.task_done()
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
@ -2009,12 +2037,13 @@ class singleCleaner(QThread):
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing housekeeping (Flushing inventory in memory to disk...)") self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing housekeeping (Flushing inventory in memory to disk...)")
for hash, storedValue in inventory.items(): for hash, storedValue in inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
if int(time.time())- 600 > receivedTime: if int(time.time())- 3600 > receivedTime:
t = (hash,objectType,streamNumber,payload,receivedTime) t = (hash,objectType,streamNumber,payload,receivedTime)
sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
del inventory[hash] del inventory[hash]
sqlSubmitQueue.put('commit')
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"") self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"")
sqlLock.release() sqlLock.release()
broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
@ -2033,6 +2062,7 @@ class singleCleaner(QThread):
sqlSubmitQueue.put('''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''') sqlSubmitQueue.put('''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
t = () t = ()
sqlSubmitQueue.put('''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='findingpubkey' OR status='sentmessage') AND folder='sent') ''') #If the message's folder='trash' then we'll ignore it. sqlSubmitQueue.put('''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='findingpubkey' OR status='sentmessage') AND folder='sent') ''') #If the message's folder='trash' then we'll ignore it.
@ -2062,6 +2092,7 @@ class singleCleaner(QThread):
sqlReturnQueue.get() sqlReturnQueue.get()
workerQueue.put(('sendmessage',toaddress)) workerQueue.put(('sendmessage',toaddress))
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing work necessary to again attempt to deliver a message...") self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing work necessary to again attempt to deliver a message...")
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
@ -2217,6 +2248,7 @@ class singleWorker(QThread):
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
@ -2297,69 +2329,8 @@ class singleWorker(QThread):
sqlSubmitQueue.put('UPDATE sent SET status=?, lastactiontime=? WHERE fromaddress=? AND subject=? AND message=? AND status=?') sqlSubmitQueue.put('UPDATE sent SET status=?, lastactiontime=? WHERE fromaddress=? AND subject=? AND message=? AND status=?')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
"""elif addressVersionNumber == 1: #This whole section can be taken out soon because we aren't supporting v1 addresses for much longer.
messageToTransmit = '\x02' #message encoding type
messageToTransmit += encodeVarint(len('Subject:' + subject + '\n' + 'Body:' + body)) #Type 2 is simple UTF-8 message encoding.
messageToTransmit += 'Subject:' + subject + '\n' + 'Body:' + body
#We need the all the integers for our private key in order to sign our message, and we need our public key to send with the message.
n = config.getint(fromaddress, 'n')
e = config.getint(fromaddress, 'e')
d = config.getint(fromaddress, 'd')
p = config.getint(fromaddress, 'p')
q = config.getint(fromaddress, 'q')
nString = convertIntToString(n)
eString = convertIntToString(e)
#myPubkey = rsa.PublicKey(n,e)
myPrivatekey = rsa.PrivateKey(n,e,d,p,q)
#The payload of the broadcast message starts with a POW, but that will be added later.
payload = pack('>I',(int(time.time())))
payload += encodeVarint(1) #broadcast version
payload += encodeVarint(addressVersionNumber)
payload += encodeVarint(streamNumber)
payload += ripe
payload += encodeVarint(len(nString))
payload += nString
payload += encodeVarint(len(eString))
payload += eString
payload += messageToTransmit
signature = rsa.sign(messageToTransmit,myPrivatekey,'SHA-512')
#print 'signature', signature.encode('hex')
payload += signature
#print 'nString', repr(nString)
#print 'eString', repr(eString)
nonce = 0
trialValue = 99999999999999999999
target = 2**64 / ((len(payload)+payloadLengthExtraBytes+8) * averageProofOfWorkNonceTrialsPerByte)
print '(For broadcast message) Doing proof of work...'
initialHash = hashlib.sha512(payload).digest()
while trialValue > target:
nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce
payload = pack('>Q',nonce) + payload
inventoryHash = calculateInventoryHash(payload)
objectType = 'broadcast'
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
print 'sending inv (within sendBroadcast function)'
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+unicode(strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
#Update the status of the message in the 'sent' table to have a 'broadcastsent' status
sqlLock.acquire()
t = ('broadcastsent',int(time.time()),fromaddress, subject, body,'broadcastpending')
sqlSubmitQueue.put('UPDATE sent SET status=?, lastactiontime=? WHERE fromaddress=? AND subject=? AND message=? AND status=?')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()"""
else: else:
printLock.acquire() printLock.acquire()
print 'In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version' print 'In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version'
@ -2371,6 +2342,7 @@ class singleWorker(QThread):
sqlSubmitQueue.put('UPDATE sent SET status=? WHERE status=? AND toripe=?') sqlSubmitQueue.put('UPDATE sent SET status=? WHERE status=? AND toripe=?')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
t = ('doingpow',toRipe) t = ('doingpow',toRipe)
sqlSubmitQueue.put('SELECT toaddress, fromaddress, subject, message, ackdata FROM sent WHERE status=? AND toripe=?') sqlSubmitQueue.put('SELECT toaddress, fromaddress, subject, message, ackdata FROM sent WHERE status=? AND toripe=?')
@ -2488,41 +2460,6 @@ class singleWorker(QThread):
readPosition += 64 readPosition += 64
encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex')) encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex'))
"""elif toAddressVersionNumber == 1:
sqlLock.acquire()
sqlSubmitQueue.put('SELECT transmitdata FROM pubkeys WHERE hash=?')
sqlSubmitQueue.put((toRipe,))
queryreturn = sqlReturnQueue.get()
sqlLock.release()
for row in queryreturn:
pubkeyPayload, = row
readPosition = 8 #to bypass the nonce
behaviorBitfield = pubkeyPayload[8:12]
readPosition += 4 #to bypass the bitfield of behaviors
addressVersion, addressVersionLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10])
readPosition += addressVersionLength
streamNumber, streamNumberLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10])
readPosition += streamNumberLength
nLength, nLengthLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10])
readPosition += nLengthLength
n = convertStringToInt(pubkeyPayload[readPosition:readPosition+nLength])
readPosition += nLength
eLength, eLengthLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10])
readPosition += eLengthLength
e = convertStringToInt(pubkeyPayload[readPosition:readPosition+eLength])
receiversPubkey = rsa.PublicKey(n,e)
infile = cStringIO.StringIO(payload)
outfile = cStringIO.StringIO()
#print 'Encrypting using public key:', receiversPubkey
encrypt_bigfile(infile,outfile,receiversPubkey)
encrypted = outfile.getvalue()
infile.close()
outfile.close()"""
nonce = 0 nonce = 0
trialValue = 99999999999999999999 trialValue = 99999999999999999999
@ -2561,7 +2498,7 @@ class singleWorker(QThread):
sqlSubmitQueue.put('''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''') sqlSubmitQueue.put('''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
@ -3032,6 +2969,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''') sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
apiSignalQueue.put(('updateStatusBar','Per API: Trashed message (assuming message existed). UI not updated.')) apiSignalQueue.put(('updateStatusBar','Per API: Trashed message (assuming message existed). UI not updated.'))
return 'Trashed message (assuming message existed). UI not updated. To double check, run getAllInboxMessages to see that the message disappeared, or restart Bitmessage and look in the normal Bitmessage GUI.' return 'Trashed message (assuming message existed). UI not updated. To double check, run getAllInboxMessages to see that the message disappeared, or restart Bitmessage and look in the normal Bitmessage GUI.'
@ -3092,6 +3030,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
toLabel = '' toLabel = ''
@ -3152,6 +3091,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
toLabel = '[Broadcast subscribers]' toLabel = '[Broadcast subscribers]'
@ -4030,6 +3970,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
@ -4104,6 +4045,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
workerQueue.put(('sendbroadcast',(fromAddress,subject,message))) workerQueue.put(('sendbroadcast',(fromAddress,subject,message)))
@ -4342,6 +4284,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''') sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
else: else:
@ -4375,6 +4318,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO subscriptions VALUES (?,?,?)''') sqlSubmitQueue.put('''INSERT INTO subscriptions VALUES (?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.reloadBroadcastSendersForWhichImWatching() self.reloadBroadcastSendersForWhichImWatching()
@ -4467,9 +4411,11 @@ class MyForm(QtGui.QMainWindow):
with open('keys.dat', 'wb') as configfile: with open('keys.dat', 'wb') as configfile:
config.write(configfile) config.write(configfile)
#Write the knownnodes.dat file to disk in the new location #Write the knownnodes.dat file to disk in the new location
knownNodesLock.acquire()
output = open('knownnodes.dat', 'wb') output = open('knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
os.remove(appdata + 'keys.dat') os.remove(appdata + 'keys.dat')
os.remove(appdata + 'knownnodes.dat') os.remove(appdata + 'knownnodes.dat')
appdata = '' appdata = ''
@ -4484,9 +4430,11 @@ class MyForm(QtGui.QMainWindow):
with open(appdata + 'keys.dat', 'wb') as configfile: with open(appdata + 'keys.dat', 'wb') as configfile:
config.write(configfile) config.write(configfile)
#Write the knownnodes.dat file to disk in the new location #Write the knownnodes.dat file to disk in the new location
knownNodesLock.acquire()
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
os.remove('keys.dat') os.remove('keys.dat')
os.remove('knownnodes.dat') os.remove('knownnodes.dat')
QMessageBox.about(self, "Restart", "Bitmessage has moved most of your config files to the application data directory but you must restart Bitmessage to move the last file (the file which holds messages).") QMessageBox.about(self, "Restart", "Bitmessage has moved most of your config files to the application data directory but you must restart Bitmessage to move the last file (the file which holds messages).")
@ -4544,6 +4492,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO whitelist VALUES (?,?,?)''') sqlSubmitQueue.put('''INSERT INTO whitelist VALUES (?,?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
else: else:
self.statusBar().showMessage('Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want.') self.statusBar().showMessage('Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want.')
@ -4619,7 +4568,7 @@ class MyForm(QtGui.QMainWindow):
printLock.acquire() printLock.acquire()
print 'Closing. Flushing inventory in memory out to disk...' print 'Closing. Flushing inventory in memory out to disk...'
printLock.release() printLock.release()
self.statusBar().showMessage('Flushing inventory in memory out to disk. This may take several minutes...') self.statusBar().showMessage('Flushing inventory in memory out to disk. This should normally only take a second...')
flushInventory() flushInventory()
#This one last useless query will guarantee that the previous query committed before we close the program. #This one last useless query will guarantee that the previous query committed before we close the program.
@ -4630,9 +4579,11 @@ class MyForm(QtGui.QMainWindow):
sqlLock.release() sqlLock.release()
self.statusBar().showMessage('Saving the knownNodes list of peers to disk...') self.statusBar().showMessage('Saving the knownNodes list of peers to disk...')
knownNodesLock.acquire()
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
self.trayIcon.hide() self.trayIcon.hide()
printLock.acquire() printLock.acquire()
@ -4702,6 +4653,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''') sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.tabWidget.setCurrentIndex(5) self.ui.tabWidget.setCurrentIndex(5)
self.ui.tableWidgetAddressBook.setCurrentCell(0,0) self.ui.tableWidgetAddressBook.setCurrentCell(0,0)
@ -4720,6 +4672,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''') sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.textEditInboxMessage.setText("") self.ui.textEditInboxMessage.setText("")
self.ui.tableWidgetInbox.removeRow(currentRow) self.ui.tableWidgetInbox.removeRow(currentRow)
@ -4735,6 +4688,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE ackdata=?''') sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE ackdata=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.textEditSentMessage.setPlainText("") self.ui.textEditSentMessage.setPlainText("")
self.ui.tableWidgetSent.removeRow(currentRow) self.ui.tableWidgetSent.removeRow(currentRow)
@ -4757,6 +4711,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''DELETE FROM addressbook WHERE label=? AND address=?''') sqlSubmitQueue.put('''DELETE FROM addressbook WHERE label=? AND address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.tableWidgetAddressBook.removeRow(currentRow) self.ui.tableWidgetAddressBook.removeRow(currentRow)
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
@ -4792,6 +4747,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''DELETE FROM subscriptions WHERE label=? AND address=?''') sqlSubmitQueue.put('''DELETE FROM subscriptions WHERE label=? AND address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.tableWidgetSubscriptions.removeRow(currentRow) self.ui.tableWidgetSubscriptions.removeRow(currentRow)
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
@ -4822,6 +4778,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''DELETE FROM whitelist WHERE label=? AND address=?''') sqlSubmitQueue.put('''DELETE FROM whitelist WHERE label=? AND address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.ui.tableWidgetBlacklist.removeRow(currentRow) self.ui.tableWidgetBlacklist.removeRow(currentRow)
def on_action_BlacklistClipboard(self): def on_action_BlacklistClipboard(self):
@ -4846,6 +4803,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE whitelist SET enabled=1 WHERE address=?''') sqlSubmitQueue.put('''UPDATE whitelist SET enabled=1 WHERE address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
def on_action_BlacklistDisable(self): def on_action_BlacklistDisable(self):
currentRow = self.ui.tableWidgetBlacklist.currentRow() currentRow = self.ui.tableWidgetBlacklist.currentRow()
@ -4862,6 +4820,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE whitelist SET enabled=0 WHERE address=?''') sqlSubmitQueue.put('''UPDATE whitelist SET enabled=0 WHERE address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
#Group of functions for the Your Identities dialog box #Group of functions for the Your Identities dialog box
@ -4941,6 +4900,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE addressbook set label=? WHERE address=?''') sqlSubmitQueue.put('''UPDATE addressbook set label=? WHERE address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()
@ -4954,6 +4914,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('''UPDATE subscriptions set label=? WHERE address=?''') sqlSubmitQueue.put('''UPDATE subscriptions set label=? WHERE address=?''')
sqlSubmitQueue.put(t) sqlSubmitQueue.put(t)
sqlReturnQueue.get() sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()
@ -5006,6 +4967,7 @@ sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock() sqlLock = threading.Lock()
printLock = threading.Lock() printLock = threading.Lock()
knownNodesLock = threading.Lock()
ackdataForWhichImWatching = {} ackdataForWhichImWatching = {}
broadcastSendersForWhichImWatching = {} broadcastSendersForWhichImWatching = {}
statusIconColor = 'red' statusIconColor = 'red'
@ -5115,6 +5077,7 @@ if __name__ == "__main__":
try: try:
#We shouldn't have to use the knownNodesLock because this had better be the only thread accessing knownNodes right now.
pickleFile = open(appdata + 'knownnodes.dat', 'rb') pickleFile = open(appdata + 'knownnodes.dat', 'rb')
knownNodes = pickle.load(pickleFile) knownNodes = pickle.load(pickleFile)
pickleFile.close() pickleFile.close()

View File

@ -9,12 +9,10 @@ from time import strftime, localtime
def createDefaultKnownNodes(appdata): def createDefaultKnownNodes(appdata):
############## Stream 1 ################ ############## Stream 1 ################
stream1 = {} stream1 = {}
stream1['80.69.173.220'] = (443,int(time.time())) stream1['84.48.88.42'] = (8444,int(time.time()))
stream1['109.95.105.15'] = (8443,int(time.time()))
stream1['66.65.120.151'] = (8080,int(time.time())) stream1['66.65.120.151'] = (8080,int(time.time()))
stream1['76.180.233.38'] = (8444,int(time.time())) stream1['76.180.233.38'] = (8444,int(time.time()))
stream1['84.48.88.42'] = (8444,int(time.time()))
stream1['74.132.73.137'] = (8444,int(time.time())) stream1['74.132.73.137'] = (8444,int(time.time()))
stream1['60.242.109.18'] = (8444,int(time.time())) stream1['60.242.109.18'] = (8444,int(time.time()))