This week, in stumping Ross for a couple of hours...
At Snap Tech, we've been breaking up some of our large-file-parsing components into smaller units (for testability, reusability, and to be able to join them up nicely with simple Unix pipes). This means they've been getting some unit-test loving as well. One component does some lengthy tasks over a small thread pool. Here's the typical threading + queues Python example:
    import queue
    import threading

    q = queue.Queue()
    threads = []
    num_threads = 5


    def worker():
        while True:
            item = q.get()
            if item is None:
                break
            do_some_lengthy_task(item)
            q.task_done()


    # Start some threads
    for i in range(0, num_threads):
        thread = threading.Thread(target=worker)
        thread.start()
        threads.append(thread)

    # Plonk items into the queue
    for j in range(0, num_threads * 10):
        q.put(j)

    q.join()  # Block until all threads are waiting on an empty queue

    # Signal all threads to break and join the main thread
    for i in range(0, num_threads):
        q.put(None)
    for t in threads:
        t.join()
This worked fine until we tried to run it under pytest. The only hint I could get out was that my KeyboardInterrupt happened at the
It turned out that the "if item is None: break" bit also needs to signal that the task is done, because q.join() essentially blocks until all tasks have been acknowledged. Internally, q.put() increments a counter and q.task_done() decrements it.
    if item is None:
        q.task_done()
        break
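To watch that counter move, here is a minimal single-threaded sketch. It peeks at unfinished_tasks, an attribute that CPython's queue.Queue happens to expose but that is not part of the documented API, so treat it as a debugging aid only. The "job" item and the pending_* variable names are made up for illustration.

```python
import queue

q = queue.Queue()

q.put("job")   # a real work item
q.put(None)    # the shutdown sentinel counts as a task too
pending_after_puts = q.unfinished_tasks

q.get()
q.task_done()  # acknowledge the real item

q.get()        # take the sentinel, but skip task_done() like the original worker
pending_without_ack = q.unfinished_tasks

q.task_done()  # the fix: acknowledge the sentinel as well
pending_with_ack = q.unfinished_tasks

print(pending_after_puts, pending_without_ack, pending_with_ack)  # prints: 2 1 0
```

While the counter is still non-zero, any later q.join() on the same queue will block, because nothing is left to acknowledge the sentinel.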
Still not entirely sure as to why this only seems to happen when running under pytest though...