Added more API for raw objects

This commit is contained in:
Biryuzovye Kleshni 2018-07-29 11:06:41 +00:00
parent 00c4ee881f
commit 02ece2d491
3 changed files with 106 additions and 0 deletions

View File

@ -18,6 +18,7 @@ from random import randint
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
from struct import pack from struct import pack
import errno import errno
import Queue
import shared import shared
from addresses import ( from addresses import (
@ -28,6 +29,7 @@ import defaults
import helper_inbox import helper_inbox
import helper_sent import helper_sent
import helper_threading import helper_threading
import helper_random
import state import state
import queues import queues
@ -47,6 +49,7 @@ import proofofwork
str_chan = '[chan]' str_chan = '[chan]'
queuedRawObjects = {}
class APIError(Exception): class APIError(Exception):
def __init__(self, error_number, error_message): def __init__(self, error_number, error_message):
@ -1234,6 +1237,66 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
return json.dumps(result, indent = 4, separators = (",", ": ")) return json.dumps(result, indent = 4, separators = (",", ": "))
def HandleQueueRawObject(self, arguments):
if len(arguments) != 2:
raise APIError(0, "2 arguments needed")
TTL, headlessPayload = arguments
headlessPayload = self._decode(headlessPayload, "hex")
if type(TTL) is not int or TTL < 300 or TTL > 28 * 24 * 60 * 60:
raise APIError(33, "TTL must be an integer between 300 and 28 * 24 * 60 * 60 seconds")
ID = helper_random.randomBytes(32)
queues.workerQueue.put(("sendRawObject", ID, TTL, headlessPayload))
queuedRawObjects[ID] = "queued",
return hexlify(ID)
def HandleCancelQueuedRawObject(self, arguments):
if len(arguments) != 1:
raise APIError(0, "1 argument needed")
ID, = arguments
if len(ID) != 64:
raise APIError(19, "The length of queue item ID should be 32 bytes (encoded in hex thus 64 characters)")
ID = self._decode(ID, 'hex')
queues.workerQueue.put(("cancelRawObject", ID))
def HandleCheckQueuedRawObject(self, arguments):
if len(arguments) != 1:
raise APIError(0, "1 argument needed")
ID, = arguments
if len(ID) != 64:
raise APIError(19, "The length of queue item ID should be 32 bytes (encoded in hex thus 64 characters)")
ID = self._decode(ID, 'hex')
while True:
try:
queueItem = queues.processedRawObjectsQueue.get_nowait()
command, randomID, arguments = queueItem[0], queueItem[1], queueItem[2: ]
if command == "sent":
queuedRawObjects[randomID] = command, hexlify(arguments[0])
else:
queuedRawObjects[randomID] = (command, ) + arguments
except Queue.Empty:
break
status = queuedRawObjects.get(ID, ("notfound", ))
if status[0] in ["failed", "sent", "canceled"]:
del queuedRawObjects[ID]
return status
def HandleClientStatus(self, params): def HandleClientStatus(self, params):
if len(network.stats.connectedHostsList()) == 0: if len(network.stats.connectedHostsList()) == 0:
networkStatus = 'notConnected' networkStatus = 'notConnected'
@ -1344,6 +1407,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
handlers["disseminateRawObject"] = HandleDisseminateRawObject handlers["disseminateRawObject"] = HandleDisseminateRawObject
handlers["getRawObject"] = HandleGetRawObject handlers["getRawObject"] = HandleGetRawObject
handlers["listRawObjects"] = HandleListRawObjects handlers["listRawObjects"] = HandleListRawObjects
handlers["queueRawObject"] = HandleQueueRawObject
handlers["cancelQueuedRawObject"] = HandleCancelQueuedRawObject
handlers["checkQueuedRawObject"] = HandleCheckQueuedRawObject
handlers['clientStatus'] = HandleClientStatus handlers['clientStatus'] = HandleClientStatus
handlers['decodeAddress'] = HandleDecodeAddress handlers['decodeAddress'] = HandleDecodeAddress
handlers['deleteAndVacuum'] = HandleDeleteAndVacuum handlers['deleteAndVacuum'] = HandleDeleteAndVacuum

View File

@ -14,3 +14,4 @@ portCheckerQueue = Queue.Queue()
receiveDataQueue = Queue.Queue() receiveDataQueue = Queue.Queue()
apiAddressGeneratorReturnQueue = Queue.Queue( apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread. ) # The address generator thread uses this queue to get information back to the API thread.
processedRawObjectsQueue = Queue.Queue()

View File

@ -312,6 +312,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
self.sendMyPubkey(*arguments) self.sendMyPubkey(*arguments)
elif command == "requestPubkey": elif command == "requestPubkey":
self.requestPubkey(*arguments) self.requestPubkey(*arguments)
elif command == "sendRawObject":
self.sendRawObject(*arguments)
elif command == "cancelRawObject":
self.cancelRawObject(*arguments)
elif command == "resetPoW": elif command == "resetPoW":
pass pass
elif command == "GPUError": elif command == "GPUError":
@ -1061,3 +1065,38 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread):
"(For getpubkey message)".format(version), "(For getpubkey message)".format(version),
workDone workDone
) )
def sendRawObject(self, randomID, TTL, headlessPayload):
ID = "raw", randomID
debug.logger.info("Sending raw object")
expiryTime = int(time.time() + TTL)
def workDone(head):
inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload)
if inventoryHash is None:
queues.processedRawObjectsQueue.put(("failed", randomID))
else:
queues.processedRawObjectsQueue.put(("sent", randomID, inventoryHash))
queues.processedRawObjectsQueue.put(("doingwork", randomID))
self.startWork(
ID, headlessPayload, TTL, expiryTime,
defaults.networkDefaultProofOfWorkNonceTrialsPerByte,
defaults.networkDefaultPayloadLengthExtraBytes,
"(For raw object)",
workDone
)
def cancelRawObject(self, randomID):
ID = "raw", randomID
if ID in self.startedWorks:
del self.startedWorks[ID]
workProver.commandsQueue.put(("cancelTask", ID))
queues.processedRawObjectsQueue.put(("canceled", randomID))