Asyncore performance optimisation
- use bytearray instead of strings for buffers
This commit is contained in:
parent
1eb0dd6f01
commit
4c9006a632
|
@ -12,8 +12,8 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
def __init__(self, sock=None):
|
def __init__(self, sock=None):
|
||||||
if not hasattr(self, '_map'):
|
if not hasattr(self, '_map'):
|
||||||
asyncore.dispatcher.__init__(self, sock)
|
asyncore.dispatcher.__init__(self, sock)
|
||||||
self.read_buf = b""
|
self.read_buf = bytearray()
|
||||||
self.write_buf = b""
|
self.write_buf = bytearray()
|
||||||
self.state = "init"
|
self.state = "init"
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
self.sentBytes = 0
|
self.sentBytes = 0
|
||||||
|
@ -25,18 +25,23 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
|
|
||||||
def append_write_buf(self, data):
|
def append_write_buf(self, data):
|
||||||
if data:
|
if data:
|
||||||
|
if isinstance(data, list):
|
||||||
with self.writeLock:
|
with self.writeLock:
|
||||||
self.write_buf += data
|
for chunk in data:
|
||||||
|
self.write_buf.extend(chunk)
|
||||||
|
else:
|
||||||
|
with self.writeLock:
|
||||||
|
self.write_buf.extend(data)
|
||||||
|
|
||||||
def slice_write_buf(self, length=0):
|
def slice_write_buf(self, length=0):
|
||||||
if length > 0:
|
if length > 0:
|
||||||
with self.writeLock:
|
with self.writeLock:
|
||||||
self.write_buf = self.write_buf[length:]
|
del self.write_buf[0:length]
|
||||||
|
|
||||||
def slice_read_buf(self, length=0):
|
def slice_read_buf(self, length=0):
|
||||||
if length > 0:
|
if length > 0:
|
||||||
with self.readLock:
|
with self.readLock:
|
||||||
self.read_buf = self.read_buf[length:]
|
del self.read_buf[0:length]
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
if not self.connected:
|
if not self.connected:
|
||||||
|
@ -85,7 +90,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
self.receivedBytes += len(newData)
|
self.receivedBytes += len(newData)
|
||||||
asyncore.update_received(len(newData))
|
asyncore.update_received(len(newData))
|
||||||
with self.readLock:
|
with self.readLock:
|
||||||
self.read_buf += newData
|
self.read_buf.extend(newData)
|
||||||
|
|
||||||
def handle_write(self):
|
def handle_write(self):
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
|
@ -108,7 +113,9 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def handle_close(self):
|
def handle_close(self):
|
||||||
self.read_buf = b""
|
with self.readLock:
|
||||||
self.write_buf = b""
|
self.read_buf = bytearray()
|
||||||
|
with self.writeLock:
|
||||||
|
self.write_buf = bytearray()
|
||||||
self.state = "close"
|
self.state = "close"
|
||||||
asyncore.dispatcher.close(self)
|
asyncore.dispatcher.close(self)
|
||||||
|
|
|
@ -138,16 +138,16 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
def decode_payload_node(self):
|
def decode_payload_node(self):
|
||||||
services, host, port = self.decode_payload_content("Q16sH")
|
services, host, port = self.decode_payload_content("Q16sH")
|
||||||
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
host = socket.inet_ntop(socket.AF_INET, host[12:])
|
host = socket.inet_ntop(socket.AF_INET, buffer(host, 12, 4))
|
||||||
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
|
elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43':
|
||||||
# Onion, based on BMD/bitcoind
|
# Onion, based on BMD/bitcoind
|
||||||
host = base64.b32encode(host[6:]).lower() + ".onion"
|
host = base64.b32encode(host[6:]).lower() + ".onion"
|
||||||
else:
|
else:
|
||||||
host = socket.inet_ntop(socket.AF_INET6, host)
|
host = socket.inet_ntop(socket.AF_INET6, buffer(host))
|
||||||
if host == "":
|
if host == "":
|
||||||
# This can happen on Windows systems which are not 64-bit compatible
|
# This can happen on Windows systems which are not 64-bit compatible
|
||||||
# so let us drop the IPv6 address.
|
# so let us drop the IPv6 address.
|
||||||
host = socket.inet_ntop(socket.AF_INET, host[12:])
|
host = socket.inet_ntop(socket.AF_INET, buffer(host, 12, 4))
|
||||||
|
|
||||||
return Node(services, host, port)
|
return Node(services, host, port)
|
||||||
|
|
||||||
|
@ -272,7 +272,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
return True
|
return True
|
||||||
#TODO make this more asynchronous
|
#TODO make this more asynchronous
|
||||||
random.shuffle(items)
|
random.shuffle(items)
|
||||||
for i in items:
|
for i in map(str, items):
|
||||||
if i in DandelionStems().stem and \
|
if i in DandelionStems().stem and \
|
||||||
self != DandelionStems().stem[i]:
|
self != DandelionStems().stem[i]:
|
||||||
self.antiIntersectionDelay()
|
self.antiIntersectionDelay()
|
||||||
|
@ -298,7 +298,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
for i in items:
|
for i in map(str, items):
|
||||||
self.handleReceivedInventory(i)
|
self.handleReceivedInventory(i)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
@ -323,7 +323,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self)
|
self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self)
|
||||||
self.dandelionRefresh = time.time() + REASSIGN_INTERVAL
|
self.dandelionRefresh = time.time() + REASSIGN_INTERVAL
|
||||||
|
|
||||||
for i in items:
|
for i in map(str, items):
|
||||||
DandelionStems().add(i, self, self.dandelionRoutes)
|
DandelionStems().add(i, self, self.dandelionRoutes)
|
||||||
self.handleReceivedInventory(i)
|
self.handleReceivedInventory(i)
|
||||||
|
|
||||||
|
@ -354,12 +354,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.object.checkObjectByType()
|
self.object.checkObjectByType()
|
||||||
objectProcessorQueue.put((self.object.objectType, self.object.data))
|
objectProcessorQueue.put((self.object.objectType, buffer(self.object.data)))
|
||||||
except BMObjectInvalidError as e:
|
except BMObjectInvalidError as e:
|
||||||
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
||||||
|
|
||||||
Inventory()[self.object.inventoryHash] = (
|
Inventory()[self.object.inventoryHash] = (
|
||||||
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
|
self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag))
|
||||||
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination))
|
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -370,7 +370,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
addresses = self._decode_addr()
|
addresses = self._decode_addr()
|
||||||
for i in addresses:
|
for i in addresses:
|
||||||
seenTime, stream, services, ip, port = i
|
seenTime, stream, services, ip, port = i
|
||||||
decodedIP = protocol.checkIPAddress(ip)
|
decodedIP = protocol.checkIPAddress(buffer(ip))
|
||||||
if stream not in state.streamsInWhichIAmParticipating:
|
if stream not in state.streamsInWhichIAmParticipating:
|
||||||
continue
|
continue
|
||||||
if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive:
|
if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive:
|
||||||
|
|
|
@ -55,7 +55,10 @@ class DownloadThread(threading.Thread, StoppableThread):
|
||||||
i.objectsNewToMe[k] = False
|
i.objectsNewToMe[k] = False
|
||||||
self.pending[k] = now
|
self.pending[k] = now
|
||||||
|
|
||||||
payload = addresses.encodeVarint(len(request)) + ''.join(request)
|
payload = bytearray()
|
||||||
|
payload.extend(addresses.encodeVarint(len(request)))
|
||||||
|
for chunk in request:
|
||||||
|
payload.extend(chunk)
|
||||||
i.append_write_buf(protocol.CreatePacket('getdata', payload))
|
i.append_write_buf(protocol.CreatePacket('getdata', payload))
|
||||||
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
|
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
|
||||||
requested += len(request)
|
requested += len(request)
|
||||||
|
|
|
@ -89,7 +89,7 @@ class UDPSocket(BMProto):
|
||||||
remoteport = False
|
remoteport = False
|
||||||
for i in addresses:
|
for i in addresses:
|
||||||
seenTime, stream, services, ip, port = i
|
seenTime, stream, services, ip, port = i
|
||||||
decodedIP = protocol.checkIPAddress(ip)
|
decodedIP = protocol.checkIPAddress(buffer(ip))
|
||||||
if stream not in state.streamsInWhichIAmParticipating:
|
if stream not in state.streamsInWhichIAmParticipating:
|
||||||
continue
|
continue
|
||||||
if seenTime < time.time() - BMProto.maxTimeOffset or seenTime > time.time() + BMProto.maxTimeOffset:
|
if seenTime < time.time() - BMProto.maxTimeOffset or seenTime > time.time() + BMProto.maxTimeOffset:
|
||||||
|
@ -142,7 +142,7 @@ class UDPSocket(BMProto):
|
||||||
else:
|
else:
|
||||||
self.local = False
|
self.local = False
|
||||||
# overwrite the old buffer to avoid mixing data and so that self.local works correctly
|
# overwrite the old buffer to avoid mixing data and so that self.local works correctly
|
||||||
self.read_buf = recdata
|
self.read_buf[0:] = recdata
|
||||||
self.bm_proto_reset()
|
self.bm_proto_reset()
|
||||||
receiveDataQueue.put(self.listening)
|
receiveDataQueue.put(self.listening)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user