Merge pull request #128 from Atheros1/master
preparation for switch from 32 bit time to 64 bit time in protocol
This commit is contained in:
commit
4d7e7d350f
|
@ -181,7 +181,7 @@ class outgoingSynSender(QThread):
|
||||||
knownNodesLock.release()
|
knownNodesLock.release()
|
||||||
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err
|
sys.stderr.write('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: %s\n' % err)
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
#Only one singleListener thread will ever exist. It creates the receiveDataThread and sendDataThread for each incoming connection. Note that it cannot set the stream number because it is not known yet- the other node will have to tell us its stream number in a version message. If we don't care about their stream, we will close the connection (within the recversion function of the recieveData thread)
|
#Only one singleListener thread will ever exist. It creates the receiveDataThread and sendDataThread for each incoming connection. Note that it cannot set the stream number because it is not known yet- the other node will have to tell us its stream number in a version message. If we don't care about their stream, we will close the connection (within the recversion function of the recieveData thread)
|
||||||
|
@ -439,7 +439,7 @@ class receiveDataThread(QThread):
|
||||||
printLock.release()
|
printLock.release()
|
||||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST, remoteNodeIncomingPort)]) #This lets all of our peers know about this new node.
|
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST, remoteNodeIncomingPort)]) #This lets all of our peers know about this new node.
|
||||||
self.sendaddr() #This is one large addr message to this one peer.
|
self.sendaddr() #This is one large addr message to this one peer.
|
||||||
if connectionsCount[self.streamNumber] > 150:
|
if not self.initiatedConnection and connectionsCount[self.streamNumber] > 150:
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'We are connected to too many people. Closing connection.'
|
print 'We are connected to too many people. Closing connection.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
@ -511,7 +511,16 @@ class receiveDataThread(QThread):
|
||||||
if not self.isProofOfWorkSufficient(data):
|
if not self.isProofOfWorkSufficient(data):
|
||||||
print 'Proof of work in broadcast message insufficient.'
|
print 'Proof of work in broadcast message insufficient.'
|
||||||
return
|
return
|
||||||
embeddedTime, = unpack('>I',data[8:12])
|
readPosition = 8 #bypass the nonce
|
||||||
|
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
||||||
|
|
||||||
|
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
|
||||||
|
if embeddedTime == 0:
|
||||||
|
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
|
||||||
|
readPosition += 8
|
||||||
|
else:
|
||||||
|
readPosition += 4
|
||||||
|
|
||||||
if embeddedTime > (int(time.time())+10800): #prevent funny business
|
if embeddedTime > (int(time.time())+10800): #prevent funny business
|
||||||
print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.'
|
print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.'
|
||||||
return
|
return
|
||||||
|
@ -539,7 +548,7 @@ class receiveDataThread(QThread):
|
||||||
self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()"))
|
self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()"))
|
||||||
|
|
||||||
|
|
||||||
self.processbroadcast(data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it.
|
self.processbroadcast(readPosition,data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it.
|
||||||
|
|
||||||
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster.
|
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster.
|
||||||
if len(data) > 100000000: #Size is greater than 100 megabytes
|
if len(data) > 100000000: #Size is greater than 100 megabytes
|
||||||
|
@ -563,8 +572,7 @@ class receiveDataThread(QThread):
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
|
||||||
#A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one.
|
#A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one.
|
||||||
def processbroadcast(self,data):
|
def processbroadcast(self,readPosition,data):
|
||||||
readPosition = 12
|
|
||||||
broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9])
|
broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9])
|
||||||
if broadcastVersion <> 1:
|
if broadcastVersion <> 1:
|
||||||
#Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored.
|
#Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored.
|
||||||
|
@ -693,13 +701,20 @@ class receiveDataThread(QThread):
|
||||||
|
|
||||||
readPosition = 8
|
readPosition = 8
|
||||||
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
||||||
|
|
||||||
|
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
|
||||||
|
if embeddedTime == 0:
|
||||||
|
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
|
||||||
|
readPosition += 8
|
||||||
|
else:
|
||||||
|
readPosition += 4
|
||||||
|
|
||||||
if embeddedTime > int(time.time())+10800:
|
if embeddedTime > int(time.time())+10800:
|
||||||
print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime
|
print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime
|
||||||
return
|
return
|
||||||
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
|
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
|
||||||
print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime
|
print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime
|
||||||
return
|
return
|
||||||
readPosition += 4
|
|
||||||
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9])
|
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9])
|
||||||
if streamNumberAsClaimedByMsg != self.streamNumber:
|
if streamNumberAsClaimedByMsg != self.streamNumber:
|
||||||
print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.'
|
print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.'
|
||||||
|
@ -1024,6 +1039,14 @@ class receiveDataThread(QThread):
|
||||||
|
|
||||||
readPosition = 8 #for the nonce
|
readPosition = 8 #for the nonce
|
||||||
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
||||||
|
|
||||||
|
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
|
||||||
|
if embeddedTime == 0:
|
||||||
|
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
|
||||||
|
readPosition += 8
|
||||||
|
else:
|
||||||
|
readPosition += 4
|
||||||
|
|
||||||
if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys:
|
if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys:
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
|
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
|
||||||
|
@ -1034,7 +1057,6 @@ class receiveDataThread(QThread):
|
||||||
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
|
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
return
|
return
|
||||||
readPosition += 4 #for the time
|
|
||||||
addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10])
|
addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10])
|
||||||
readPosition += varintLength
|
readPosition += varintLength
|
||||||
streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10])
|
streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10])
|
||||||
|
@ -1154,19 +1176,29 @@ class receiveDataThread(QThread):
|
||||||
if len(data) < 34:
|
if len(data) < 34:
|
||||||
print 'getpubkey message doesn\'t contain enough data. Ignoring.'
|
print 'getpubkey message doesn\'t contain enough data. Ignoring.'
|
||||||
return
|
return
|
||||||
embeddedTime, = unpack('>I',data[8:12])
|
readPosition = 8 #bypass the nonce
|
||||||
|
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
|
||||||
|
|
||||||
|
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
|
||||||
|
if embeddedTime == 0:
|
||||||
|
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
|
||||||
|
readPosition += 8
|
||||||
|
else:
|
||||||
|
readPosition += 4
|
||||||
|
|
||||||
if embeddedTime > int(time.time())+10800:
|
if embeddedTime > int(time.time())+10800:
|
||||||
print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime
|
print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime
|
||||||
return
|
return
|
||||||
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
|
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
|
||||||
print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime
|
print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime
|
||||||
return
|
return
|
||||||
|
addressVersionNumber, addressVersionLength = decodeVarint(data[readPosition:readPosition+10])
|
||||||
addressVersionNumber, addressVersionLength = decodeVarint(data[12:22])
|
readPosition += addressVersionLength
|
||||||
streamNumber, streamNumberLength = decodeVarint(data[12+addressVersionLength:22+addressVersionLength])
|
streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition+10])
|
||||||
if streamNumber <> self.streamNumber:
|
if streamNumber <> self.streamNumber:
|
||||||
print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber
|
print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber
|
||||||
return
|
return
|
||||||
|
readPosition += streamNumberLength
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(data)
|
inventoryHash = calculateInventoryHash(data)
|
||||||
inventoryLock.acquire()
|
inventoryLock.acquire()
|
||||||
|
@ -1195,7 +1227,7 @@ class receiveDataThread(QThread):
|
||||||
print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.'
|
print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.'
|
||||||
return
|
return
|
||||||
|
|
||||||
requestedHash = data[12+addressVersionLength+streamNumberLength:32+addressVersionLength+streamNumberLength]
|
requestedHash = data[readPosition:readPosition+20]
|
||||||
if len(requestedHash) != 20:
|
if len(requestedHash) != 20:
|
||||||
print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.'
|
print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.'
|
||||||
return
|
return
|
||||||
|
@ -1349,12 +1381,12 @@ class receiveDataThread(QThread):
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
#print 'lengthOfNumberOfAddresses', lengthOfNumberOfAddresses
|
|
||||||
|
|
||||||
|
if self.remoteProtocolVersion == 1:
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
return
|
return
|
||||||
if len(data) < lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
|
if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
|
||||||
print 'addr message does not contain enough data. Ignoring.'
|
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||||
return
|
return
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = False
|
needToWriteKnownNodesToDisk = False
|
||||||
|
@ -1410,6 +1442,96 @@ class receiveDataThread(QThread):
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
continue
|
continue
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
|
||||||
|
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||||
|
knownNodesLock.acquire()
|
||||||
|
knownNodes[recaddrStream] = {}
|
||||||
|
knownNodesLock.release()
|
||||||
|
if hostFromAddrMessage not in knownNodes[recaddrStream]:
|
||||||
|
if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
|
knownNodesLock.acquire()
|
||||||
|
knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
|
knownNodesLock.release()
|
||||||
|
needToWriteKnownNodesToDisk = True
|
||||||
|
hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||||
|
listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
||||||
|
else:
|
||||||
|
PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||||
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||||
|
knownNodesLock.acquire()
|
||||||
|
knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
|
knownNodesLock.release()
|
||||||
|
if PORT != recaddrPort:
|
||||||
|
print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.'
|
||||||
|
if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers.
|
||||||
|
output = open(appdata + 'knownnodes.dat', 'wb')
|
||||||
|
knownNodesLock.acquire()
|
||||||
|
pickle.dump(knownNodes, output)
|
||||||
|
knownNodesLock.release()
|
||||||
|
output.close()
|
||||||
|
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) #no longer broadcast
|
||||||
|
printLock.acquire()
|
||||||
|
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
|
printLock.release()
|
||||||
|
elif self.remoteProtocolVersion >= 2: #The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
||||||
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
|
return
|
||||||
|
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
||||||
|
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||||
|
return
|
||||||
|
|
||||||
|
needToWriteKnownNodesToDisk = False
|
||||||
|
for i in range(0,numberOfAddressesIncluded):
|
||||||
|
try:
|
||||||
|
if data[20+lengthOfNumberOfAddresses+(38*i):32+lengthOfNumberOfAddresses+(38*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
|
printLock.acquire()
|
||||||
|
print 'Skipping IPv6 address.', repr(data[20+lengthOfNumberOfAddresses+(38*i):32+lengthOfNumberOfAddresses+(38*i)])
|
||||||
|
printLock.release()
|
||||||
|
continue
|
||||||
|
except Exception, err:
|
||||||
|
printLock.acquire()
|
||||||
|
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||||
|
printLock.release()
|
||||||
|
break #giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
|
try:
|
||||||
|
recaddrStream, = unpack('>I',data[8+lengthOfNumberOfAddresses+(38*i):12+lengthOfNumberOfAddresses+(38*i)])
|
||||||
|
except Exception, err:
|
||||||
|
printLock.acquire()
|
||||||
|
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||||
|
printLock.release()
|
||||||
|
break #giving up on unpacking any more. We should still be connected however.
|
||||||
|
if recaddrStream == 0:
|
||||||
|
continue
|
||||||
|
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
recaddrServices, = unpack('>Q',data[12+lengthOfNumberOfAddresses+(38*i):20+lengthOfNumberOfAddresses+(38*i)])
|
||||||
|
except Exception, err:
|
||||||
|
printLock.acquire()
|
||||||
|
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||||
|
printLock.release()
|
||||||
|
break #giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
|
try:
|
||||||
|
recaddrPort, = unpack('>H',data[36+lengthOfNumberOfAddresses+(38*i):38+lengthOfNumberOfAddresses+(38*i)])
|
||||||
|
except Exception, err:
|
||||||
|
printLock.acquire()
|
||||||
|
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||||
|
printLock.release()
|
||||||
|
break #giving up on unpacking any more. We should still be connected however.
|
||||||
|
#print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i
|
||||||
|
hostFromAddrMessage = socket.inet_ntoa(data[32+lengthOfNumberOfAddresses+(38*i):36+lengthOfNumberOfAddresses+(38*i)])
|
||||||
|
#print 'hostFromAddrMessage', hostFromAddrMessage
|
||||||
|
if data[32+lengthOfNumberOfAddresses+(38*i)] == '\x7F':
|
||||||
|
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
||||||
|
continue
|
||||||
|
if data[32+lengthOfNumberOfAddresses+(38*i)] == '\x0A':
|
||||||
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
|
continue
|
||||||
|
if data[32+lengthOfNumberOfAddresses+(38*i):34+lengthOfNumberOfAddresses+(38*i)] == '\xC0A8':
|
||||||
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
|
continue
|
||||||
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q',data[lengthOfNumberOfAddresses+(38*i):8+lengthOfNumberOfAddresses+(38*i)]) #This is the 'time' value in the received addr message. 64-bit.
|
||||||
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||||
knownNodesLock.acquire()
|
knownNodesLock.acquire()
|
||||||
knownNodes[recaddrStream] = {}
|
knownNodes[recaddrStream] = {}
|
||||||
|
@ -1442,13 +1564,14 @@ class receiveDataThread(QThread):
|
||||||
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.'
|
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
|
||||||
|
|
||||||
#Function runs when we want to broadcast an addr message to all of our peers. Runs when we learn of nodes that we didn't previously know about and want to share them with our peers.
|
#Function runs when we want to broadcast an addr message to all of our peers. Runs when we learn of nodes that we didn't previously know about and want to share them with our peers.
|
||||||
def broadcastaddr(self,listOfAddressDetailsToBroadcastToPeers):
|
def broadcastaddr(self,listOfAddressDetailsToBroadcastToPeers):
|
||||||
numberOfAddressesInAddrMessage = len(listOfAddressDetailsToBroadcastToPeers)
|
numberOfAddressesInAddrMessage = len(listOfAddressDetailsToBroadcastToPeers)
|
||||||
payload = ''
|
payload = ''
|
||||||
for hostDetails in listOfAddressDetailsToBroadcastToPeers:
|
for hostDetails in listOfAddressDetailsToBroadcastToPeers:
|
||||||
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||||
payload += pack('>I',timeLastReceivedMessageFromThisNode)
|
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #now uses 64-bit time
|
||||||
payload += pack('>I',streamNumber)
|
payload += pack('>I',streamNumber)
|
||||||
payload += pack('>q',services) #service bit flags offered by this node
|
payload += pack('>q',services) #service bit flags offered by this node
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(host)
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(host)
|
||||||
|
@ -1503,7 +1626,10 @@ class receiveDataThread(QThread):
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
PORT, timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack('>I',timeLastReceivedMessageFromThisNode)
|
if self.remoteProtocolVersion == 1:
|
||||||
|
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
|
||||||
|
else:
|
||||||
|
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
|
||||||
payload += pack('>I',self.streamNumber)
|
payload += pack('>I',self.streamNumber)
|
||||||
payload += pack('>q',1) #service bit flags offered by this node
|
payload += pack('>q',1) #service bit flags offered by this node
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
||||||
|
@ -1512,7 +1638,10 @@ class receiveDataThread(QThread):
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
PORT, timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack('>I',timeLastReceivedMessageFromThisNode)
|
if self.remoteProtocolVersion == 1:
|
||||||
|
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
|
||||||
|
else:
|
||||||
|
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
|
||||||
payload += pack('>I',self.streamNumber*2)
|
payload += pack('>I',self.streamNumber*2)
|
||||||
payload += pack('>q',1) #service bit flags offered by this node
|
payload += pack('>q',1) #service bit flags offered by this node
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
||||||
|
@ -1521,7 +1650,10 @@ class receiveDataThread(QThread):
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
PORT, timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack('>I',timeLastReceivedMessageFromThisNode)
|
if self.remoteProtocolVersion == 1:
|
||||||
|
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
|
||||||
|
else:
|
||||||
|
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
|
||||||
payload += pack('>I',(self.streamNumber*2)+1)
|
payload += pack('>I',(self.streamNumber*2)+1)
|
||||||
payload += pack('>q',1) #service bit flags offered by this node
|
payload += pack('>q',1) #service bit flags offered by this node
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
|
||||||
|
@ -1576,6 +1708,7 @@ class receiveDataThread(QThread):
|
||||||
print 'Closing connection to myself: ', self.HOST
|
print 'Closing connection to myself: ', self.HOST
|
||||||
printLock.release()
|
printLock.release()
|
||||||
return
|
return
|
||||||
|
broadcastToSendDataQueues((0,'setRemoteProtocolVersion',(self.HOST,self.remoteProtocolVersion)))
|
||||||
|
|
||||||
knownNodesLock.acquire()
|
knownNodesLock.acquire()
|
||||||
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
|
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
|
||||||
|
@ -1590,39 +1723,10 @@ class receiveDataThread(QThread):
|
||||||
|
|
||||||
#Sends a version message
|
#Sends a version message
|
||||||
def sendversion(self):
|
def sendversion(self):
|
||||||
global softwareVersion
|
|
||||||
payload = ''
|
|
||||||
payload += pack('>L',1) #protocol version.
|
|
||||||
payload += pack('>q',1) #bitflags of the services I offer.
|
|
||||||
payload += pack('>q',int(time.time()))
|
|
||||||
|
|
||||||
payload += pack('>q',1) #boolservices offered by the remote node. This data is ignored by the remote host because how could We know what Their services are without them telling us?
|
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(self.HOST)
|
|
||||||
payload += pack('>H',self.PORT)#remote IPv6 and port
|
|
||||||
|
|
||||||
payload += pack('>q',1) #bitflags of the services I offer.
|
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack('>L',2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
|
|
||||||
payload += pack('>H',config.getint('bitmessagesettings', 'port'))#my external IPv6 and port
|
|
||||||
|
|
||||||
random.seed()
|
|
||||||
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
|
|
||||||
userAgent = '/PyBitmessage:' + softwareVersion + '/' #Length of userAgent must be less than 253.
|
|
||||||
payload += pack('>B',len(userAgent)) #user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work.
|
|
||||||
payload += userAgent
|
|
||||||
payload += encodeVarint(1) #The number of streams about which I care. PyBitmessage currently only supports 1.
|
|
||||||
payload += encodeVarint(self.streamNumber)
|
|
||||||
|
|
||||||
datatosend = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
datatosend = datatosend + 'version\x00\x00\x00\x00\x00' #version command
|
|
||||||
datatosend = datatosend + pack('>L',len(payload)) #payload length
|
|
||||||
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
|
||||||
datatosend = datatosend + payload
|
|
||||||
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'Sending version message'
|
print 'Sending version message'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
self.sock.sendall(datatosend)
|
self.sock.sendall(assembleVersionMessage(self.HOST,self.PORT,self.streamNumber))
|
||||||
#self.versionSent = 1
|
|
||||||
|
|
||||||
#Sends a verack message
|
#Sends a verack message
|
||||||
def sendverack(self):
|
def sendverack(self):
|
||||||
|
@ -1666,35 +1770,7 @@ class sendDataThread(QThread):
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
|
||||||
def sendVersionMessage(self):
|
def sendVersionMessage(self):
|
||||||
|
datatosend = assembleVersionMessage(self.HOST,self.PORT,self.streamNumber)#the IP and port of the remote host, and my streamNumber.
|
||||||
#Note that there is another copy of this version-sending code in the receiveData class which would need to be changed if you make changes here.
|
|
||||||
global softwareVersion
|
|
||||||
payload = ''
|
|
||||||
payload += pack('>L',1) #protocol version.
|
|
||||||
payload += pack('>q',1) #bitflags of the services I offer.
|
|
||||||
payload += pack('>q',int(time.time()))
|
|
||||||
|
|
||||||
payload += pack('>q',1) #boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host.
|
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(self.HOST)
|
|
||||||
payload += pack('>H',self.PORT)#remote IPv6 and port
|
|
||||||
|
|
||||||
payload += pack('>q',1) #bitflags of the services I offer.
|
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack('>L',2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
|
|
||||||
payload += pack('>H',config.getint('bitmessagesettings', 'port'))#my external IPv6 and port
|
|
||||||
|
|
||||||
random.seed()
|
|
||||||
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
|
|
||||||
userAgent = '/PyBitmessage:' + softwareVersion + '/' #Length of userAgent must be less than 253.
|
|
||||||
payload += pack('>B',len(userAgent)) #user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work.
|
|
||||||
payload += userAgent
|
|
||||||
payload += encodeVarint(1) #The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
|
|
||||||
payload += encodeVarint(self.streamNumber)
|
|
||||||
|
|
||||||
datatosend = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
datatosend = datatosend + 'version\x00\x00\x00\x00\x00' #version command
|
|
||||||
datatosend = datatosend + pack('>L',len(payload)) #payload length
|
|
||||||
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
|
||||||
datatosend = datatosend + payload
|
|
||||||
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'Sending version packet: ', repr(datatosend)
|
print 'Sending version packet: ', repr(datatosend)
|
||||||
|
@ -1728,7 +1804,19 @@ class sendDataThread(QThread):
|
||||||
print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber
|
print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber
|
||||||
printLock.release()
|
printLock.release()
|
||||||
self.streamNumber = specifiedStreamNumber
|
self.streamNumber = specifiedStreamNumber
|
||||||
|
elif command == 'setRemoteProtocolVersion':
|
||||||
|
hostInMessage, specifiedRemoteProtocolVersion = data
|
||||||
|
if hostInMessage == self.HOST:
|
||||||
|
printLock.acquire()
|
||||||
|
print 'setting the remote node\'s protocol version in the sendData thread (ID:',id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
|
printLock.release()
|
||||||
|
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
|
if self.remoteProtocolVersion == 1:
|
||||||
|
printLock.acquire()
|
||||||
|
print 'a sendData thread is not sending an addr message to this particular peer ('+self.HOST+') because their protocol version is 1.'
|
||||||
|
printLock.release()
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send.
|
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send.
|
||||||
random.seed()
|
random.seed()
|
||||||
|
@ -1936,6 +2024,35 @@ def isAddressInMyAddressBook(address):
|
||||||
sqlLock.release()
|
sqlLock.release()
|
||||||
return queryreturn != []
|
return queryreturn != []
|
||||||
|
|
||||||
|
def assembleVersionMessage(remoteHost,remotePort,myStreamNumber):
|
||||||
|
global softwareVersion
|
||||||
|
payload = ''
|
||||||
|
payload += pack('>L',2) #protocol version.
|
||||||
|
payload += pack('>q',1) #bitflags of the services I offer.
|
||||||
|
payload += pack('>q',int(time.time()))
|
||||||
|
|
||||||
|
payload += pack('>q',1) #boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host.
|
||||||
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(remoteHost)
|
||||||
|
payload += pack('>H',remotePort)#remote IPv6 and port
|
||||||
|
|
||||||
|
payload += pack('>q',1) #bitflags of the services I offer.
|
||||||
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack('>L',2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
|
||||||
|
payload += pack('>H',config.getint('bitmessagesettings', 'port'))#my external IPv6 and port
|
||||||
|
|
||||||
|
random.seed()
|
||||||
|
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
|
||||||
|
userAgent = '/PyBitmessage:' + softwareVersion + '/' #Length of userAgent must be less than 253.
|
||||||
|
payload += pack('>B',len(userAgent)) #user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work.
|
||||||
|
payload += userAgent
|
||||||
|
payload += encodeVarint(1) #The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
|
||||||
|
payload += encodeVarint(myStreamNumber)
|
||||||
|
|
||||||
|
datatosend = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
||||||
|
datatosend = datatosend + 'version\x00\x00\x00\x00\x00' #version command
|
||||||
|
datatosend = datatosend + pack('>L',len(payload)) #payload length
|
||||||
|
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
||||||
|
return datatosend + payload
|
||||||
|
|
||||||
#This thread exists because SQLITE3 is so un-threadsafe that we must submit queries to it and it puts results back in a different queue. They won't let us just use locks.
|
#This thread exists because SQLITE3 is so un-threadsafe that we must submit queries to it and it puts results back in a different queue. They won't let us just use locks.
|
||||||
class sqlThread(QThread):
|
class sqlThread(QThread):
|
||||||
def __init__(self, parent = None):
|
def __init__(self, parent = None):
|
||||||
|
@ -3993,18 +4110,7 @@ class MyForm(QtGui.QMainWindow):
|
||||||
sqlSubmitQueue.put('commit')
|
sqlSubmitQueue.put('commit')
|
||||||
sqlLock.release()
|
sqlLock.release()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
"""try:
|
|
||||||
fromLabel = config.get(fromAddress, 'label')
|
|
||||||
except:
|
|
||||||
fromLabel = ''
|
|
||||||
if fromLabel == '':
|
|
||||||
fromLabel = fromAddress"""
|
|
||||||
|
|
||||||
toLabel = ''
|
toLabel = ''
|
||||||
|
|
||||||
t = (toAddress,)
|
t = (toAddress,)
|
||||||
sqlLock.acquire()
|
sqlLock.acquire()
|
||||||
sqlSubmitQueue.put('''select label from addressbook where address=?''')
|
sqlSubmitQueue.put('''select label from addressbook where address=?''')
|
||||||
|
@ -4018,30 +4124,6 @@ class MyForm(QtGui.QMainWindow):
|
||||||
self.displayNewSentMessage(toAddress,toLabel,fromAddress, subject, message, ackdata)
|
self.displayNewSentMessage(toAddress,toLabel,fromAddress, subject, message, ackdata)
|
||||||
workerQueue.put(('sendmessage',toAddress))
|
workerQueue.put(('sendmessage',toAddress))
|
||||||
|
|
||||||
"""self.ui.tableWidgetSent.insertRow(0)
|
|
||||||
if toLabel == '':
|
|
||||||
newItem = QtGui.QTableWidgetItem(unicode(toAddress,'utf-8'))
|
|
||||||
else:
|
|
||||||
newItem = QtGui.QTableWidgetItem(unicode(toLabel,'utf-8'))
|
|
||||||
newItem.setData(Qt.UserRole,str(toAddress))
|
|
||||||
self.ui.tableWidgetSent.setItem(0,0,newItem)
|
|
||||||
|
|
||||||
if fromLabel == '':
|
|
||||||
newItem = QtGui.QTableWidgetItem(unicode(fromAddress,'utf-8'))
|
|
||||||
else:
|
|
||||||
newItem = QtGui.QTableWidgetItem(unicode(fromLabel,'utf-8'))
|
|
||||||
newItem.setData(Qt.UserRole,str(fromAddress))
|
|
||||||
self.ui.tableWidgetSent.setItem(0,1,newItem)
|
|
||||||
newItem = QtGui.QTableWidgetItem(unicode(subject,'utf-8)'))
|
|
||||||
newItem.setData(Qt.UserRole,unicode(message,'utf-8)'))
|
|
||||||
self.ui.tableWidgetSent.setItem(0,2,newItem)
|
|
||||||
newItem = myTableWidgetItem('Just pressed ''send'' ' + unicode(strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
|
|
||||||
newItem.setData(Qt.UserRole,QByteArray(ackdata))
|
|
||||||
newItem.setData(33,int(time.time()))
|
|
||||||
self.ui.tableWidgetSent.setItem(0,3,newItem)
|
|
||||||
|
|
||||||
self.ui.textEditSentMessage.setPlainText(self.ui.tableWidgetSent.item(0,2).data(Qt.UserRole).toPyObject())"""
|
|
||||||
|
|
||||||
self.ui.comboBoxSendFrom.setCurrentIndex(0)
|
self.ui.comboBoxSendFrom.setCurrentIndex(0)
|
||||||
self.ui.labelFrom.setText('')
|
self.ui.labelFrom.setText('')
|
||||||
self.ui.lineEditTo.setText('')
|
self.ui.lineEditTo.setText('')
|
||||||
|
@ -4592,6 +4674,14 @@ class MyForm(QtGui.QMainWindow):
|
||||||
else:
|
else:
|
||||||
event.ignore()'''
|
event.ignore()'''
|
||||||
|
|
||||||
|
self.statusBar().showMessage('Bitmessage is stuck waiting for the knownNodesLock.')
|
||||||
|
knownNodesLock.acquire()
|
||||||
|
self.statusBar().showMessage('Saving the knownNodes list of peers to disk...')
|
||||||
|
output = open(appdata + 'knownnodes.dat', 'wb')
|
||||||
|
pickle.dump(knownNodes, output)
|
||||||
|
output.close()
|
||||||
|
knownNodesLock.release()
|
||||||
|
|
||||||
broadcastToSendDataQueues((0, 'shutdown', 'all'))
|
broadcastToSendDataQueues((0, 'shutdown', 'all'))
|
||||||
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
|
@ -4607,13 +4697,6 @@ class MyForm(QtGui.QMainWindow):
|
||||||
sqlReturnQueue.get()
|
sqlReturnQueue.get()
|
||||||
sqlLock.release()
|
sqlLock.release()
|
||||||
|
|
||||||
self.statusBar().showMessage('Saving the knownNodes list of peers to disk...')
|
|
||||||
knownNodesLock.acquire()
|
|
||||||
output = open(appdata + 'knownnodes.dat', 'wb')
|
|
||||||
pickle.dump(knownNodes, output)
|
|
||||||
output.close()
|
|
||||||
knownNodesLock.release()
|
|
||||||
|
|
||||||
self.trayIcon.hide()
|
self.trayIcon.hide()
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'Done.'
|
print 'Done.'
|
||||||
|
@ -4621,7 +4704,8 @@ class MyForm(QtGui.QMainWindow):
|
||||||
self.statusBar().showMessage('All done. Closing user interface...')
|
self.statusBar().showMessage('All done. Closing user interface...')
|
||||||
event.accept()
|
event.accept()
|
||||||
print 'done. (passed event.accept())'
|
print 'done. (passed event.accept())'
|
||||||
raise SystemExit
|
#raise SystemExit
|
||||||
|
os._exit(0)
|
||||||
|
|
||||||
def on_action_InboxMessageForceHtml(self):
|
def on_action_InboxMessageForceHtml(self):
|
||||||
currentInboxRow = self.ui.tableWidgetInbox.currentRow()
|
currentInboxRow = self.ui.tableWidgetInbox.currentRow()
|
||||||
|
|
Reference in New Issue
Block a user