From acd00de9dd982548638869fff2e8d97edba948b5 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 29 Oct 2013 12:24:24 -0700 Subject: [PATCH] Simplify StreamCapturer for subprocess testing Rather than using a transient pipe for each subprocess started, the StreamCapturer now makes a single pipe, and subprocesses redirect their output to it. So long as this works on Windows (I've done brief testing, and os.pipe() seems to be functional), this will hopefully make this much more robust. The recent failures in ShiningPanda on IPython.parallel have been caused by StreamCapturer. --- IPython/testing/iptest.py | 45 +++++++++------------------------------ 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/IPython/testing/iptest.py b/IPython/testing/iptest.py index e94540a04..e95109468 100644 --- a/IPython/testing/iptest.py +++ b/IPython/testing/iptest.py @@ -365,47 +365,22 @@ class StreamCapturer(Thread): super(StreamCapturer, self).__init__() self.streams = [] self.buffer = BytesIO() - self.streams_lock = Lock() + self.readfd, self.writefd = os.pipe() self.buffer_lock = Lock() - self.stream_added = Event() self.stop = Event() def run(self): self.started = True + while not self.stop.is_set(): - with self.streams_lock: - streams = self.streams - - if not streams: - self.stream_added.wait(timeout=1) - self.stream_added.clear() - continue - - ready = select(streams, [], [], 0.5)[0] - dead = [] - with self.buffer_lock: - for fd in ready: - try: - self.buffer.write(os.read(fd, 1024)) - except OSError as e: - import errno - if e.errno == errno.EBADF: - dead.append(fd) - else: - raise - - with self.streams_lock: - for fd in dead: - self.streams.remove(fd) - - def add_stream(self, fd): - with self.streams_lock: - self.streams.append(fd) - self.stream_added.set() + ready = select([self.readfd], [], [], 1)[0] + + if ready: + with self.buffer_lock: + self.buffer.write(os.read(self.readfd, 1024)) - def remove_stream(self, fd): - with self.streams_lock: - self.streams.remove(fd) + os.close(self.readfd) + os.close(self.writefd) def reset_buffer(self): with self.buffer_lock: @@ -426,7 +401,7 @@ class SubprocessStreamCapturePlugin(Plugin): Plugin.__init__(self) self.stream_capturer = StreamCapturer() # This is ugly, but distant parts of the test machinery need to be able - # to add streams, so we make the object globally accessible. + # to redirect streams, so we make the object globally accessible. nose.ipy_stream_capturer = self.stream_capturer def configure(self, options, config):