Merge pull request #473 from Atheros1/master
refactor inv message processing
This commit is contained in:
commit
06c0da5e72
|
@ -118,6 +118,10 @@ class MyForm(QtGui.QMainWindow):
|
||||||
|
|
||||||
self.ui.labelSendBroadcastWarning.setVisible(False)
|
self.ui.labelSendBroadcastWarning.setVisible(False)
|
||||||
|
|
||||||
|
self.timer = QtCore.QTimer()
|
||||||
|
self.timer.start(2000) # milliseconds
|
||||||
|
QtCore.QObject.connect(self.timer, QtCore.SIGNAL("timeout()"), self.runEveryTwoSeconds)
|
||||||
|
|
||||||
# FILE MENU and other buttons
|
# FILE MENU and other buttons
|
||||||
QtCore.QObject.connect(self.ui.actionExit, QtCore.SIGNAL(
|
QtCore.QObject.connect(self.ui.actionExit, QtCore.SIGNAL(
|
||||||
"triggered()"), self.quit)
|
"triggered()"), self.quit)
|
||||||
|
@ -1292,6 +1296,12 @@ class MyForm(QtGui.QMainWindow):
|
||||||
elif len(shared.connectedHostsList) == 0:
|
elif len(shared.connectedHostsList) == 0:
|
||||||
self.setStatusIcon('red')
|
self.setStatusIcon('red')
|
||||||
|
|
||||||
|
# timer driven
|
||||||
|
def runEveryTwoSeconds(self):
|
||||||
|
self.ui.labelLookupsPerSecond.setText(_translate(
|
||||||
|
"MainWindow", "Inventory lookups per second: %1").arg(str(shared.numberOfInventoryLookupsPerformed/2)))
|
||||||
|
shared.numberOfInventoryLookupsPerformed = 0
|
||||||
|
|
||||||
# Indicates whether or not there is a connection to the Bitmessage network
|
# Indicates whether or not there is a connection to the Bitmessage network
|
||||||
connected = False
|
connected = False
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,8 @@
|
||||||
|
|
||||||
# Form implementation generated from reading ui file 'bitmessageui.ui'
|
# Form implementation generated from reading ui file 'bitmessageui.ui'
|
||||||
#
|
#
|
||||||
# Created: Thu Aug 15 14:19:52 2013
|
# Created: Tue Sep 03 15:17:26 2013
|
||||||
# by: PyQt4 UI code generator 4.10
|
# by: PyQt4 UI code generator 4.10.2
|
||||||
#
|
#
|
||||||
# WARNING! All changes made in this file will be lost!
|
# WARNING! All changes made in this file will be lost!
|
||||||
|
|
||||||
|
@ -430,13 +430,16 @@ class Ui_MainWindow(object):
|
||||||
self.labelBroadcastCount = QtGui.QLabel(self.networkstatus)
|
self.labelBroadcastCount = QtGui.QLabel(self.networkstatus)
|
||||||
self.labelBroadcastCount.setGeometry(QtCore.QRect(350, 150, 351, 16))
|
self.labelBroadcastCount.setGeometry(QtCore.QRect(350, 150, 351, 16))
|
||||||
self.labelBroadcastCount.setObjectName(_fromUtf8("labelBroadcastCount"))
|
self.labelBroadcastCount.setObjectName(_fromUtf8("labelBroadcastCount"))
|
||||||
|
self.labelLookupsPerSecond = QtGui.QLabel(self.networkstatus)
|
||||||
|
self.labelLookupsPerSecond.setGeometry(QtCore.QRect(320, 210, 291, 16))
|
||||||
|
self.labelLookupsPerSecond.setObjectName(_fromUtf8("labelLookupsPerSecond"))
|
||||||
icon9 = QtGui.QIcon()
|
icon9 = QtGui.QIcon()
|
||||||
icon9.addPixmap(QtGui.QPixmap(_fromUtf8(":/newPrefix/images/networkstatus.png")), QtGui.QIcon.Normal, QtGui.QIcon.Off)
|
icon9.addPixmap(QtGui.QPixmap(_fromUtf8(":/newPrefix/images/networkstatus.png")), QtGui.QIcon.Normal, QtGui.QIcon.Off)
|
||||||
self.tabWidget.addTab(self.networkstatus, icon9, _fromUtf8(""))
|
self.tabWidget.addTab(self.networkstatus, icon9, _fromUtf8(""))
|
||||||
self.gridLayout.addWidget(self.tabWidget, 0, 0, 1, 1)
|
self.gridLayout.addWidget(self.tabWidget, 0, 0, 1, 1)
|
||||||
MainWindow.setCentralWidget(self.centralwidget)
|
MainWindow.setCentralWidget(self.centralwidget)
|
||||||
self.menubar = QtGui.QMenuBar(MainWindow)
|
self.menubar = QtGui.QMenuBar(MainWindow)
|
||||||
self.menubar.setGeometry(QtCore.QRect(0, 0, 795, 23))
|
self.menubar.setGeometry(QtCore.QRect(0, 0, 795, 18))
|
||||||
self.menubar.setObjectName(_fromUtf8("menubar"))
|
self.menubar.setObjectName(_fromUtf8("menubar"))
|
||||||
self.menuFile = QtGui.QMenu(self.menubar)
|
self.menuFile = QtGui.QMenu(self.menubar)
|
||||||
self.menuFile.setObjectName(_fromUtf8("menuFile"))
|
self.menuFile.setObjectName(_fromUtf8("menuFile"))
|
||||||
|
@ -554,8 +557,8 @@ class Ui_MainWindow(object):
|
||||||
self.textEditMessage.setHtml(_translate("MainWindow", "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0//EN\" \"http://www.w3.org/TR/REC-html40/strict.dtd\">\n"
|
self.textEditMessage.setHtml(_translate("MainWindow", "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0//EN\" \"http://www.w3.org/TR/REC-html40/strict.dtd\">\n"
|
||||||
"<html><head><meta name=\"qrichtext\" content=\"1\" /><style type=\"text/css\">\n"
|
"<html><head><meta name=\"qrichtext\" content=\"1\" /><style type=\"text/css\">\n"
|
||||||
"p, li { white-space: pre-wrap; }\n"
|
"p, li { white-space: pre-wrap; }\n"
|
||||||
"</style></head><body style=\" font-family:\'Sans\'; font-size:9pt; font-weight:400; font-style:normal;\">\n"
|
"</style></head><body style=\" font-family:\'MS Shell Dlg 2\'; font-size:9pt; font-weight:400; font-style:normal;\">\n"
|
||||||
"<p style=\"-qt-paragraph-type:empty; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px; font-family:\'MS Shell Dlg 2\';\"><br /></p></body></html>", None))
|
"<p style=\"-qt-paragraph-type:empty; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;\"><br /></p></body></html>", None))
|
||||||
self.label.setText(_translate("MainWindow", "To:", None))
|
self.label.setText(_translate("MainWindow", "To:", None))
|
||||||
self.label_2.setText(_translate("MainWindow", "From:", None))
|
self.label_2.setText(_translate("MainWindow", "From:", None))
|
||||||
self.radioButtonBroadcast.setText(_translate("MainWindow", "Broadcast to everyone who is subscribed to your address", None))
|
self.radioButtonBroadcast.setText(_translate("MainWindow", "Broadcast to everyone who is subscribed to your address", None))
|
||||||
|
@ -621,6 +624,7 @@ class Ui_MainWindow(object):
|
||||||
self.labelMessageCount.setText(_translate("MainWindow", "Processed 0 person-to-person message.", None))
|
self.labelMessageCount.setText(_translate("MainWindow", "Processed 0 person-to-person message.", None))
|
||||||
self.labelPubkeyCount.setText(_translate("MainWindow", "Processed 0 public key.", None))
|
self.labelPubkeyCount.setText(_translate("MainWindow", "Processed 0 public key.", None))
|
||||||
self.labelBroadcastCount.setText(_translate("MainWindow", "Processed 0 broadcast.", None))
|
self.labelBroadcastCount.setText(_translate("MainWindow", "Processed 0 broadcast.", None))
|
||||||
|
self.labelLookupsPerSecond.setText(_translate("MainWindow", "Inventory lookups per second: 0", None))
|
||||||
self.tabWidget.setTabText(self.tabWidget.indexOf(self.networkstatus), _translate("MainWindow", "Network Status", None))
|
self.tabWidget.setTabText(self.tabWidget.indexOf(self.networkstatus), _translate("MainWindow", "Network Status", None))
|
||||||
self.menuFile.setTitle(_translate("MainWindow", "File", None))
|
self.menuFile.setTitle(_translate("MainWindow", "File", None))
|
||||||
self.menuSettings.setTitle(_translate("MainWindow", "Settings", None))
|
self.menuSettings.setTitle(_translate("MainWindow", "Settings", None))
|
||||||
|
|
|
@ -22,16 +22,7 @@
|
||||||
</property>
|
</property>
|
||||||
<widget class="QWidget" name="centralwidget">
|
<widget class="QWidget" name="centralwidget">
|
||||||
<layout class="QGridLayout" name="gridLayout">
|
<layout class="QGridLayout" name="gridLayout">
|
||||||
<property name="leftMargin">
|
<property name="margin">
|
||||||
<number>0</number>
|
|
||||||
</property>
|
|
||||||
<property name="topMargin">
|
|
||||||
<number>0</number>
|
|
||||||
</property>
|
|
||||||
<property name="rightMargin">
|
|
||||||
<number>0</number>
|
|
||||||
</property>
|
|
||||||
<property name="bottomMargin">
|
|
||||||
<number>0</number>
|
<number>0</number>
|
||||||
</property>
|
</property>
|
||||||
<item row="0" column="0">
|
<item row="0" column="0">
|
||||||
|
@ -287,8 +278,8 @@
|
||||||
<string><!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">
|
<string><!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">
|
||||||
<html><head><meta name="qrichtext" content="1" /><style type="text/css">
|
<html><head><meta name="qrichtext" content="1" /><style type="text/css">
|
||||||
p, li { white-space: pre-wrap; }
|
p, li { white-space: pre-wrap; }
|
||||||
</style></head><body style=" font-family:'Sans'; font-size:9pt; font-weight:400; font-style:normal;">
|
</style></head><body style=" font-family:'MS Shell Dlg 2'; font-size:9pt; font-weight:400; font-style:normal;">
|
||||||
<p style="-qt-paragraph-type:empty; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px; font-family:'MS Shell Dlg 2';"><br /></p></body></html></string>
|
<p style="-qt-paragraph-type:empty; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;"><br /></p></body></html></string>
|
||||||
</property>
|
</property>
|
||||||
</widget>
|
</widget>
|
||||||
</item>
|
</item>
|
||||||
|
@ -1040,6 +1031,19 @@ p, li { white-space: pre-wrap; }
|
||||||
<string>Processed 0 broadcast.</string>
|
<string>Processed 0 broadcast.</string>
|
||||||
</property>
|
</property>
|
||||||
</widget>
|
</widget>
|
||||||
|
<widget class="QLabel" name="labelLookupsPerSecond">
|
||||||
|
<property name="geometry">
|
||||||
|
<rect>
|
||||||
|
<x>320</x>
|
||||||
|
<y>210</y>
|
||||||
|
<width>291</width>
|
||||||
|
<height>16</height>
|
||||||
|
</rect>
|
||||||
|
</property>
|
||||||
|
<property name="text">
|
||||||
|
<string>Inventory lookups per second: 0</string>
|
||||||
|
</property>
|
||||||
|
</widget>
|
||||||
</widget>
|
</widget>
|
||||||
</widget>
|
</widget>
|
||||||
</item>
|
</item>
|
||||||
|
@ -1051,7 +1055,7 @@ p, li { white-space: pre-wrap; }
|
||||||
<x>0</x>
|
<x>0</x>
|
||||||
<y>0</y>
|
<y>0</y>
|
||||||
<width>795</width>
|
<width>795</width>
|
||||||
<height>23</height>
|
<height>18</height>
|
||||||
</rect>
|
</rect>
|
||||||
</property>
|
</property>
|
||||||
<widget class="QMenu" name="menuFile">
|
<widget class="QMenu" name="menuFile">
|
||||||
|
|
|
@ -21,7 +21,8 @@ import helper_inbox
|
||||||
import helper_sent
|
import helper_sent
|
||||||
from helper_sql import *
|
from helper_sql import *
|
||||||
import tr
|
import tr
|
||||||
#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys
|
from debug import logger
|
||||||
|
#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToGetPerPeer, shared.neededPubkeys
|
||||||
|
|
||||||
# This thread is created either by the synSenderThread(for outgoing
|
# This thread is created either by the synSenderThread(for outgoing
|
||||||
# connections) or the singleListenerThread(for incoming connectiosn).
|
# connections) or the singleListenerThread(for incoming connectiosn).
|
||||||
|
@ -46,7 +47,7 @@ class receiveDataThread(threading.Thread):
|
||||||
self.peer = shared.Peer(HOST, port)
|
self.peer = shared.Peer(HOST, port)
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
|
self.objectsThatWeHaveYetToGetFromThisPeer = {}
|
||||||
self.selfInitiatedConnections = selfInitiatedConnections
|
self.selfInitiatedConnections = selfInitiatedConnections
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||||
|
@ -101,7 +102,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Could not delete', self.peer.host, 'from shared.connectedHostsList.', err
|
print 'Could not delete', self.peer.host, 'from shared.connectedHostsList.', err
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
self.peer]
|
self.peer]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
@ -172,52 +173,54 @@ class receiveDataThread(threading.Thread):
|
||||||
self.data = self.data[
|
self.data = self.data[
|
||||||
self.payloadLength + 24:] # take this message out and then process the next message
|
self.payloadLength + 24:] # take this message out and then process the next message
|
||||||
if self.data == '':
|
if self.data == '':
|
||||||
while len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
random.seed()
|
random.seed()
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 1)
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Inventory (in memory) already has object listed in inv message.'
|
print 'Inventory (in memory) already has object listed in inv message.'
|
||||||
|
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash]
|
objectHash]
|
||||||
elif shared.isInSqlInventory(objectHash):
|
elif shared.isInSqlInventory(objectHash):
|
||||||
if shared.verbose >= 3:
|
if shared.verbose >= 3:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
||||||
|
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash]
|
objectHash]
|
||||||
else:
|
else:
|
||||||
self.sendgetdata(objectHash)
|
self.sendgetdata(objectHash)
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
print 'concerning', self.peer.host, ', len(self.objectsThatWeHaveYetToGetFromThisPeer) is', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||||
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
break
|
break
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||||
|
|
||||||
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.peer] = len(
|
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len(
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
||||||
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
||||||
self.processData()
|
self.processData()
|
||||||
|
@ -375,6 +378,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'The stream number encoded in this broadcast message (' + str(streamNumber) + ') does not match the stream number on which it was received. Ignoring it.'
|
print 'The stream number encoded in this broadcast message (' + str(streamNumber) + ') does not match the stream number on which it was received. Ignoring it.'
|
||||||
return
|
return
|
||||||
|
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
shared.inventoryLock.acquire()
|
shared.inventoryLock.acquire()
|
||||||
self.inventoryHash = calculateInventoryHash(data)
|
self.inventoryHash = calculateInventoryHash(data)
|
||||||
if self.inventoryHash in shared.inventory:
|
if self.inventoryHash in shared.inventory:
|
||||||
|
@ -738,6 +742,7 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
readPosition += streamNumberAsClaimedByMsgLength
|
readPosition += streamNumberAsClaimedByMsgLength
|
||||||
self.inventoryHash = calculateInventoryHash(data)
|
self.inventoryHash = calculateInventoryHash(data)
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
shared.inventoryLock.acquire()
|
shared.inventoryLock.acquire()
|
||||||
if self.inventoryHash in shared.inventory:
|
if self.inventoryHash in shared.inventory:
|
||||||
print 'We have already received this msg message. Ignoring.'
|
print 'We have already received this msg message. Ignoring.'
|
||||||
|
@ -1135,6 +1140,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.'
|
print 'stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.'
|
||||||
return
|
return
|
||||||
|
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
inventoryHash = calculateInventoryHash(data)
|
inventoryHash = calculateInventoryHash(data)
|
||||||
shared.inventoryLock.acquire()
|
shared.inventoryLock.acquire()
|
||||||
if inventoryHash in shared.inventory:
|
if inventoryHash in shared.inventory:
|
||||||
|
@ -1328,6 +1334,7 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
readPosition += streamNumberLength
|
readPosition += streamNumberLength
|
||||||
|
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
inventoryHash = calculateInventoryHash(data)
|
inventoryHash = calculateInventoryHash(data)
|
||||||
shared.inventoryLock.acquire()
|
shared.inventoryLock.acquire()
|
||||||
if inventoryHash in shared.inventory:
|
if inventoryHash in shared.inventory:
|
||||||
|
@ -1400,13 +1407,13 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
# We have received an inv message
|
# We have received an inv message
|
||||||
def recinv(self, data):
|
def recinv(self, data):
|
||||||
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 # ..from all peers, counting duplicates seperately (because they take up memory)
|
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates seperately because they take up memory
|
||||||
if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
|
if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0:
|
||||||
for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
|
for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items():
|
||||||
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
|
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
|
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer:', len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)
|
||||||
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
|
print 'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers
|
||||||
|
|
||||||
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
||||||
if numberOfItemsInInv > 50000:
|
if numberOfItemsInInv > 50000:
|
||||||
|
@ -1416,36 +1423,49 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'inv message doesn\'t contain enough data. Ignoring.'
|
print 'inv message doesn\'t contain enough data. Ignoring.'
|
||||||
return
|
return
|
||||||
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
||||||
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
|
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
|
||||||
|
|
||||||
return
|
return
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
||||||
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
||||||
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Inventory (in memory) has inventory item already.'
|
print 'Inventory (in memory) has inventory item already.'
|
||||||
|
|
||||||
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
||||||
print 'Inventory (SQL on disk) has inventory item already.'
|
print 'Inventory (SQL on disk) has inventory item already.'
|
||||||
else:
|
else:
|
||||||
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
||||||
else:
|
else:
|
||||||
print 'inv message lists', numberOfItemsInInv, 'objects.'
|
# There are many items listed in this inv message. Let us create a
|
||||||
for i in range(numberOfItemsInInv): # upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
# 'set' of objects we are aware of and a set of objects in this inv
|
||||||
if len(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) == 32: # The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
|
# message so that we can diff one from the other cheaply.
|
||||||
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
startTime = time.time()
|
||||||
|
currentInventoryList = set()
|
||||||
|
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''',
|
||||||
|
self.streamNumber)
|
||||||
|
for row in queryData:
|
||||||
|
currentInventoryList.add(row[0])
|
||||||
|
with shared.inventoryLock:
|
||||||
|
for objectHash, value in shared.inventory.items():
|
||||||
|
currentInventoryList.add(objectHash)
|
||||||
|
advertisedSet = set()
|
||||||
|
for i in range(numberOfItemsInInv):
|
||||||
|
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
||||||
|
objectsNewToMe = advertisedSet - currentInventoryList
|
||||||
|
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||||
|
for item in objectsNewToMe:
|
||||||
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
|
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToGetFromThisPeer), 'from this node in particular. Ignoring the rest of this inv message.'
|
||||||
|
|
||||||
break
|
break
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
|
||||||
lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
self.peer] = len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||||
self.peer] = len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
|
||||||
|
|
||||||
# Send a getdata message to our peer to request the object with the given
|
# Send a getdata message to our peer to request the object with the given
|
||||||
# hash
|
# hash
|
||||||
|
@ -1480,7 +1500,7 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'received getdata request for item:', hash.encode('hex')
|
print 'received getdata request for item:', hash.encode('hex')
|
||||||
|
|
||||||
# print 'inventory is', shared.inventory
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
if hash in shared.inventory:
|
if hash in shared.inventory:
|
||||||
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
||||||
hash]
|
hash]
|
||||||
|
|
|
@ -53,7 +53,7 @@ alreadyAttemptedConnectionsList = {
|
||||||
alreadyAttemptedConnectionsListLock = threading.Lock()
|
alreadyAttemptedConnectionsListLock = threading.Lock()
|
||||||
alreadyAttemptedConnectionsListResetTime = int(
|
alreadyAttemptedConnectionsListResetTime = int(
|
||||||
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
|
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
|
||||||
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {}
|
numberOfObjectsThatWeHaveYetToGetPerPeer = {}
|
||||||
neededPubkeys = {}
|
neededPubkeys = {}
|
||||||
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
|
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
|
||||||
'>Q', random.randrange(1, 18446744073709551615))
|
'>Q', random.randrange(1, 18446744073709551615))
|
||||||
|
@ -66,6 +66,7 @@ clientHasReceivedIncomingConnections = False #used by API command clientStatus
|
||||||
numberOfMessagesProcessed = 0
|
numberOfMessagesProcessed = 0
|
||||||
numberOfBroadcastsProcessed = 0
|
numberOfBroadcastsProcessed = 0
|
||||||
numberOfPubkeysProcessed = 0
|
numberOfPubkeysProcessed = 0
|
||||||
|
numberOfInventoryLookupsPerformed = 0
|
||||||
|
|
||||||
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||||
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user