contains trivial static inline functions and macros, and, therefore,
including it does not make babeltrace a derivative work on this header.
Please refer to the LGPLv2.1 license for details.
+
+* BSD 2-Clause
+
+The files in tests/utils/python/tap/ are licensed under the BSD 2-Clause. They
+are only used when running the tests in the source tree.
]
)
-AS_IF([test "x$enable_python_bindings_tests" = xyes],
- [
- AM_CHECK_PYTHON_TAPPY([PYTHON])
- AS_IF([test "x$PYTHON_TAPPY_EXISTS" = xno],
- [AC_MSG_ERROR([You need the tappy Python project to test the Python bindings (see <https://github.com/python-tap/tappy>)])]
- )
- ]
-)
-
AS_IF([test "x$enable_debug_info" = xyes],
[
# Check if libelf and libdw are present
SUBDIRS = tap
-EXTRA_DIST = python/testrunner.py
+EXTRA_DIST = python/testrunner.py \
+ python/tap/adapter.py \
+ python/tap/directive.py \
+ python/tap/formatter.py \
+ python/tap/i18n.py \
+ python/tap/__init__.py \
+ python/tap/LICENSE \
+ python/tap/line.py \
+ python/tap/loader.py \
+ python/tap/main.py \
+ python/tap/parser.py \
+ python/tap/rules.py \
+ python/tap/runner.py \
+ python/tap/tracker.py
--- /dev/null
+Copyright (c) 2016, Matt Layman and contributors. See AUTHORS for more details.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+from .runner import TAPTestRunner
+
+__all__ = ['TAPTestRunner']
+__version__ = '2.1'
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+
+class Adapter(object):
+ """The adapter processes a TAP test line and updates a unittest result.
+
+ It is an alternative to TestCase to collect TAP results.
+ """
+ failureException = AssertionError
+
+ def __init__(self, filename, line):
+ self._filename = filename
+ self._line = line
+
+ def shortDescription(self):
+ """Get the short description for verbeose results."""
+ return self._line.description
+
+ def __call__(self, result):
+ """Update test result with the lines in the TAP file.
+
+ Provide the interface that TestCase provides to a suite or runner.
+ """
+ result.startTest(self)
+
+ if self._line.skip:
+ result.addSkip(None, self._line.directive.reason)
+ return
+
+ if self._line.todo:
+ if self._line.ok:
+ result.addUnexpectedSuccess(self)
+ else:
+ result.addExpectedFailure(self, (Exception, Exception(), None))
+ return
+
+ if self._line.ok:
+ result.addSuccess(self)
+ else:
+ self.addFailure(result)
+
+ def addFailure(self, result):
+ """Add a failure to the result."""
+ result.addFailure(self, (Exception, Exception(), None))
+ # Since TAP will not provide assertion data, clean up the assertion
+ # section so it is not so spaced out.
+ test, err = result.failures[-1]
+ result.failures[-1] = (test, '')
+
+ def __repr__(self):
+ return '<file={filename}>'.format(filename=self._filename)
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+import re
+
+
+class Directive(object):
+ """A representation of a result line directive."""
+
+ skip_pattern = re.compile(
+ r"""^SKIP\S*
+ (?P<whitespace>\s*) # Optional whitespace.
+ (?P<reason>.*) # Slurp up the rest.""",
+ re.IGNORECASE | re.VERBOSE)
+ todo_pattern = re.compile(
+ r"""^TODO\b # The directive name
+ (?P<whitespace>\s*) # Immediately following must be whitespace.
+ (?P<reason>.*) # Slurp up the rest.""",
+ re.IGNORECASE | re.VERBOSE)
+
+ def __init__(self, text):
+ """Initialize the directive by parsing the text.
+
+ The text is assumed to be everything after a '#\s*' on a result line.
+ """
+ self._text = text
+ self._skip = False
+ self._todo = False
+ self._reason = None
+
+ match = self.skip_pattern.match(text)
+ if match:
+ self._skip = True
+ self._reason = match.group('reason')
+
+ match = self.todo_pattern.match(text)
+ if match:
+ if match.group('whitespace'):
+ self._todo = True
+ else:
+ # Catch the case where the directive has no descriptive text.
+ if match.group('reason') == '':
+ self._todo = True
+ self._reason = match.group('reason')
+
+ @property
+ def text(self):
+ """Get the entire text."""
+ return self._text
+
+ @property
+ def skip(self):
+ """Check if the directive is a SKIP type."""
+ return self._skip
+
+ @property
+ def todo(self):
+ """Check if the directive is a TODO type."""
+ return self._todo
+
+ @property
+ def reason(self):
+ """Get the reason for the directive."""
+ return self._reason
--- /dev/null
+import traceback
+
+
+def format_exception(exception):
+ """Format an exception as diagnostics output.
+
+ exception is the tuple as expected from sys.exc_info.
+ """
+ exception_lines = traceback.format_exception(*exception)
+ # The lines returned from format_exception do not strictly contain
+ # one line per element in the list (i.e. some elements have new
+ # line characters in the middle). Normalize that oddity.
+ lines = ''.join(exception_lines).splitlines(True)
+ return format_as_diagnostics(lines)
+
+
+def format_as_diagnostics(lines):
+ """Format the lines as diagnostics output by prepending the diagnostic #.
+
+ This function makes no assumptions about the line endings.
+ """
+ return ''.join(['# ' + line for line in lines])
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+import gettext
+import os
+
+localedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'locale')
+translate = gettext.translation('tappy', localedir, fallback=True)
+_ = translate.gettext
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+
+class Line(object):
+ """Base type for TAP data.
+
+ TAP is a line based protocol. Thus, the most primitive type is a line.
+ """
+ @property
+ def category(self):
+ raise NotImplementedError
+
+
+class Result(Line):
+ """Information about an individual test line."""
+
+ def __init__(
+ self, ok, number=None, description='', directive=None,
+ diagnostics=None):
+ self._ok = ok
+ if number:
+ self._number = int(number)
+ else:
+ # The number may be an empty string so explicitly set to None.
+ self._number = None
+ self._description = description
+ self.directive = directive
+ self.diagnostics = diagnostics
+
+ @property
+ def category(self):
+ """:returns: ``test``"""
+ return 'test'
+
+ @property
+ def ok(self):
+ """Get the ok status.
+
+ :rtype: bool
+ """
+ return self._ok
+
+ @property
+ def number(self):
+ """Get the test number.
+
+ :rtype: int
+ """
+ return self._number
+
+ @property
+ def description(self):
+ """Get the description."""
+ return self._description
+
+ @property
+ def skip(self):
+ """Check if this test was skipped.
+
+ :rtype: bool
+ """
+ return self.directive.skip
+
+ @property
+ def todo(self):
+ """Check if this test was a TODO.
+
+ :rtype: bool
+ """
+ return self.directive.todo
+
+ def __str__(self):
+ is_not = ''
+ if not self.ok:
+ is_not = 'not '
+ directive = ''
+ if self.directive is not None and self.directive.text:
+ directive = ' # {0}'.format(self.directive.text)
+ diagnostics = ''
+ if self.diagnostics is not None:
+ diagnostics = '\n' + self.diagnostics.rstrip()
+ return "{0}ok {1} - {2}{3}{4}".format(
+ is_not, self.number, self.description, directive, diagnostics)
+
+
+class Plan(Line):
+ """A plan line to indicate how many tests to expect."""
+
+ def __init__(self, expected_tests, directive=None):
+ self._expected_tests = expected_tests
+ self.directive = directive
+
+ @property
+ def category(self):
+ """:returns: ``plan``"""
+ return 'plan'
+
+ @property
+ def expected_tests(self):
+ """Get the number of expected tests.
+
+ :rtype: int
+ """
+ return self._expected_tests
+
+ @property
+ def skip(self):
+ """Check if this plan should skip the file.
+
+ :rtype: bool
+ """
+ return self.directive.skip
+
+
+class Diagnostic(Line):
+ """A diagnostic line (i.e. anything starting with a hash)."""
+
+ def __init__(self, text):
+ self._text = text
+
+ @property
+ def category(self):
+ """:returns: ``diagnostic``"""
+ return 'diagnostic'
+
+ @property
+ def text(self):
+ """Get the text."""
+ return self._text
+
+
+class Bail(Line):
+ """A bail out line (i.e. anything starting with 'Bail out!')."""
+
+ def __init__(self, reason):
+ self._reason = reason
+
+ @property
+ def category(self):
+ """:returns: ``bail``"""
+ return 'bail'
+
+ @property
+ def reason(self):
+ """Get the reason."""
+ return self._reason
+
+
+class Version(Line):
+ """A version line (i.e. of the form 'TAP version 13')."""
+
+ def __init__(self, version):
+ self._version = version
+
+ @property
+ def category(self):
+ """:returns: ``version``"""
+ return 'version'
+
+ @property
+ def version(self):
+ """Get the version number.
+
+ :rtype: int
+ """
+ return self._version
+
+
+class Unknown(Line):
+ """A line that represents something that is not a known TAP line.
+
+ This exists for the purpose of a Null Object pattern.
+ """
+ @property
+ def category(self):
+ """:returns: ``unknown``"""
+ return 'unknown'
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+import os
+import unittest
+
+from tap.adapter import Adapter
+from tap.parser import Parser
+from tap.rules import Rules
+
+
+class Loader(object):
+ """Load TAP lines into unittest-able objects."""
+
+ ignored_lines = set(['diagnostic', 'unknown'])
+
+ def __init__(self):
+ self._parser = Parser()
+
+ def load(self, files):
+ """Load any files found into a suite.
+
+ Any directories are walked and their files are added as TAP files.
+
+ :returns: A ``unittest.TestSuite`` instance
+ """
+ suite = unittest.TestSuite()
+ for filepath in files:
+ if os.path.isdir(filepath):
+ self._find_tests_in_directory(filepath, suite)
+ else:
+ suite.addTest(self.load_suite_from_file(filepath))
+ return suite
+
+ def load_suite_from_file(self, filename):
+ """Load a test suite with test lines from the provided TAP file.
+
+ :returns: A ``unittest.TestSuite`` instance
+ """
+ suite = unittest.TestSuite()
+ rules = Rules(filename, suite)
+
+ if not os.path.exists(filename):
+ rules.handle_file_does_not_exist()
+ return suite
+
+ line_generator = self._parser.parse_file(filename)
+ return self._load_lines(filename, line_generator, suite, rules)
+
+ def load_suite_from_stdin(self):
+ """Load a test suite with test lines from the TAP stream on STDIN.
+
+ :returns: A ``unittest.TestSuite`` instance
+ """
+ suite = unittest.TestSuite()
+ rules = Rules('stream', suite)
+ line_generator = self._parser.parse_stdin()
+ return self._load_lines('stream', line_generator, suite, rules)
+
+ def _find_tests_in_directory(self, directory, suite):
+ """Find test files in the directory and add them to the suite."""
+ for dirpath, dirnames, filenames in os.walk(directory):
+ for filename in filenames:
+ filepath = os.path.join(dirpath, filename)
+ suite.addTest(self.load_suite_from_file(filepath))
+
+ def _load_lines(self, filename, line_generator, suite, rules):
+ """Load a suite with lines produced by the line generator."""
+ line_counter = 0
+ for line in line_generator:
+ line_counter += 1
+
+ if line.category in self.ignored_lines:
+ continue
+
+ if line.category == 'test':
+ suite.addTest(Adapter(filename, line))
+ rules.saw_test()
+ elif line.category == 'plan':
+ if line.skip:
+ rules.handle_skipping_plan(line)
+ return suite
+ rules.saw_plan(line, line_counter)
+ elif line.category == 'bail':
+ rules.handle_bail(line)
+ return suite
+ elif line.category == 'version':
+ rules.saw_version_at(line_counter)
+
+ rules.check(line_counter)
+ return suite
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+import argparse
+import sys
+import unittest
+
+from tap.i18n import _
+from tap.loader import Loader
+
+
+def main(argv=sys.argv, stream=sys.stderr):
+ """Entry point for ``tappy`` command."""
+ args = parse_args(argv)
+ suite = build_suite(args)
+ runner = unittest.TextTestRunner(verbosity=args.verbose, stream=stream)
+ result = runner.run(suite)
+
+ return get_status(result)
+
+
+def build_suite(args):
+ """Build a test suite by loading TAP files or a TAP stream."""
+ loader = Loader()
+ if len(args.files) == 0 or args.files[0] == '-':
+ suite = loader.load_suite_from_stdin()
+ else:
+ suite = loader.load(args.files)
+ return suite
+
+
+def parse_args(argv):
+ description = _('A TAP consumer for Python')
+ epilog = _(
+ 'When no files are given or a dash (-) is used for the file name, '
+ 'tappy will read a TAP stream from STDIN.')
+ parser = argparse.ArgumentParser(description=description, epilog=epilog)
+ parser.add_argument(
+ 'files', metavar='FILE', nargs='*', help=_(
+ 'A file containing TAP output. Any directories listed will be '
+ 'scanned for files to include as TAP files.'))
+ parser.add_argument(
+ '-v', '--verbose', action='store_const', default=1, const=2,
+ help=_('use verbose messages'))
+
+ # argparse expects the executable to be removed from argv.
+ args = parser.parse_args(argv[1:])
+
+ # When no files are provided, the user wants to use a TAP stream on STDIN.
+ # But they probably didn't mean it if there is no pipe connected.
+ # In that case, print the help and exit.
+ if not args.files and sys.stdin.isatty():
+ sys.exit(parser.print_help())
+
+ return args
+
+
+def get_status(result):
+ """Get a return status from the result."""
+ if result.wasSuccessful():
+ return 0
+ else:
+ return 1
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+from io import StringIO
+import re
+import sys
+
+from tap.directive import Directive
+from tap.i18n import _
+from tap.line import Bail, Diagnostic, Plan, Result, Unknown, Version
+
+
+class Parser(object):
+ """A parser for TAP files and lines."""
+
+ # ok and not ok share most of the same characteristics.
+ result_base = r"""
+ \s* # Optional whitespace.
+ (?P<number>\d*) # Optional test number.
+ \s* # Optional whitespace.
+ (?P<description>[^#]*) # Optional description before #.
+ \#? # Optional directive marker.
+ \s* # Optional whitespace.
+ (?P<directive>.*) # Optional directive text.
+ """
+ ok = re.compile(r'^ok' + result_base, re.VERBOSE)
+ not_ok = re.compile(r'^not\ ok' + result_base, re.VERBOSE)
+ plan = re.compile(r"""
+ ^1..(?P<expected>\d+) # Match the plan details.
+ [^#]* # Consume any non-hash character to confirm only
+ # directives appear with the plan details.
+ \#? # Optional directive marker.
+ \s* # Optional whitespace.
+ (?P<directive>.*) # Optional directive text.
+ """, re.VERBOSE)
+ diagnostic = re.compile(r'^#')
+ bail = re.compile(r"""
+ ^Bail\ out!
+ \s* # Optional whitespace.
+ (?P<reason>.*) # Optional reason.
+ """, re.VERBOSE)
+ version = re.compile(r'^TAP version (?P<version>\d+)$')
+
+ TAP_MINIMUM_DECLARED_VERSION = 13
+
+ def parse_file(self, filename):
+ """Parse a TAP file to an iterable of tap.line.Line objects.
+
+ This is a generator method that will yield an object for each
+ parsed line. The file given by `filename` is assumed to exist.
+ """
+ return self.parse(open(filename, 'r'))
+
+ def parse_stdin(self):
+ """Parse a TAP stream from standard input.
+
+ Note: this has the side effect of closing the standard input
+ filehandle after parsing.
+ """
+ return self.parse(sys.stdin)
+
+ def parse_text(self, text):
+ """Parse a string containing one or more lines of TAP output."""
+ return self.parse(StringIO(text))
+
+ def parse(self, fh):
+ """Generate tap.line.Line objects, given a file-like object `fh`.
+
+ `fh` may be any object that implements both the iterator and
+ context management protocol (i.e. it can be used in both a
+ "with" statement and a "for...in" statement.)
+
+ Trailing whitespace and newline characters will be automatically
+ stripped from the input lines.
+ """
+ with fh:
+ for line in fh:
+ yield self.parse_line(line.rstrip())
+
+ def parse_line(self, text):
+ """Parse a line into whatever TAP category it belongs."""
+ match = self.ok.match(text)
+ if match:
+ return self._parse_result(True, match)
+
+ match = self.not_ok.match(text)
+ if match:
+ return self._parse_result(False, match)
+
+ if self.diagnostic.match(text):
+ return Diagnostic(text)
+
+ match = self.plan.match(text)
+ if match:
+ return self._parse_plan(match)
+
+ match = self.bail.match(text)
+ if match:
+ return Bail(match.group('reason'))
+
+ match = self.version.match(text)
+ if match:
+ return self._parse_version(match)
+
+ return Unknown()
+
+ def _parse_plan(self, match):
+ """Parse a matching plan line."""
+ expected_tests = int(match.group('expected'))
+ directive = Directive(match.group('directive'))
+
+ # Only SKIP directives are allowed in the plan.
+ if directive.text and not directive.skip:
+ return Unknown()
+
+ return Plan(expected_tests, directive)
+
+ def _parse_result(self, ok, match):
+ """Parse a matching result line into a result instance."""
+ return Result(
+ ok, match.group('number'), match.group('description').strip(),
+ Directive(match.group('directive')))
+
+ def _parse_version(self, match):
+ version = int(match.group('version'))
+ if version < self.TAP_MINIMUM_DECLARED_VERSION:
+ raise ValueError(_('It is an error to explicitly specify '
+ 'any version lower than 13.'))
+ return Version(version)
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+from tap.adapter import Adapter
+from tap.directive import Directive
+from tap.i18n import _
+from tap.line import Result
+
+
+class Rules(object):
+
+ def __init__(self, filename, suite):
+ self._filename = filename
+ self._suite = suite
+ self._lines_seen = {'plan': [], 'test': 0, 'version': []}
+
+ def check(self, final_line_count):
+ """Check the status of all provided data and update the suite."""
+ if self._lines_seen['version']:
+ self._process_version_lines()
+ self._process_plan_lines(final_line_count)
+
+ def _process_version_lines(self):
+ """Process version line rules."""
+ if len(self._lines_seen['version']) > 1:
+ self._add_error(_('Multiple version lines appeared.'))
+ elif self._lines_seen['version'][0] != 1:
+ self._add_error(_('The version must be on the first line.'))
+
+ def _process_plan_lines(self, final_line_count):
+ """Process plan line rules."""
+ if not self._lines_seen['plan']:
+ self._add_error(_('Missing a plan.'))
+ return
+
+ if len(self._lines_seen['plan']) > 1:
+ self._add_error(_('Only one plan line is permitted per file.'))
+ return
+
+ plan, at_line = self._lines_seen['plan'][0]
+ if not self._plan_on_valid_line(at_line, final_line_count):
+ self._add_error(
+ _('A plan must appear at the beginning or end of the file.'))
+ return
+
+ if plan.expected_tests != self._lines_seen['test']:
+ self._add_error(_(
+ 'Expected {expected_count} tests '
+ 'but only {seen_count} ran.').format(
+ expected_count=plan.expected_tests,
+ seen_count=self._lines_seen['test']))
+
+ def _plan_on_valid_line(self, at_line, final_line_count):
+ """Check if a plan is on a valid line."""
+ # Put the common cases first.
+ if at_line == 1 or at_line == final_line_count:
+ return True
+
+ # The plan may only appear on line 2 if the version is at line 1.
+ after_version = (
+ self._lines_seen['version'] and
+ self._lines_seen['version'][0] == 1 and
+ at_line == 2)
+ if after_version:
+ return True
+
+ return False
+
+ def handle_bail(self, bail):
+ """Handle a bail line."""
+ self._add_error(_('Bailed: {reason}').format(reason=bail.reason))
+
+ def handle_file_does_not_exist(self):
+ """Handle a test file that does not exist."""
+ self._add_error(_('{filename} does not exist.').format(
+ filename=self._filename))
+
+ def handle_skipping_plan(self, skip_plan):
+ """Handle a plan that contains a SKIP directive."""
+ skip_line = Result(
+ True, None, skip_plan.directive.text, Directive('SKIP'))
+ self._suite.addTest(Adapter(self._filename, skip_line))
+
+ def saw_plan(self, plan, at_line):
+ """Record when a plan line was seen."""
+ self._lines_seen['plan'].append((plan, at_line))
+
+ def saw_test(self):
+ """Record when a test line was seen."""
+ self._lines_seen['test'] += 1
+
+ def saw_version_at(self, line_counter):
+ """Record when a version line was seen."""
+ self._lines_seen['version'].append(line_counter)
+
+ def _add_error(self, message):
+ """Add an error test to the suite."""
+ error_line = Result(False, None, message, Directive(''))
+ self._suite.addTest(Adapter(self._filename, error_line))
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+import os
+from unittest import TextTestResult, TextTestRunner
+from unittest.runner import _WritelnDecorator
+import sys
+
+from tap import formatter
+from tap.i18n import _
+from tap.tracker import Tracker
+
+
+class TAPTestResult(TextTestResult):
+
+ FORMAT = None
+
+ def __init__(self, stream, descriptions, verbosity):
+ super(TAPTestResult, self).__init__(stream, descriptions, verbosity)
+
+ def stopTestRun(self):
+ """Once the test run is complete, generate each of the TAP files."""
+ super(TAPTestResult, self).stopTestRun()
+ self.tracker.generate_tap_reports()
+
+ def addError(self, test, err):
+ super(TAPTestResult, self).addError(test, err)
+ diagnostics = formatter.format_exception(err)
+ self.tracker.add_not_ok(
+ self._cls_name(test), self._description(test),
+ diagnostics=diagnostics)
+
+ def addFailure(self, test, err):
+ super(TAPTestResult, self).addFailure(test, err)
+ diagnostics = formatter.format_exception(err)
+ self.tracker.add_not_ok(
+ self._cls_name(test), self._description(test),
+ diagnostics=diagnostics)
+
+ def addSuccess(self, test):
+ super(TAPTestResult, self).addSuccess(test)
+ self.tracker.add_ok(self._cls_name(test), self._description(test))
+
+ def addSkip(self, test, reason):
+ super(TAPTestResult, self).addSkip(test, reason)
+ self.tracker.add_skip(
+ self._cls_name(test), self._description(test), reason)
+
+ def addExpectedFailure(self, test, err):
+ super(TAPTestResult, self).addExpectedFailure(test, err)
+ diagnostics = formatter.format_exception(err)
+ self.tracker.add_not_ok(
+ self._cls_name(test), self._description(test),
+ _('(expected failure)'), diagnostics=diagnostics)
+
+ def addUnexpectedSuccess(self, test):
+ super(TAPTestResult, self).addUnexpectedSuccess(test)
+ self.tracker.add_ok(self._cls_name(test), self._description(test),
+ _('(unexpected success)'))
+
+ def _cls_name(self, test):
+ return test.__class__.__name__
+
+ def _description(self, test):
+ if self.FORMAT:
+ try:
+ return self.FORMAT.format(
+ method_name=str(test),
+ short_description=test.shortDescription() or '')
+ except KeyError:
+ sys.exit(_(
+ 'Bad format string: {format}\n'
+ 'Replacement options are: {{short_description}} and '
+ '{{method_name}}').format(format=self.FORMAT))
+
+ return test.shortDescription() or str(test)
+
+
+# TODO: 2016-7-30 mblayman - Since the 2.6 signature is no longer relevant,
+# check the possibility of removing the module level scope.
+
+# Module level state stinks, but this is the only way to keep compatibility
+# with Python 2.6. The best place for the tracker is as an instance variable
+# on the runner, but __init__ is so different that it is not easy to create
+# a runner that satisfies every supported Python version.
+_tracker = Tracker()
+
+
+class TAPTestRunner(TextTestRunner):
+ """A test runner that will behave exactly like TextTestRunner and will
+ additionally generate TAP files for each test case"""
+
+ resultclass = TAPTestResult
+
+ def set_stream(self, streaming):
+ """Set the streaming boolean option to stream TAP directly to stdout.
+
+ The test runner default output will be suppressed in favor of TAP.
+ """
+ self.stream = _WritelnDecorator(open(os.devnull, 'w'))
+ _tracker.streaming = streaming
+ _tracker.stream = sys.stdout
+
+ def _makeResult(self):
+ result = self.resultclass(
+ self.stream, self.descriptions, self.verbosity)
+ result.tracker = _tracker
+ return result
+
+ @classmethod
+ def set_outdir(cls, outdir):
+ """Set the output directory so that TAP files are written to the
+ specified outdir location.
+ """
+ # Blame the lack of unittest extensibility for this hacky method.
+ _tracker.outdir = outdir
+
+ @classmethod
+ def set_combined(cls, combined):
+ """Set the tracker to use a single output file."""
+ _tracker.combined = combined
+
+ @classmethod
+ def set_header(cls, header):
+ """Set the header display flag."""
+ _tracker.header = header
+
+ @classmethod
+ def set_format(cls, fmt):
+ """Set the format of each test line.
+
+ The format string can use:
+ * {method_name}: The test method name
+ * {short_description}: The test's docstring short description
+ """
+ TAPTestResult.FORMAT = fmt
--- /dev/null
+# Copyright (c) 2016, Matt Layman
+
+from __future__ import print_function
+import os
+import string
+import sys
+
+from tap.directive import Directive
+from tap.i18n import _
+from tap.line import Result
+
+
+class Tracker(object):
+
+ def __init__(
+ self, outdir=None, combined=False, streaming=False, stream=None,
+ header=True):
+ self.outdir = outdir
+
+ # Combine all the test results into one file.
+ self.combined = combined
+ self.combined_line_number = 0
+ # Test case ordering is important for the combined results
+ # because of how numbers are assigned. The test cases
+ # must be tracked in order so that reporting can sequence
+ # the line numbers properly.
+ self.combined_test_cases_seen = []
+
+ # Stream output directly to a stream instead of file output.
+ self.streaming = streaming
+ self.stream = stream
+
+ # Display the test case header unless told not to.
+ self.header = header
+
+ # Internal state for tracking each test case.
+ self._test_cases = {}
+
+ # Python versions 2 and 3 keep maketrans in different locations.
+ if sys.version_info[0] < 3:
+ self._sanitized_table = string.maketrans(' \\/\n', '----')
+ else: # pragma: no cover
+ self._sanitized_table = str.maketrans(' \\/\n', '----')
+
+ def _get_outdir(self):
+ return self._outdir
+
+ def _set_outdir(self, outdir):
+ self._outdir = outdir
+ if outdir and not os.path.exists(outdir):
+ os.makedirs(outdir)
+
+ outdir = property(_get_outdir, _set_outdir)
+
+ def _track(self, class_name):
+ """Keep track of which test cases have executed."""
+ if self._test_cases.get(class_name) is None:
+ if self.streaming and self.header:
+ self._write_test_case_header(class_name, self.stream)
+
+ self._test_cases[class_name] = []
+ if self.combined:
+ self.combined_test_cases_seen.append(class_name)
+
+ def add_ok(self, class_name, description, directive=''):
+ result = Result(
+ ok=True, number=self._get_next_line_number(class_name),
+ description=description, directive=Directive(directive))
+ self._add_line(class_name, result)
+
+ def add_not_ok(
+ self, class_name, description, directive='', diagnostics=None):
+ result = Result(
+ ok=False, number=self._get_next_line_number(class_name),
+ description=description, diagnostics=diagnostics,
+ directive=Directive(directive))
+ self._add_line(class_name, result)
+
+ def add_skip(self, class_name, description, reason):
+ directive = 'SKIP {0}'.format(reason)
+ result = Result(
+ ok=True, number=self._get_next_line_number(class_name),
+ description=description, directive=Directive(directive))
+ self._add_line(class_name, result)
+
+ def _add_line(self, class_name, result):
+ self._track(class_name)
+ if self.streaming:
+ print(result, file=self.stream)
+ self._test_cases[class_name].append(result)
+
+ def _get_next_line_number(self, class_name):
+ if self.combined or self.streaming:
+ # This has an obvious side effect. Oh well.
+ self.combined_line_number += 1
+ return self.combined_line_number
+ else:
+ try:
+ return len(self._test_cases[class_name]) + 1
+ except KeyError:
+ # A result is created before the call to _track so the test
+ # case may not be tracked yet. In that case, the line is 1.
+ return 1
+
+ def generate_tap_reports(self):
+ """Generate TAP reports.
+
+ The results are either combined into a single output file or
+ the output file name is generated from the test case.
+ """
+ if self.streaming:
+ # The results already went to the stream, record the plan.
+ print('1..{0}'.format(self.combined_line_number), file=self.stream)
+ return
+
+ if self.combined:
+ combined_file = 'testresults.tap'
+ if self.outdir:
+ combined_file = os.path.join(self.outdir, combined_file)
+ with open(combined_file, 'w') as out_file:
+ for test_case in self.combined_test_cases_seen:
+ self.generate_tap_report(
+ test_case, self._test_cases[test_case], out_file)
+ print(
+ '1..{0}'.format(self.combined_line_number), file=out_file)
+ else:
+ for test_case, tap_lines in self._test_cases.items():
+ with open(self._get_tap_file_path(test_case), 'w') as out_file:
+ self.generate_tap_report(test_case, tap_lines, out_file)
+
+ def generate_tap_report(self, test_case, tap_lines, out_file):
+ self._write_test_case_header(test_case, out_file)
+
+ for tap_line in tap_lines:
+ print(tap_line, file=out_file)
+
+ # For combined results, the plan is only output once after
+ # all the test cases complete.
+ if not self.combined:
+ print('1..{0}'.format(len(tap_lines)), file=out_file)
+
+ def _write_test_case_header(self, test_case, stream):
+ print(_('# TAP results for {test_case}').format(
+ test_case=test_case), file=stream)
+
+ def _get_tap_file_path(self, test_case):
+ """Get the TAP output file path for the test case."""
+ sanitized_test_case = test_case.translate(self._sanitized_table)
+ tap_file = sanitized_test_case + '.tap'
+ if self.outdir:
+ return os.path.join(self.outdir, tap_file)
+ return tap_file