diff --git a/src/api.py b/src/api.py index d43bdf0b..8c1a1da6 100644 --- a/src/api.py +++ b/src/api.py @@ -22,10 +22,12 @@ import shared import time from addresses import (decodeAddress, addBMIfNotPresent, decodeVarint, calculateInventoryHash, varintDecodeError) +import protocol from bmconfigparser import BMConfigParser import defaults import helper_inbox import helper_sent +import helper_random import hashlib import state @@ -63,6 +65,7 @@ class StoppableXMLRPCServer(SimpleXMLRPCServer): while state.shutdown == 0: self.handle_request() +queuedRawObjects = {} # This is one of several classes that constitute the API # This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros). @@ -951,6 +954,79 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): data += ']}' return data + def HandleDisseminateRawObject(self, params): + if len(params) != 1: + raise APIError(0, 'I need 1 parameter!') + payload = self._decode(params[0], "hex") + + if protocol.checkAndShareObjectWithPeers(payload) == 0: + raise APIError(30, 'Invalid object or insufficient POW') + else: + return hexlify(calculateInventoryHash(payload)) + + def HandleQueueRawObject(self, params): + if len(params) != 1: + raise APIError(0, 'I need 1 parameter!') + payload = self._decode(params[0], "hex") + queueItemID = helper_random.randomBytes(32) + queues.workerQueue.put(('sendrawobject', (queueItemID, payload))) + queuedRawObjects[queueItemID] = "queued" + return hexlify(queueItemID) + + def HandleCheckQueuedRawObject(self, params): + if len(params) != 1: + raise APIError(0, 'I need 1 parameter!') + queueItemID, = params + if len(queueItemID) != 64: + raise APIError(19, 'The length of queue item ID should be 32 bytes (encoded in hex thus 64 characters).') + queueItemID = self._decode(queueItemID, "hex") + + while True: + try: + ID, result = queues.processedRawObjectsQueue.get_nowait() + queuedRawObjects[ID] = result + except: + break + + result = queuedRawObjects.get(queueItemID, "notfound") + if result == "failed" or len(result) == 64: + del queuedRawObjects[queueItemID] + return result + + def HandleGetRawObject(self, params): + if len(params) != 1: + raise APIError(0, 'I need 1 parameter!') + inventoryHash, = params + if len(inventoryHash) != 64: + raise APIError(19, 'The length of hash should be 32 bytes (encoded in hex thus 64 characters).') + inventoryHash = self._decode(inventoryHash, "hex") + + try: + inventoryItem = Inventory()[inventoryHash] + except: + raise APIError(31, 'Object not found.') + else: + return json.dumps({'hash':hexlify(inventoryHash),'objectType':inventoryItem.type,'streamNumber':inventoryItem.stream,'payload':hexlify(inventoryItem.payload),'expiresTime':inventoryItem.expires,'tag':hexlify(inventoryItem.tag)}, indent=4, separators=(',', ': ')) + + def HandleListRawObjects(self, params): + if len(params) != 4: + raise APIError(0, 'I need 4 parameters!') + objectType, streamNumber, sliceStart, sliceEnd = params + if type(sliceStart) is not int or type(sliceEnd) is not int: + raise APIError(32, 'Invalid slice boundaries') + + result = [] + inventory = Inventory() + for i in inventory: + inventoryItem = inventory[str(i)] + if objectType != -1 and inventoryItem.type != objectType: + continue + if streamNumber != 0 and inventoryItem.stream != streamNumber: + continue + result.append({'hash':hexlify(i),'slice':hexlify(inventoryItem.payload[sliceStart:sliceEnd])}) + + return json.dumps(result, indent=4, separators=(',', ': ')) + def HandleClientStatus(self, params): if len(network.stats.connectedHostsList()) == 0: networkStatus = 'notConnected' @@ -1039,6 +1115,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): handlers['disseminatePubkey'] = HandleDissimatePubKey handlers['getMessageDataByDestinationHash'] = HandleGetMessageDataByDestinationHash handlers['getMessageDataByDestinationTag'] = HandleGetMessageDataByDestinationHash + handlers['disseminateRawObject'] = HandleDisseminateRawObject + handlers['queueRawObject'] = HandleQueueRawObject + handlers['checkQueuedRawObject'] = HandleCheckQueuedRawObject + handlers['getRawObject'] = HandleGetRawObject + handlers['listRawObjects'] = HandleListRawObjects handlers['clientStatus'] = HandleClientStatus handlers['decodeAddress'] = HandleDecodeAddress handlers['deleteAndVacuum'] = HandleDeleteAndVacuum diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index c95d484a..7a6553c0 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -3,7 +3,7 @@ from __future__ import division import time import threading import hashlib -from struct import pack +from struct import pack, unpack # used when the API must execute an outside program from subprocess import call from binascii import hexlify, unhexlify @@ -137,6 +137,11 @@ class singleWorker(threading.Thread, StoppableThread): self.sendBroadcast() except: pass + elif command == 'sendrawobject': + try: + self.sendRawObject(data) + except: + pass elif command == 'doPOWForMyV2Pubkey': try: self.doPOWForMyV2Pubkey(data) @@ -226,6 +231,18 @@ class singleWorker(threading.Thread, StoppableThread): # inventoryHash = calculateInventoryHash(payload) return payload + def sendRawObject(self, data): + queueItemID, payload = data + endOfLifeTime, = unpack('>Q', payload[:8]) + TTL = min(28 * 24 * 60 * 60, max(300, endOfLifeTime - int(time.time()))) + payload = self._doPOWDefaults( + payload, TTL, log_prefix='(For raw object)') + + if protocol.checkAndShareObjectWithPeers(payload) == 0: + queues.processedRawObjectsQueue.put((queueItemID, "failed")) + else: + queues.processedRawObjectsQueue.put((queueItemID, hexlify(calculateInventoryHash(payload)))) + # This function also broadcasts out the pubkey message # once it is done with the POW def doPOWForMyV2Pubkey(self, hash): diff --git a/src/queues.py b/src/queues.py index e8923dbd..ad8a1252 100644 --- a/src/queues.py +++ b/src/queues.py @@ -14,3 +14,4 @@ portCheckerQueue = Queue.Queue() receiveDataQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread. +processedRawObjectsQueue = Queue.Queue()