From: Matt Chaput <matt@whoosh.ca>
Date: Fri, 8 May 2015 13:05:37 -0400
Subject: Handle an empty queue exception in mpwriter.
Fixes #414
Origin: upstream, 2.7.1
---
src/whoosh/compat.py | 6 ++++--
src/whoosh/multiproc.py | 10 +++++++---
tests/test_mpwriter.py | 6 +++---
3 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/src/whoosh/compat.py b/src/whoosh/compat.py
index 5154e6d..5408849 100644
--- a/src/whoosh/compat.py
+++ b/src/whoosh/compat.py
@@ -36,7 +36,8 @@ if sys.version_info[0] < 3:
bytes_type = str
unichr = unichr
from urllib import urlretrieve
-
+ import Queue as queue
+
def byte(num):
return chr(num)
@@ -83,7 +84,8 @@ else:
bytes_type = bytes
unichr = chr
from urllib.request import urlretrieve
-
+ import queue
+
def byte(num):
return bytes((num,))
diff --git a/src/whoosh/multiproc.py b/src/whoosh/multiproc.py
index 54109a8..6353577 100644
--- a/src/whoosh/multiproc.py
+++ b/src/whoosh/multiproc.py
@@ -29,7 +29,7 @@ from __future__ import with_statement
import os
from multiprocessing import Process, Queue, cpu_count
-from whoosh.compat import xrange, iteritems, pickle
+from whoosh.compat import queue, xrange, iteritems, pickle
from whoosh.codec import base
from whoosh.writing import PostingPool, SegmentWriter
from whoosh.externalsort import imerge
@@ -111,6 +111,7 @@ class SubWriterTask(Process):
# number_of_docs_in_file). Pass those two pieces of information as
# arguments to _process_file().
self._process_file(*jobinfo)
+ # jobqueue.task_done()
if not self.running:
# I was cancelled, so I'll cancel my underlying writer
@@ -273,8 +274,11 @@ class MpWriter(SegmentWriter):
# Pull a (run_file_name, fieldnames, segment) tuple off the result
# queue for each sub-task, representing the final results of the task
results = []
- for task in self.tasks:
- results.append(self.resultqueue.get(timeout=5))
+ for _ in self.tasks:
+ try:
+ results.append(self.resultqueue.get(timeout=1))
+ except queue.Empty:
+ pass
if self.multisegment:
# If we're not merging the segments, we don't care about the runname
diff --git a/tests/test_mpwriter.py b/tests/test_mpwriter.py
index 421bcce..510fed0 100644
--- a/tests/test_mpwriter.py
+++ b/tests/test_mpwriter.py
@@ -5,7 +5,7 @@ from collections import deque
import pytest
from whoosh import fields, query
-from whoosh.compat import u, izip, xrange, permutations
+from whoosh.compat import u, izip, xrange, permutations, text_type
from whoosh.util.numeric import length_to_byte, byte_to_length
from whoosh.util.testing import TempIndex
@@ -271,7 +271,7 @@ def test_finish_segment():
w = MpWriter(ix, procs=2, batchsize=1, multisegment=False,
limitmb=0.00001)
- for i in range(9):
- w.add_document(a=u(chr(65 + i) * 50))
+ for i in range(100):
+ w.add_document(a=text_type(i) * 10)
w.commit()