Freezing message parser fix #2
- this has been tested on Windows as well, and has been cleaned up. There is now a permanent parser thread, and it restarts when the parsing takes more than 1 second - Fixes #900
This commit is contained in:
parent
8f194296e7
commit
47e2df86b9
src
|
@ -1,5 +1,7 @@
|
|||
from PyQt4 import QtCore, QtGui
|
||||
|
||||
import multiprocessing
|
||||
import Queue
|
||||
from urlparse import urlparse
|
||||
from safehtmlparser import *
|
||||
|
||||
|
|
|
@ -1,24 +1,28 @@
|
|||
from HTMLParser import HTMLParser
|
||||
import inspect
|
||||
import multiprocessing
|
||||
import re
|
||||
import Queue
|
||||
from urllib import quote, quote_plus
|
||||
from urlparse import urlparse
|
||||
import multiprocessing
|
||||
import Queue
|
||||
from debug import logger
|
||||
from shared import parserInputQueue, parserOutputQueue, parserProcess, parserLock
|
||||
|
||||
def regexpSubprocess(queue):
|
||||
try:
|
||||
result = queue.get()
|
||||
result = SafeHTMLParser.uriregex1.sub(
|
||||
r'<a href="\1">\1</a>',
|
||||
result)
|
||||
result = SafeHTMLParser.uriregex2.sub(r'<a href="\1&', result)
|
||||
result = SafeHTMLParser.emailregex.sub(r'<a href="mailto:\1">\1</a>', result)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
queue.put(result)
|
||||
def regexpSubprocess(parserInputQueue, parserOutputQueue):
|
||||
for data in iter(parserInputQueue.get, None):
|
||||
if data is None:
|
||||
break;
|
||||
try:
|
||||
result = SafeHTMLParser.uriregex1.sub(
|
||||
r'<a href="\1">\1</a>',
|
||||
data)
|
||||
result = SafeHTMLParser.uriregex2.sub(r'<a href="\1&', result)
|
||||
result = SafeHTMLParser.emailregex.sub(r'<a href="mailto:\1">\1</a>', result)
|
||||
parserOutputQueue.put(result)
|
||||
except SystemExit:
|
||||
break;
|
||||
except:
|
||||
break;
|
||||
|
||||
class SafeHTMLParser(HTMLParser):
|
||||
# from html5lib.sanitiser
|
||||
|
@ -106,22 +110,28 @@ class SafeHTMLParser(HTMLParser):
|
|||
self.sanitised += "&" + name + ";"
|
||||
|
||||
def feed(self, data):
|
||||
global parserLock, parserProcess, parserInputQueue, parserOutputQueue
|
||||
HTMLParser.feed(self, data)
|
||||
tmp = SafeHTMLParser.multi_replace(data)
|
||||
tmp = unicode(tmp, 'utf-8', 'replace')
|
||||
|
||||
queue = multiprocessing.Queue()
|
||||
parser = multiprocessing.Process(target=regexpSubprocess, name="RegExParser", args=(queue,))
|
||||
parser.start()
|
||||
queue.put(tmp)
|
||||
parser.join(1)
|
||||
if parser.is_alive():
|
||||
parser.terminate()
|
||||
parser.join()
|
||||
|
||||
parserLock.acquire()
|
||||
if parserProcess is None:
|
||||
parserProcess = multiprocessing.Process(target=regexpSubprocess, name="RegExParser", args=(parserInputQueue, parserOutputQueue))
|
||||
parserProcess.start()
|
||||
parserLock.release()
|
||||
parserInputQueue.put(tmp)
|
||||
try:
|
||||
tmp = queue.get(False)
|
||||
except (Queue.Empty, StopIteration) as e:
|
||||
tmp = parserOutputQueue.get(True, 1)
|
||||
except Queue.Empty:
|
||||
logger.error("Regular expression parsing timed out, not displaying links")
|
||||
parserLock.acquire()
|
||||
parserProcess.terminate()
|
||||
parserProcess = multiprocessing.Process(target=regexpSubprocess, name="RegExParser", args=(parserInputQueue, parserOutputQueue))
|
||||
parserProcess.start()
|
||||
parserLock.release()
|
||||
else:
|
||||
pass
|
||||
self.raw += tmp
|
||||
|
||||
def is_html(self, text = None, allow_picture = False):
|
||||
|
|
|
@ -44,7 +44,9 @@ def convertStringToInt(s):
|
|||
def signal_handler(signal, frame):
|
||||
logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name)
|
||||
if current_process().name == "RegExParser":
|
||||
sys.exit(0)
|
||||
# on Windows this isn't triggered, but it's fine, it has its own process termination thing
|
||||
print "RegExParser interrupted"
|
||||
raise SystemExit
|
||||
if current_process().name != "MainProcess":
|
||||
raise StopIteration("Interrupted")
|
||||
if current_thread().name != "MainThread":
|
||||
|
|
|
@ -16,7 +16,7 @@ import os
|
|||
import pickle
|
||||
import Queue
|
||||
import random
|
||||
from multiprocessing import active_children
|
||||
from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock
|
||||
from signal import SIGTERM
|
||||
import socket
|
||||
import sys
|
||||
|
@ -48,6 +48,10 @@ myAddressesByTag = {} # The key in this dictionary is the tag generated from the
|
|||
broadcastSendersForWhichImWatching = {}
|
||||
workerQueue = Queue.Queue()
|
||||
UISignalQueue = Queue.Queue()
|
||||
parserInputQueue = mpQueue()
|
||||
parserOutputQueue = mpQueue()
|
||||
parserProcess = None
|
||||
parserLock = mpLock()
|
||||
addressGeneratorQueue = Queue.Queue()
|
||||
knownNodesLock = threading.Lock()
|
||||
knownNodes = {}
|
||||
|
@ -504,6 +508,10 @@ def isProofOfWorkSufficient(data,
|
|||
def doCleanShutdown():
|
||||
global shutdown, thisapp
|
||||
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
|
||||
try:
|
||||
parserInputQueue.put(None, False)
|
||||
except Queue.Full:
|
||||
pass
|
||||
for child in active_children():
|
||||
try:
|
||||
logger.info("Killing PoW child %i", child.pid)
|
||||
|
|
Reference in New Issue
Block a user