diff --git a/firebird/qa/plugin.py b/firebird/qa/plugin.py index 6b1c2527..2f8956b4 100644 --- a/firebird/qa/plugin.py +++ b/firebird/qa/plugin.py @@ -38,12 +38,16 @@ from __future__ import annotations from typing import List, Dict, Union, Optional +import sys +import locale import os import re import shutil import platform import pytest from _pytest.fixtures import FixtureRequest +from _pytest.terminal import TerminalReporter, _get_raw_skip_reason, _format_trimmed +from _pytest.reports import TestReport from subprocess import run, CompletedProcess, PIPE, STDOUT from pathlib import Path from configparser import ConfigParser, ExtendedInterpolation @@ -64,6 +68,65 @@ _vars_ = {'server': None, } _platform = platform.system() +_nodemap = {} + +FIELD_ID = 'ID:' +FIELD_ISSUE = 'ISSUE:' +FIELD_TITLE = 'TITLE:' +FIELD_DECRIPTION = 'DESCRIPTION:' +FIELD_NOTES = 'NOTES:' + +def format_title(title, patterns): + return _remove_patterns(title, patterns).replace('_', ' ').strip() + +def format_class_name(class_name, patterns): + formatted = '' + + class_name = _remove_patterns(class_name, patterns) + if ' ' not in class_name: + for index, letter in enumerate(class_name): + if letter.isupper() and \ + _has_lower_letter_besides(index, class_name): + formatted += ' ' + + formatted += letter + else: + formatted = class_name + + return formatted.strip() + +def format_module_name(module_name, patterns): + return format_title(module_name.split('/')[-1], patterns) + +def _remove_patterns(statement, patterns): + for glob_pattern in patterns: + pattern = glob_pattern.replace('*', '') + + if glob_pattern.startswith('*'): + pattern = '{0}$'.format(pattern) + statement = re.sub(pattern, '', statement) + + elif glob_pattern.endswith('*'): + + pattern = '^{0}'.format(pattern) + statement = re.sub(pattern, '', statement) + + elif '*' in glob_pattern: + infix_patterns = glob_pattern.split('*', 2) + infix_patterns[0] = '{}*'.format(infix_patterns[0]) + infix_patterns[1] = '*{}'.format(infix_patterns[1]) + statement = _remove_patterns(statement, infix_patterns) + + else: + pattern = '^{0}'.format(pattern) + statement = re.sub(pattern, '', statement) + return statement + +def _has_lower_letter_besides(index, string): + letter_before = string[index - 1] if index > 0 else '' + letter_after = string[index + 1] if index < len(string) - 1 else '' + + return letter_before.islower() or letter_after.islower() def pytest_addoption(parser, pluginmanager): """Adds plugin-specific pytest command-line options. @@ -84,21 +147,13 @@ def pytest_report_header(config): .. seealso:: `pytest documentation <_pytest.hookspec.pytest_report_header>` for details. """ - return ["Firebird:", - f" root: {_vars_['root']}", - f" databases: {_vars_['databases']}", - f" backups: {_vars_['backups']}", - f" files: {_vars_['files']}", - f" driver configuration: {_vars_['firebird-config']}", - f" server: {_vars_['server']}", - f" protocol: {_vars_['protocol']}", - f" engine: v{_vars_['version']}, {_vars_['arch']}", + return ["System:", + f" encodings: sys: {sys.getdefaultencoding()} locale: {locale.getpreferredencoding()} filesystem: {sys.getfilesystemencoding()}", + "Firebird:", + f" server: {_vars_['server']} [v{_vars_['version']}, {_vars_['server-arch']}, {_vars_['arch']}]", f" home: {_vars_['home-dir']}", f" bin: {_vars_['bin-dir']}", - f" security db: {_vars_['security-db']}", f" client library: {_vars_['fbclient']}", - f" run slow tests: {_vars_['runslow']}", - f" save test output: {_vars_['save-output']}", ] def set_tool(tool: str): @@ -110,21 +165,109 @@ def set_tool(tool: str): pytest.exit(f"Can't find '{tool}' in {_vars_['bin-dir']}") _vars_[tool] = path +class QATerminalReporter(TerminalReporter): + def _getfailureheadline(self, rep): + head_line = rep.head_line + if head_line: + return rep.nodeid + return "test session" # XXX? + def pytest_runtest_logstart(self, nodeid: str, location: Tuple[str, Optional[int], str]) -> None: + # Ensure that the path is printed before the + # 1st test of a module starts running. + nodeid = _nodemap.get(nodeid, nodeid) + if self.showlongtestinfo: + line = nodeid + ' ' + self.write_ensure_prefix(line, "") + self.flush() + elif self.showfspath: + self.write_fspath_result(nodeid, "") + self.flush() + def pytest_runtest_logreport(self, report: TestReport) -> None: + self._tests_ran = True + rep = report + res: Tuple[ + str, str, Union[str, Tuple[str, Mapping[str, bool]]] + ] = self.config.hook.pytest_report_teststatus(report=rep, config=self.config) + category, letter, word = res + if not isinstance(word, tuple): + markup = None + else: + word, markup = word + self._add_stats(category, [rep]) + if not letter and not word: + # Probably passed setup/teardown. + return + running_xdist = hasattr(rep, "node") + if markup is None: + was_xfail = hasattr(report, "wasxfail") + if rep.passed and not was_xfail: + markup = {"green": True} + elif rep.passed and was_xfail: + markup = {"yellow": True} + elif rep.failed: + markup = {"red": True} + elif rep.skipped: + markup = {"yellow": True} + else: + markup = {} + if self.verbosity <= 0: + self._tw.write(letter, **markup) + else: + self._progress_nodeids_reported.add(rep.nodeid) + line = rep._qa_id_ + ' ' + if not running_xdist: + self.write_ensure_prefix(line, word, **markup) + + if self._show_progress_info == "count": + num_tests = self._session.testscollected + progress_length = len(" [{}/{}]".format(str(num_tests), str(num_tests))) + else: + progress_length = len(" [100%]") + + if rep.skipped or hasattr(report, "wasxfail"): + available_width = ( + (self._tw.fullwidth - self._tw.width_of_current_line) + - progress_length + - 1 + ) + reason = _get_raw_skip_reason(rep) + reason_ = _format_trimmed(" ({})", reason, available_width) + if reason and reason_ is not None: + self._tw.write(reason_) + if self._show_progress_info: + self._write_progress_information_filling_space() + else: + self.ensure_newline() + self._tw.write("[%s]" % rep.node.gateway.id) + if self._show_progress_info: + self._tw.write( + self._get_progress_information_message() + " ", cyan=True + ) + else: + self._tw.write(" ") + self._tw.write(word, **markup) + self._tw.write(" " + line) + self.currentfspath = -2 + self.flush() + +@pytest.mark.tryfirst +def pytest_runtest_makereport(item, call): + result = TestReport.from_item_and_call(item, call) + for attr in dir(item): + if attr.startswith('_qa_'): + setattr(result, attr, getattr(item, attr)) + return result + +@pytest.mark.trylast def pytest_configure(config): """Plugin configuration. .. seealso:: `pytest documentation <_pytest.hookspec.pytest_configure>` for details. """ # pytest.ini - config.addinivalue_line( - "markers", "version(versions): Firebird version specifications" - ) - config.addinivalue_line( - "markers", "platform(platforms): Platform names" - ) - config.addinivalue_line( - "markers", "slow: Mark test as slow to run" - ) + config.addinivalue_line("markers", "version(versions): Firebird version specifications") + config.addinivalue_line("markers", "platform(platforms): Platform names") + config.addinivalue_line("markers", "slow: Mark test as slow to run") if config.getoption('help'): return config_path: Path = Path.cwd() / 'firebird-driver.conf' @@ -186,7 +329,35 @@ def pytest_configure(config): # Driver encoding for NONE charset CHARSET_MAP['NONE'] = 'utf-8' CHARSET_MAP[None] = 'utf-8' - + # Server architecture [CS,SS,SC] + with connect('employee') as con1, connect('employee') as con2: + sql = f""" + select count(distinct a.mon$server_pid), min(a.mon$remote_protocol), + max(iif(a.mon$remote_protocol is null, 1, 0)) + from mon$attachments a + where a.mon$attachment_id in ({con1.info.id}, {con2.info.id}) or upper(a.mon$user) = upper('cache writer') + """ + cur1 = con1.cursor() + cur1.execute(sql) + server_cnt, server_pro, cache_wrtr = cur1.fetchone() + if server_pro is None: + result = 'Embedded' + elif cache_wrtr == 1: + result = 'SuperServer' + elif server_cnt == 2: + result = 'Classic' + else: + f1 = con1.info.get_info(DbInfoCode.FETCHES) + cur2 = con2.cursor() + cur2.execute('select 1 from rdb$database').fetchall() + f2 = con1.info.get_info(DbInfoCode.FETCHES) + result = 'SuperClassic' if f1 == f2 else 'SuperServer' + _vars_['server-arch'] = result + # Change terminal reporter + standard_reporter = config.pluginmanager.getplugin('terminalreporter') + pspec_reporter = QATerminalReporter(standard_reporter.config) + config.pluginmanager.unregister(standard_reporter) + config.pluginmanager.register(pspec_reporter, 'terminalreporter') def pytest_collection_modifyitems(session, config, items): skip_slow = pytest.mark.skip(reason="need --runslow option to run") @@ -210,16 +381,46 @@ def pytest_collection_modifyitems(session, config, items): deselected.append(item) items[:] = selected config.hook.pytest_deselected(items=deselected) - -#@pytest.hookimpl(hookwrapper=True) -#def pytest_runtest_makereport(item, call): - #outcome = yield - #report = outcome.get_result() - - #test_fn = item.obj - #docstring = getattr(test_fn, '__doc__') - #if docstring: - #report.nodeid = docstring + # Add OUR OWN test metadata to Item + for item in items: + item._qa_id_ = item.nodeid + item._qa_issue_ = None + item._qa_title_ = 'UNKNOWN' + item._qa_description_ = '' + item._qa_notes_ = '' + module_doc = item.parent.obj.__doc__ + if module_doc is None: + continue + current_field = 'NONE' + for line in module_doc.splitlines(): + uline = line.upper() + if uline.startswith(FIELD_ID): + current_field = FIELD_ID + item._qa_id_ = line[len(FIELD_ID):].strip() + _nodemap[item.nodeid] = item._qa_id_ + elif uline.startswith(FIELD_ISSUE): + current_field = FIELD_ISSUE + item._qa_issue_ = line[len(FIELD_ISSUE):].strip() + elif uline.startswith(FIELD_TITLE): + current_field = FIELD_TITLE + item._qa_title_ = line[len(FIELD_TITLE):].strip() + elif uline.startswith(FIELD_DECRIPTION): + current_field = FIELD_DECRIPTION + item._qa_description_ = line[len(FIELD_DECRIPTION):].strip() + elif uline.startswith(FIELD_NOTES): + current_field = FIELD_NOTES + item._qa_notes_ = line[len(FIELD_NOTES):].strip() + elif current_field == FIELD_DECRIPTION: + if item._qa_description_: + item._qa_description_ += '\n' + item._qa_description_ += line.strip() + elif current_field == FIELD_NOTES: + if item._qa_notes_: + item._qa_notes_ += '\n' + item._qa_notes_ += line.strip() + else: + # Unknown field + pass def substitute_macros(text: str, macros: Dict[str, str]): """Helper function to substitute `$(name)` macros in text. @@ -1051,7 +1252,7 @@ class Action: self.expected_stderr: str = '' self._clean_expected_stderr: str = None #: REGEX substitutions applied on stdout/stderr on cleanup. - self.substitutions: List[str] = [x for x in substitutions] + self.substitutions: List[str] = [] if substitutions is None else [x for x in substitutions] #: Base path/filename for stdout/stderr protocols of external Firebird tool execution. self.outfile: Path = outfile #: Output from last executed trace session. diff --git a/setup.cfg b/setup.cfg index bf1722ab..ce8101bf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,7 +5,7 @@ all-files=True [metadata] name = firebird-qa -version = 0.9.2 +version = 0.10.0 description = pytest plugin for Firebird QA long_description = file: README.rst long_description_content_type = text/x-rst; charset=UTF-8