Asyncore updates

- fix crash in inv thread
- more prints changed into logger
- minor fixes
This commit is contained in:
Peter Šurda 2017-05-29 00:47:41 +02:00
parent c85d52b8e8
commit 65bb6648e7
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 10 additions and 16 deletions

View File

@ -97,7 +97,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
logger.debug("Bad checksum, ignoring")
self.invalid = True
retval = True
if not self.fullyEstablished and self.command not in ("version", "verack"):
if not self.fullyEstablished and self.command not in ("error", "version", "verack"):
logger.error("Received command %s before connection was fully established, ignoring", self.command)
self.invalid = True
if not self.invalid:

View File

@ -51,18 +51,16 @@ class InvThread(threading.Thread, StoppableThread):
continue
if len(hashes) > 0:
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes)))
self.collectionOfInvs[iterator] = []
self.collectionOfInvs[iterator] = {}
iterator += 1
iterator %= InvThread.size
self.stop.wait(1)
def holdHash(self, stream, hash):
iter = random.randrange(0, InvThread.size)
try:
self.collectionOfInvs[iter][stream].append(hash)
except KeyError, IndexError:
self.collectionOfInvs[iter][stream] = []
self.collectionOfInvs[iter][stream].append(hash)
i = random.randrange(0, InvThread.size)
if stream not in self.collectionOfInvs[i]:
self.collectionOfInvs[i][stream] = []
self.collectionOfInvs[i][stream].append(hash)
def hasHash(self, hash):
for streamlist in self.collectionOfInvs:

View File

@ -19,7 +19,6 @@ class BMNetworkThread(threading.Thread, StoppableThread):
BMConnectionPool().loop()
def stopThread(self):
super(BMNetworkThread, self).stopThread()
for i in BMConnectionPool().listeningSockets:
try:
i.close()
@ -38,3 +37,4 @@ class BMNetworkThread(threading.Thread, StoppableThread):
# just in case
asyncore.close_all()
super(BMNetworkThread, self).stopThread()

View File

@ -3,7 +3,6 @@ from binascii import hexlify
import hashlib
import math
import time
from pprint import pprint
import socket
import struct
import random
@ -107,7 +106,7 @@ class UDPSocket(BMProto):
remoteport = port
if remoteport is False:
return
print "received peer discovery from %s:%i (port %i):" % (self.destination.host, self.destination.port, remoteport)
logger.debug("received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport)
if self.local:
peerDiscoveryQueue.put(state.Peer(self.destination.host, remoteport))
return True
@ -140,7 +139,7 @@ class UDPSocket(BMProto):
try:
(recdata, addr) = self.socket.recvfrom(AdvancedDispatcher._buf_len)
except socket.error as e:
print "socket error: %s" % (str(e))
logger.error("socket error: %s", str(e))
return
self.destination = state.Peer(addr[0], addr[1])
@ -149,23 +148,20 @@ class UDPSocket(BMProto):
self.local = True
else:
self.local = False
print "read %ib" % (len(recdata))
# overwrite the old buffer to avoid mixing data and so that self.local works correctly
self.read_buf = recdata
self.bm_proto_reset()
self.process()
def handle_write(self):
# print "handling write"
try:
data = self.writeQueue.get(False)
except Queue.Empty:
return
try:
retval = self.socket.sendto(data, ('<broadcast>', UDPSocket.port))
#print "broadcasted %ib" % (retval)
except socket.error as e:
print "socket error on sendato: %s" % (e)
logger.error("socket error on sendato: %s", str(e))
self.writeQueue.task_done()