smarter commits Vastly improves disk performance. Also added knownNodesLock #107

Merged
Atheros1 merged 2 commits from master into master 2013-04-04 20:56:17 +02:00
Showing only changes of commit c7efc06e83 - Show all commits

View File

@ -148,7 +148,9 @@ class outgoingSynSender(QThread):
printLock.release() printLock.release()
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST] PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
knownNodesLock.acquire()
del knownNodes[self.streamNumber][HOST] del knownNodes[self.streamNumber][HOST]
knownNodesLock.release()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
except socks.Socks5AuthError, err: except socks.Socks5AuthError, err:
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"SOCKS5 Authentication problem: "+str(err)) self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"SOCKS5 Authentication problem: "+str(err))
@ -169,7 +171,9 @@ class outgoingSynSender(QThread):
printLock.release() printLock.release()
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST] PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
knownNodesLock.acquire()
del knownNodes[self.streamNumber][HOST] del knownNodes[self.streamNumber][HOST]
knownNodesLock.release()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
except Exception, err: except Exception, err:
print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err
@ -323,7 +327,9 @@ class receiveDataThread(QThread):
#print 'message checksum is correct' #print 'message checksum is correct'
#The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance. #The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance.
if self.initiatedConnection: #The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port). if self.initiatedConnection: #The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
knownNodesLock.acquire()
knownNodes[self.streamNumber][self.HOST] = (self.PORT,int(time.time())) knownNodes[self.streamNumber][self.HOST] = (self.PORT,int(time.time()))
knownNodesLock.release()
if self.payloadLength <= 180000000: #If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.) if self.payloadLength <= 180000000: #If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
remoteCommand = self.data[4:16] remoteCommand = self.data[4:16]
printLock.acquire() printLock.acquire()
@ -1402,10 +1408,14 @@ class receiveDataThread(QThread):
continue continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message. timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
knownNodesLock.acquire()
knownNodes[recaddrStream] = {} knownNodes[recaddrStream] = {}
knownNodesLock.release()
if hostFromAddrMessage not in knownNodes[recaddrStream]: if hostFromAddrMessage not in knownNodes[recaddrStream]:
if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
knownNodesLock.acquire()
knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode) knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
knownNodesLock.release()
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
needToWriteKnownNodesToDisk = True needToWriteKnownNodesToDisk = True
hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
@ -1413,12 +1423,16 @@ class receiveDataThread(QThread):
else: else:
PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
knownNodesLock.acquire()
knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode) knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode)
knownNodesLock.release()
if PORT != recaddrPort: if PORT != recaddrPort:
print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.' print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.'
if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers. if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers.
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
knownNodesLock.acquire()
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
knownNodesLock.release()
output.close() output.close()
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
printLock.acquire() printLock.acquire()
@ -1560,9 +1574,11 @@ class receiveDataThread(QThread):
printLock.release() printLock.release()
return return
knownNodesLock.acquire()
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time())) knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
knownNodesLock.release()
output.close() output.close()
self.sendverack() self.sendverack()
@ -4395,9 +4411,11 @@ class MyForm(QtGui.QMainWindow):
with open('keys.dat', 'wb') as configfile: with open('keys.dat', 'wb') as configfile:
config.write(configfile) config.write(configfile)
#Write the knownnodes.dat file to disk in the new location #Write the knownnodes.dat file to disk in the new location
knownNodesLock.acquire()
output = open('knownnodes.dat', 'wb') output = open('knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
os.remove(appdata + 'keys.dat') os.remove(appdata + 'keys.dat')
os.remove(appdata + 'knownnodes.dat') os.remove(appdata + 'knownnodes.dat')
appdata = '' appdata = ''
@ -4412,9 +4430,11 @@ class MyForm(QtGui.QMainWindow):
with open(appdata + 'keys.dat', 'wb') as configfile: with open(appdata + 'keys.dat', 'wb') as configfile:
config.write(configfile) config.write(configfile)
#Write the knownnodes.dat file to disk in the new location #Write the knownnodes.dat file to disk in the new location
knownNodesLock.acquire()
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
os.remove('keys.dat') os.remove('keys.dat')
os.remove('knownnodes.dat') os.remove('knownnodes.dat')
QMessageBox.about(self, "Restart", "Bitmessage has moved most of your config files to the application data directory but you must restart Bitmessage to move the last file (the file which holds messages).") QMessageBox.about(self, "Restart", "Bitmessage has moved most of your config files to the application data directory but you must restart Bitmessage to move the last file (the file which holds messages).")
@ -4548,7 +4568,7 @@ class MyForm(QtGui.QMainWindow):
printLock.acquire() printLock.acquire()
print 'Closing. Flushing inventory in memory out to disk...' print 'Closing. Flushing inventory in memory out to disk...'
printLock.release() printLock.release()
self.statusBar().showMessage('Flushing inventory in memory out to disk. This may take several minutes...') self.statusBar().showMessage('Flushing inventory in memory out to disk. This should normally only take a second...')
flushInventory() flushInventory()
#This one last useless query will guarantee that the previous query committed before we close the program. #This one last useless query will guarantee that the previous query committed before we close the program.
@ -4559,9 +4579,11 @@ class MyForm(QtGui.QMainWindow):
sqlLock.release() sqlLock.release()
self.statusBar().showMessage('Saving the knownNodes list of peers to disk...') self.statusBar().showMessage('Saving the knownNodes list of peers to disk...')
knownNodesLock.acquire()
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
output.close() output.close()
knownNodesLock.release()
self.trayIcon.hide() self.trayIcon.hide()
printLock.acquire() printLock.acquire()
@ -4945,6 +4967,7 @@ sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock() sqlLock = threading.Lock()
printLock = threading.Lock() printLock = threading.Lock()
knownNodesLock = threading.Lock()
ackdataForWhichImWatching = {} ackdataForWhichImWatching = {}
broadcastSendersForWhichImWatching = {} broadcastSendersForWhichImWatching = {}
statusIconColor = 'red' statusIconColor = 'red'
@ -5054,6 +5077,7 @@ if __name__ == "__main__":
try: try:
#We shouldn't have to use the knownNodesLock because this had better be the only thread accessing knownNodes right now.
pickleFile = open(appdata + 'knownnodes.dat', 'rb') pickleFile = open(appdata + 'knownnodes.dat', 'rb')
knownNodes = pickle.load(pickleFile) knownNodes = pickle.load(pickleFile)
pickleFile.close() pickleFile.close()