diff --git a/src/api.py b/src/api.py index 04699bde..2e134a64 100644 --- a/src/api.py +++ b/src/api.py @@ -18,6 +18,7 @@ from random import randint from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer from struct import pack import errno +import Queue import shared from addresses import ( @@ -28,6 +29,7 @@ import defaults import helper_inbox import helper_sent import helper_threading +import helper_random import state import queues @@ -47,6 +49,7 @@ import proofofwork str_chan = '[chan]' +queuedRawObjects = {} class APIError(Exception): def __init__(self, error_number, error_message): @@ -1234,6 +1237,66 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): return json.dumps(result, indent = 4, separators = (",", ": ")) + def HandleQueueRawObject(self, arguments): + if len(arguments) != 2: + raise APIError(0, "2 arguments needed") + + TTL, headlessPayload = arguments + headlessPayload = self._decode(headlessPayload, "hex") + + if type(TTL) is not int or TTL < 300 or TTL > 28 * 24 * 60 * 60: + raise APIError(33, "TTL must be an integer between 300 and 28 * 24 * 60 * 60 seconds") + + ID = helper_random.randomBytes(32) + + queues.workerQueue.put(("sendRawObject", ID, TTL, headlessPayload)) + queuedRawObjects[ID] = "queued", + + return hexlify(ID) + + def HandleCancelQueuedRawObject(self, arguments): + if len(arguments) != 1: + raise APIError(0, "1 argument needed") + + ID, = arguments + + if len(ID) != 64: + raise APIError(19, "The length of queue item ID should be 32 bytes (encoded in hex thus 64 characters)") + + ID = self._decode(ID, 'hex') + + queues.workerQueue.put(("cancelRawObject", ID)) + + def HandleCheckQueuedRawObject(self, arguments): + if len(arguments) != 1: + raise APIError(0, "1 argument needed") + + ID, = arguments + + if len(ID) != 64: + raise APIError(19, "The length of queue item ID should be 32 bytes (encoded in hex thus 64 characters)") + + ID = self._decode(ID, 'hex') + + while True: + try: + queueItem = queues.processedRawObjectsQueue.get_nowait() + command, randomID, arguments = queueItem[0], queueItem[1], queueItem[2: ] + + if command == "sent": + queuedRawObjects[randomID] = command, hexlify(arguments[0]) + else: + queuedRawObjects[randomID] = (command, ) + arguments + except Queue.Empty: + break + + status = queuedRawObjects.get(ID, ("notfound", )) + + if status[0] in ["failed", "sent", "canceled"]: + del queuedRawObjects[ID] + + return status + def HandleClientStatus(self, params): if len(network.stats.connectedHostsList()) == 0: networkStatus = 'notConnected' @@ -1344,6 +1407,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): handlers["disseminateRawObject"] = HandleDisseminateRawObject handlers["getRawObject"] = HandleGetRawObject handlers["listRawObjects"] = HandleListRawObjects + handlers["queueRawObject"] = HandleQueueRawObject + handlers["cancelQueuedRawObject"] = HandleCancelQueuedRawObject + handlers["checkQueuedRawObject"] = HandleCheckQueuedRawObject handlers['clientStatus'] = HandleClientStatus handlers['decodeAddress'] = HandleDecodeAddress handlers['deleteAndVacuum'] = HandleDeleteAndVacuum diff --git a/src/queues.py b/src/queues.py index e8923dbd..ad8a1252 100644 --- a/src/queues.py +++ b/src/queues.py @@ -14,3 +14,4 @@ portCheckerQueue = Queue.Queue() receiveDataQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread. +processedRawObjectsQueue = Queue.Queue() diff --git a/src/singleworker.py b/src/singleworker.py index 35a1adf1..b87ec2f7 100644 --- a/src/singleworker.py +++ b/src/singleworker.py @@ -312,6 +312,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): self.sendMyPubkey(*arguments) elif command == "requestPubkey": self.requestPubkey(*arguments) + elif command == "sendRawObject": + self.sendRawObject(*arguments) + elif command == "cancelRawObject": + self.cancelRawObject(*arguments) elif command == "resetPoW": pass elif command == "GPUError": @@ -1061,3 +1065,38 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): "(For getpubkey message)".format(version), workDone ) + + def sendRawObject(self, randomID, TTL, headlessPayload): + ID = "raw", randomID + + debug.logger.info("Sending raw object") + + expiryTime = int(time.time() + TTL) + + def workDone(head): + inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload) + + if inventoryHash is None: + queues.processedRawObjectsQueue.put(("failed", randomID)) + else: + queues.processedRawObjectsQueue.put(("sent", randomID, inventoryHash)) + + queues.processedRawObjectsQueue.put(("doingwork", randomID)) + + self.startWork( + ID, headlessPayload, TTL, expiryTime, + defaults.networkDefaultProofOfWorkNonceTrialsPerByte, + defaults.networkDefaultPayloadLengthExtraBytes, + "(For raw object)", + workDone + ) + + def cancelRawObject(self, randomID): + ID = "raw", randomID + + if ID in self.startedWorks: + del self.startedWorks[ID] + + workProver.commandsQueue.put(("cancelTask", ID)) + + queues.processedRawObjectsQueue.put(("canceled", randomID))