Capture output from subprocs during test, and display on failure

This probably needs some more thought on synchronisation so we don't miss
critical bits of output as the test ends.
pull/37/head
Thomas Kluyver 13 years ago
parent ba5fae3f70
commit 7d98362972

@ -28,10 +28,13 @@ from __future__ import print_function
# Stdlib
import glob
from io import BytesIO
import os
import os.path as path
import re
from select import select
import sys
from threading import Thread, Lock, Event
import warnings
# Now, proceed to import nose itself
@ -40,6 +43,7 @@ from nose.plugins.xunit import Xunit
from nose import SkipTest
from nose.core import TestProgram
from nose.plugins import Plugin
from nose.util import safe_str
# Our own imports
from IPython.utils.importstring import import_item
@ -353,6 +357,89 @@ class ExclusionPlugin(Plugin):
return None
class StreamCapturer(Thread):
started = False
def __init__(self):
super(StreamCapturer, self).__init__()
self.streams = []
self.buffer = BytesIO()
self.streams_lock = Lock()
self.buffer_lock = Lock()
self.stream_added = Event()
self.stop = Event()
def run(self):
self.started = True
while not self.stop.is_set():
with self.streams_lock:
streams = self.streams
if not streams:
self.stream_added.wait(timeout=1)
self.stream_added.clear()
continue
ready = select(streams, [], [], 0.5)[0]
with self.buffer_lock:
for fd in ready:
self.buffer.write(os.read(fd, 1024))
def add_stream(self, fd):
with self.streams_lock:
self.streams.append(fd)
self.stream_added.set()
def remove_stream(self, fd):
with self.streams_lock:
self.streams.remove(fd)
def reset_buffer(self):
with self.buffer_lock:
self.buffer.truncate(0)
self.buffer.seek(0)
def get_buffer(self):
with self.buffer_lock:
return self.buffer.getvalue()
def ensure_started(self):
if not self.started:
self.start()
class SubprocessStreamCapturePlugin(Plugin):
name='subprocstreams'
def __init__(self):
Plugin.__init__(self)
self.stream_capturer = StreamCapturer()
# This is ugly, but distant parts of the test machinery need to be able
# to add streams, so we make the object globally accessible.
nose.ipy_stream_capturer = self.stream_capturer
def configure(self, options, config):
Plugin.configure(self, options, config)
# Override nose trying to disable plugin.
self.enabled = True
def startTest(self, test):
# Reset log capture
self.stream_capturer.reset_buffer()
def formatFailure(self, test, err):
# Show output
ec, ev, tb = err
ev = safe_str(ev)
out = [ev, '>> begin captured subprocess output <<',
self.stream_capturer.get_buffer().decode('utf-8', 'replace'),
'>> end captured subprocess output <<']
return ec, '\n'.join(out), tb
formatError = formatFailure
def finalize(self, result):
self.stream_capturer.stop.set()
self.stream_capturer.join()
def run_iptest():
"""Run the IPython test suite using nose.
@ -407,7 +494,8 @@ def run_iptest():
# use our plugin for doctesting. It will remove the standard doctest plugin
# if it finds it enabled
plugins = [ExclusionPlugin(section.excludes), IPythonDoctest(), KnownFailure()]
plugins = [ExclusionPlugin(section.excludes), IPythonDoctest(), KnownFailure(),
SubprocessStreamCapturePlugin() ]
# Use working directory set by parent process (see iptestcontroller)
if 'IPTEST_WORKING_DIR' in os.environ:

Loading…
Cancel
Save