You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
433 lines
17 KiB
433 lines
17 KiB
# coding=utf-8
"""
Aaron Buchanan
Nov. 2012

Plug-in for py.test for reporting to a TeamCity server.

Reports results to TeamCity during test execution, for immediate feedback
when running under TeamCity.

This should be installed as a py.test plugin and will be automatically
enabled by running tests under a TeamCity build.
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from datetime import timedelta
|
|
|
|
from teamcity import diff_tools
|
|
from teamcity import is_running_under_teamcity
|
|
from teamcity.common import convert_error_to_string, dump_test_stderr, dump_test_stdout
|
|
from teamcity.messages import TeamcityServiceMessages
|
|
|
|
# Patch unittest's assertEqual failure output so expected/actual values are
# serialized in a form TeamCity/PyCharm can render as a structured diff.
diff_tools.patch_unittest_diff()

# Attribute name under which a failed assertion's (op, left, right) triple is
# stashed on the test item (see pytest_assertrepr_compare / report_test_failure).
_ASSERTION_FAILURE_KEY = '_teamcity_assertion_failure'
|
|
|
|
|
|
def pytest_addoption(parser):
    """Register TeamCity-related command line options and ini settings.

    :param parser: pytest's argument/ini parser.
    """
    group = parser.getgroup("terminal reporting", "reporting", after="general")

    group._addoption('--teamcity', action="count",
                     dest="teamcity", default=0, help="force output of JetBrains TeamCity service messages")
    group._addoption('--no-teamcity', action="count",
                     dest="no_teamcity", default=0, help="disable output of JetBrains TeamCity service messages")
    parser.addoption('--jb-swapdiff', action="store_true", dest="swapdiff", default=False, help="Swap actual/expected in diff")

    # Each ini option gets its own help text. The original reused a single
    # kwargs dict, so "swapdiff" wrongly inherited the "skippassedoutput"
    # description.
    parser.addini("skippassedoutput", type="bool",
                  help="skip output of passed tests for JetBrains TeamCity service messages")
    parser.addini("swapdiff", type="bool",
                  help="swap actual/expected in diff for JetBrains TeamCity service messages")
|
|
|
|
|
|
def pytest_configure(config):
    """Install the TeamCity reporter when forced via --teamcity or when a
    TeamCity build environment is detected (unless --no-teamcity is given)."""
    if config.option.no_teamcity >= 1:
        enabled = False
    else:
        enabled = config.option.teamcity >= 1 or is_running_under_teamcity()

    if not enabled:
        return

    capture_mode = getattr(config.option, 'capture', 'fd')
    reporter = EchoTeamCityMessages(
        capture_mode != 'no',
        _get_coverage_controller(config),
        bool(config.getini('skippassedoutput')),
        bool(config.getini('swapdiff') or config.option.swapdiff)
    )

    config.option.verbose = 2  # don't truncate assert explanations
    config._teamcityReporting = reporter
    config.pluginmanager.register(reporter)
|
|
|
|
|
|
def pytest_unconfigure(config):
    """Detach and unregister the TeamCity reporter, if one was installed."""
    reporter = getattr(config, '_teamcityReporting', None)
    if not reporter:
        return
    del config._teamcityReporting
    config.pluginmanager.unregister(reporter)
|
|
|
|
|
|
def _get_coverage_controller(config):
|
|
cov_plugin = config.pluginmanager.getplugin('_cov')
|
|
if not cov_plugin:
|
|
return None
|
|
|
|
return cov_plugin.cov_controller
|
|
|
|
|
|
class EchoTeamCityMessages(object):
    """pytest plugin object that translates test events into TeamCity service messages."""

    def __init__(self, output_capture_enabled, coverage_controller, skip_passed_output, swap_diff):
        """
        :param output_capture_enabled: True when pytest itself captures output
            (TeamCity-side capture is then turned off to avoid duplicates).
        :param coverage_controller: pytest-cov controller or None.
        :param skip_passed_output: suppress captured output of passed tests.
        :param swap_diff: swap actual/expected in reported diffs.
        """
        self.output_capture_enabled = output_capture_enabled
        self.coverage_controller = coverage_controller
        self.skip_passed_output = skip_passed_output
        self.swap_diff = swap_diff

        self.teamcity = TeamcityServiceMessages()
        # ids for which testStarted was emitted but testFinished was not yet
        self.test_start_reported_mark = set()
        self.current_test_item = None

        # limits used when dumping captured output to TeamCity
        self.max_reported_output_size = 1 * 1024 * 1024
        self.reported_output_chunk_size = 50000
|
|
|
|
def get_id_from_location(self, location):
|
|
if type(location) is not tuple or len(location) != 3 or not hasattr(location[2], "startswith"):
|
|
return None
|
|
|
|
def convert_file_to_id(filename):
|
|
filename = re.sub(r"\.pyc?$", "", filename)
|
|
return filename.replace(os.sep, ".").replace("/", ".")
|
|
|
|
def add_prefix_to_filename_id(filename_id, prefix):
|
|
dot_location = filename_id.rfind('.')
|
|
if dot_location <= 0 or dot_location >= len(filename_id) - 1:
|
|
return None
|
|
|
|
return filename_id[:dot_location + 1] + prefix + filename_id[dot_location + 1:]
|
|
|
|
pylint_prefix = '[pylint] '
|
|
if location[2].startswith(pylint_prefix):
|
|
id_from_file = convert_file_to_id(location[2][len(pylint_prefix):])
|
|
return id_from_file + ".Pylint"
|
|
|
|
if location[2] == "PEP8-check":
|
|
id_from_file = convert_file_to_id(location[0])
|
|
return id_from_file + ".PEP8"
|
|
|
|
return None
|
|
|
|
def format_test_id(self, nodeid, location):
|
|
id_from_location = self.get_id_from_location(location)
|
|
|
|
if id_from_location is not None:
|
|
return id_from_location
|
|
|
|
test_id = nodeid
|
|
|
|
if test_id:
|
|
if test_id.find("::") < 0:
|
|
test_id += "::top_level"
|
|
else:
|
|
test_id = "top_level"
|
|
|
|
first_bracket = test_id.find("[")
|
|
if first_bracket > 0:
|
|
# [] -> (), make it look like nose parameterized tests
|
|
params = "(" + test_id[first_bracket + 1:]
|
|
if params.endswith("]"):
|
|
params = params[:-1] + ")"
|
|
test_id = test_id[:first_bracket]
|
|
if test_id.endswith("::"):
|
|
test_id = test_id[:-2]
|
|
else:
|
|
params = ""
|
|
|
|
test_id = test_id.replace("::()::", "::")
|
|
test_id = re.sub(r"\.pyc?::", r"::", test_id)
|
|
test_id = test_id.replace(".", "_").replace(os.sep, ".").replace("/", ".").replace('::', '.')
|
|
|
|
if params:
|
|
params = params.replace(".", "_")
|
|
test_id += params
|
|
|
|
return test_id
|
|
|
|
def format_location(self, location):
|
|
if type(location) is tuple and len(location) == 3:
|
|
return "%s:%s (%s)" % (str(location[0]), str(location[1]), str(location[2]))
|
|
return str(location)
|
|
|
|
    def pytest_collection_finish(self, session):
        # Tell TeamCity how many tests to expect, so it can show progress.
        self.teamcity.testCount(len(session.items))
|
|
|
|
def pytest_runtest_logstart(self, nodeid, location):
|
|
# test name fetched from location passed as metainfo to PyCharm
|
|
# it will be used to run specific test
|
|
# See IDEA-176950, PY-31836
|
|
test_name = location[2]
|
|
if test_name:
|
|
test_name = str(test_name).split(".")[-1]
|
|
self.ensure_test_start_reported(self.format_test_id(nodeid, location), test_name)
|
|
|
|
    def pytest_runtest_protocol(self, item):
        # Remember the item being run so pytest_assertrepr_compare can attach
        # the failing comparison to it later.
        self.current_test_item = item

        return None  # continue to next hook
|
|
|
|
def ensure_test_start_reported(self, test_id, metainfo=None):
|
|
if test_id not in self.test_start_reported_mark:
|
|
if self.output_capture_enabled:
|
|
capture_standard_output = "false"
|
|
else:
|
|
capture_standard_output = "true"
|
|
self.teamcity.testStarted(test_id, flowId=test_id, captureStandardOutput=capture_standard_output, metainfo=metainfo)
|
|
self.test_start_reported_mark.add(test_id)
|
|
|
|
def report_has_output(self, report):
|
|
for (secname, data) in report.sections:
|
|
if report.when in secname and ('stdout' in secname or 'stderr' in secname):
|
|
return True
|
|
return False
|
|
|
|
def report_test_output(self, report, test_id):
|
|
for (secname, data) in report.sections:
|
|
# https://github.com/JetBrains/teamcity-messages/issues/112
|
|
# CollectReport didn't have 'when' property, but now it has.
|
|
# But we still need output on 'collect' state
|
|
if hasattr(report, "when") and report.when not in secname and report.when != 'collect':
|
|
continue
|
|
if not data:
|
|
continue
|
|
|
|
if 'stdout' in secname:
|
|
dump_test_stdout(self.teamcity, test_id, test_id, data)
|
|
elif 'stderr' in secname:
|
|
dump_test_stderr(self.teamcity, test_id, test_id, data)
|
|
|
|
    def report_test_finished(self, test_id, duration=None):
        # Emit testFinished and forget the id so a later testStarted for the
        # same id (e.g. a re-run) is reported again.
        self.teamcity.testFinished(test_id, testDuration=duration, flowId=test_id)
        self.test_start_reported_mark.remove(test_id)
|
|
|
|
    def report_test_failure(self, test_id, report, message=None, report_output=True):
        """Report a failed test to TeamCity, attaching a structured
        expected/actual diff when one can be recovered.

        :param test_id: TeamCity test id (flowId as well).
        :param report: pytest report carrying longrepr/duration/location.
        :param message: failure message; defaults to the formatted location.
        :param report_output: also forward captured stdout/stderr.
        """
        if hasattr(report, 'duration'):
            duration = timedelta(seconds=report.duration)
        else:
            duration = None

        if message is None:
            message = self.format_location(report.location)

        self.ensure_test_start_reported(test_id)
        if report_output:
            self.report_test_output(report, test_id)

        # Best-effort recovery of a structured diff, from either a serialized
        # EqualsAssertionError in the crash message or the comparison stashed
        # by pytest_assertrepr_compare on the current item.
        diff_error = None
        try:
            err_message = str(report.longrepr.reprcrash.message)
            diff_name = diff_tools.EqualsAssertionError.__name__
            # There is a string like "foo.bar.DiffError: [serialized_data]"
            if diff_name in err_message:
                serialized_data = err_message[err_message.index(diff_name) + len(diff_name) + 1:]
                diff_error = diff_tools.deserialize_error(serialized_data)

            assertion_tuple = getattr(self.current_test_item, _ASSERTION_FAILURE_KEY, None)
            if assertion_tuple:
                op, left, right = assertion_tuple
                if self.swap_diff:
                    left, right = right, left
                diff_error = diff_tools.EqualsAssertionError(expected=right, actual=left)
        except Exception:
            # Diff recovery is optional; fall through to the plain report.
            pass

        if not diff_error:
            from .jb_local_exc_store import get_exception
            diff_error = get_exception()

        if diff_error:
            # Cut everything after postfix: it is internal view of DiffError
            strace = str(report.longrepr)
            data_postfix = "_ _ _ _ _"
            # Error message in pytest must be in "file.py:22 AssertionError" format
            # This message goes to strace
            # With custom error we must add real exception class explicitly
            if data_postfix in strace:
                strace = strace[0:strace.index(data_postfix)].strip()
                if strace.endswith(":") and diff_error.real_exception:
                    strace += " " + type(diff_error.real_exception).__name__
            self.teamcity.testFailed(test_id, diff_error.msg or message, strace,
                                     flowId=test_id,
                                     comparison_failure=diff_error
                                     )
        else:
            self.teamcity.testFailed(test_id, message, str(report.longrepr), flowId=test_id)
        self.report_test_finished(test_id, duration)
|
|
|
|
def report_test_skip(self, test_id, report):
|
|
if type(report.longrepr) is tuple and len(report.longrepr) == 3:
|
|
reason = report.longrepr[2]
|
|
else:
|
|
reason = str(report.longrepr)
|
|
|
|
if hasattr(report, 'duration'):
|
|
duration = timedelta(seconds=report.duration)
|
|
else:
|
|
duration = None
|
|
|
|
self.ensure_test_start_reported(test_id)
|
|
self.report_test_output(report, test_id)
|
|
self.teamcity.testIgnored(test_id, reason, flowId=test_id)
|
|
self.report_test_finished(test_id, duration)
|
|
|
|
    def pytest_assertrepr_compare(self, config, op, left, right):
        # Stash the failing comparison on the current item so
        # report_test_failure can turn it into a structured diff.
        setattr(self.current_test_item, _ASSERTION_FAILURE_KEY, (op, left, right))
|
|
|
|
    def pytest_runtest_logreport(self, report):
        """
        Translate one pytest phase report (setup/call/teardown) into
        TeamCity service messages.

        :type report: _pytest.runner.TestReport
        """
        test_id = self.format_test_id(report.nodeid, report.location)

        duration = timedelta(seconds=report.duration)

        if report.passed:
            # Do not report passed setup/teardown if no output
            if report.when == 'call':
                self.ensure_test_start_reported(test_id)
                if not self.skip_passed_output:
                    self.report_test_output(report, test_id)
                self.report_test_finished(test_id, duration)
            else:
                # Passed setup/teardown: only surface captured output, wrapped
                # in a named block, without a testStarted/testFinished pair.
                if self.report_has_output(report) and not self.skip_passed_output:
                    block_name = "test " + report.when
                    self.teamcity.blockOpened(block_name, flowId=test_id)
                    self.report_test_output(report, test_id)
                    self.teamcity.blockClosed(block_name, flowId=test_id)
        elif report.failed:
            if report.when == 'call':
                self.report_test_failure(test_id, report)
            elif report.when == 'setup':
                # Output is dumped here (inside a block), so the failure itself
                # is reported with report_output=False to avoid duplication.
                if self.report_has_output(report):
                    self.teamcity.blockOpened("test setup", flowId=test_id)
                    self.report_test_output(report, test_id)
                    self.teamcity.blockClosed("test setup", flowId=test_id)

                self.report_test_failure(test_id, report, message="test setup failed", report_output=False)
            elif report.when == 'teardown':
                # Report failed teardown as a separate test as original test is already finished
                self.report_test_failure(test_id + "_teardown", report)
        elif report.skipped:
            self.report_test_skip(test_id, report)
|
|
|
|
def pytest_collectreport(self, report):
|
|
test_id = self.format_test_id(report.nodeid, report.location) + "_collect"
|
|
|
|
if report.failed:
|
|
self.report_test_failure(test_id, report)
|
|
elif report.skipped:
|
|
self.report_test_skip(test_id, report)
|
|
|
|
def pytest_terminal_summary(self):
|
|
if self.coverage_controller is not None:
|
|
try:
|
|
self._report_coverage()
|
|
except Exception:
|
|
tb = traceback.format_exc()
|
|
self.teamcity.customMessage("Coverage statistics reporting failed", "ERROR", errorDetails=tb)
|
|
|
|
    def _report_coverage(self):
        """Compute coverage totals and publish them as TeamCity build statistics.

        Bridges several coverage.py API generations: the pre-5.0 Reporter
        class and the 5.0+ get_analysis_to_report() function. Per-file
        analysis failures are reported as failed synthetic tests unless
        ignore_errors is configured.
        """
        from coverage.misc import NotPython
        from coverage.results import Numbers

        class _Reporter(object):
            # Adapter that hides coverage.py API differences behind a single
            # find_file_reporters() entry point, delegating everything else.
            def __init__(self, coverage, config):
                try:
                    from coverage.report import Reporter
                except ImportError:
                    # Support for coverage >= 5.0.1.
                    from coverage.report import get_analysis_to_report

                    class Reporter(object):

                        def __init__(self, coverage, config):
                            self.coverage = coverage
                            self.config = config
                            self._file_reporters = []

                        def find_file_reporters(self, morfs):
                            return [fr for fr, _ in get_analysis_to_report(self.coverage, morfs)]

                self._reporter = Reporter(coverage, config)

            def find_file_reporters(self, morfs):
                self.file_reporters = self._reporter.find_file_reporters(morfs)

            def __getattr__(self, name):
                # Fall through to the wrapped reporter for everything else.
                return getattr(self._reporter, name)

        class _CoverageReporter(_Reporter):
            def __init__(self, coverage, config, messages):
                super(_CoverageReporter, self).__init__(coverage, config)

                # has_arcs() is True when branch coverage was collected;
                # the data accessor moved between coverage.py versions.
                if hasattr(coverage, 'data'):
                    self.branches = coverage.data.has_arcs()
                else:
                    self.branches = coverage.get_data().has_arcs()
                self.messages = messages

            def report(self, morfs, outfile=None):
                # Old API exposes find_code_units; new one find_file_reporters.
                if hasattr(self, 'find_code_units'):
                    self.find_code_units(morfs)
                else:
                    self.find_file_reporters(morfs)

                total = Numbers()

                if hasattr(self, 'code_units'):
                    units = self.code_units
                else:
                    units = self.file_reporters

                for cu in units:
                    try:
                        # NOTE(review): uses coverage's private _analyze API —
                        # may break on coverage upgrades; confirm per version.
                        analysis = self.coverage._analyze(cu)
                        nums = analysis.numbers
                        total += nums
                    except KeyboardInterrupt:
                        raise
                    except Exception:
                        if self.config.ignore_errors:
                            continue

                        err = sys.exc_info()
                        typ, msg = err[:2]
                        # Non-Python files that were never meant to be Python
                        # are silently skipped.
                        if typ is NotPython and not cu.should_be_python():
                            continue

                        # Surface the analysis failure as a failed synthetic test.
                        test_id = cu.name
                        details = convert_error_to_string(err)

                        self.messages.testStarted(test_id, flowId=test_id)
                        self.messages.testFailed(test_id, message="Coverage analysis failed", details=details, flowId=test_id)
                        self.messages.testFinished(test_id, flowId=test_id)

                if total.n_files > 0:
                    covered = total.n_executed
                    total_statements = total.n_statements

                    if self.branches:
                        # Fold branch counts into the line statistics.
                        covered += total.n_executed_branches
                        total_statements += total.n_branches

                    self.messages.buildStatisticLinesCovered(covered)
                    self.messages.buildStatisticTotalLines(total_statements)
                    self.messages.buildStatisticLinesUncovered(total_statements - covered)

        reporter = _CoverageReporter(
            self.coverage_controller.cov,
            self.coverage_controller.cov.config,
            self.teamcity,
        )
        reporter.report(None)
|