diff --git a/firebird/qa/__init__.py b/firebird/qa/__init__.py index c2326808..f3e1bee7 100644 --- a/firebird/qa/__init__.py +++ b/firebird/qa/__init__.py @@ -36,4 +36,5 @@ """ -from .plugin import db_factory, user_factory, isql_act, Database, Action +from .plugin import db_factory, user_factory, isql_act, python_act, Database, Action, \ + temp_file, temp_user, User diff --git a/firebird/qa/plugin.py b/firebird/qa/plugin.py index 5cf013be..4dee3574 100644 --- a/firebird/qa/plugin.py +++ b/firebird/qa/plugin.py @@ -50,8 +50,9 @@ from pathlib import Path #from configparser import ConfigParser, ExtendedInterpolation from packaging.specifiers import SpecifierSet from packaging.version import parse +from dataclasses import dataclass from firebird.driver import connect, connect_server, create_database, driver_config, \ - NetProtocol, Server, CHARSET_MAP + NetProtocol, Server, CHARSET_MAP, Connection, DatabaseError _vars_ = {'server': None, 'bin-dir': None, @@ -153,7 +154,7 @@ def pytest_configure(config): #else: #_vars_['bin-dir'] = Path(_vars_['bin-dir']) # tools - for tool in ['isql', 'gbak', 'nbackup', 'gstat', 'gfix', 'gsec']: + for tool in ['isql', 'gbak', 'nbackup', 'gstat', 'gfix', 'gsec', 'fbsvcmgr']: set_tool(tool) @@ -277,7 +278,8 @@ class Database: print(result.stderr) raise Exception("Database init script execution failed") return result - def execute(self, script: str, *, raise_on_fail: bool, charset: str='utf8') -> CompletedProcess: + def execute(self, script: str, *, raise_on_fail: bool, charset: str='utf8', + do_not_connect: bool=False) -> CompletedProcess: __tracebackhide__ = True #print("Running test script") if charset: @@ -285,8 +287,10 @@ class Database: else: charset = 'NONE' self.io_enc = CHARSET_MAP[charset] - result = run([_vars_['isql'], '-ch', charset, '-user', self.user, - '-password', self.password, str(self.dsn)], + params = [_vars_['isql'], '-ch', charset] + if not do_not_connect: + params.extend(['-user', self.user, '-password', self.password, str(self.dsn)]) + result = run(params, input=substitute_macros(script, self.subs), encoding=self.io_enc, capture_output=True) if result.returncode and raise_on_fail: @@ -296,11 +300,33 @@ class Database: print(result.stderr) raise Exception("ISQL script execution failed") return result + def extract_meta(self, charset: str='utf8') -> CompletedProcess: + __tracebackhide__ = True + #print("Running test script") + if charset: + charset = charset.upper() + else: + charset = 'NONE' + self.io_enc = CHARSET_MAP[charset] + result = run([_vars_['isql'], '-x', '-ch', charset, '-user', self.user, + '-password', self.password, str(self.dsn)], + encoding=self.io_enc, capture_output=True) + if result.returncode: + print(f"-- ISQL stdout {'-' * 20}") + print(result.stdout) + print(f"-- ISQL stderr {'-' * 20}") + print(result.stderr) + raise Exception("ISQL execution failed") + return result def drop(self) -> None: self._make_config() db = connect('pytest') #print(f"Removing db: {self.db_path}") db.drop_database() + def connect(self) -> Connection: + self._make_config() + return connect('pytest') + @pytest.fixture def db_path(tmp_path) -> Path: @@ -398,7 +424,7 @@ class Action: if remove_space: value = self.space_strip(value) return value - def execute(self) -> None: + def execute(self, *, do_not_connect: bool=False) -> None: __tracebackhide__ = True out_file: Path = self.outfile.with_suffix('.out') err_file: Path = self.outfile.with_suffix('.err') @@ -408,7 +434,8 @@ class Action: err_file.unlink() result: CompletedProcess = self.db.execute(self.script, raise_on_fail=not bool(self.expected_stderr), - charset=self.charset) + charset=self.charset, + do_not_connect=do_not_connect) self.return_code: int = result.returncode self.stdout: str = result.stdout self.stderr: str = result.stderr @@ -418,6 +445,112 @@ class Action: out_file.write_text(self.stdout, encoding=self.db.io_enc) if self.stderr: err_file.write_text(self.stderr, encoding=self.db.io_enc) + def extract_meta(self) -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + result: CompletedProcess = self.db.extract_meta(charset=self.charset) + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=self.db.io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=self.db.io_enc) + def gstat(self, *, switches: List[str], charset: str='utf8') -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + if charset: + charset = charset.upper() + else: + charset = 'NONE' + self.db.io_enc = CHARSET_MAP[charset] + params = [_vars_['gstat']] + params.extend(switches) + params.extend(['-user', self.db.user, '-password', self.db.password, str(self.db.dsn)]) + result: CompletedProcess = run(params, + encoding=self.db.io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- gstat stdout {'-' * 20}") + print(result.stdout) + print(f"-- gstat stderr {'-' * 20}") + print(result.stderr) + raise Exception("gstat execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=self.db.io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=self.db.io_enc) + def isql(self, *, switches: List[str], charset: str='utf8', io_enc: str=None, + input: str=None, input_file: Path=None, connect_db: bool=True) -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + params = [_vars_['isql']] + if charset is not None: + charset = charset.upper() + params.extend(['-ch', charset]) + else: + charset = 'NONE' + if io_enc is None: + io_enc = CHARSET_MAP[charset] + params.extend(switches) + if input_file is not None: + params.extend(['-i', str(input_file)]) + if connect_db: + params.extend(['-user', self.db.user, '-password', self.db.password, str(self.db.dsn)]) + result: CompletedProcess = run(params, input=input, + encoding=io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- isql stdout {'-' * 20}") + print(result.stdout) + print(f"-- isql stderr {'-' * 20}") + print(result.stderr) + raise Exception("isql execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=io_enc) + def connect_server(self, *, user: str='SYSDBA', password: str=None) -> Server: + return connect_server(_vars_['server'], user=user, + password=_vars_['password'] if password is None else password) + def is_version(self, version_spec: str) -> bool: + spec = SpecifierSet(version_spec) + return _vars_['version'] in spec + def reset(self) -> None: + self._clean_stdout = None + self._clean_stderr = None + self._clean_expected_stdout = None + self._clean_expected_stderr = None + # + self.expected_stdout = '' + self.expected_stderr = '' + self.stdout = '' + self.stderr = '' @property def clean_stdout(self) -> str: if self._clean_stdout is None: @@ -438,6 +571,9 @@ class Action: if self._clean_expected_stderr is None: self._clean_expected_stderr = self.string_strip(self.expected_stderr, self.substitutions) return self._clean_expected_stderr + @property + def vars(self) -> Dict[str]: + return _vars_ def isql_act(db_fixture_name: str, script: str, *, substitutions: List[str]=None, charset: str='utf8'): @@ -449,8 +585,51 @@ def isql_act(db_fixture_name: str, script: str, *, substitutions: List[str]=None if _vars_['save-output'] and not f.parent.exists(): f.parent.mkdir(parents=True) f = f.with_name(f'{f.stem}-{request.function.__name__}.out') - #f.write_text('stdout') result: Action = Action(db, script, substitutions, f, charset) return result return isql_act_fixture + +def python_act(db_fixture_name: str, *, substitutions: List[str]=None, charset: str='utf8'): + + @pytest.fixture + def python_act_fixture(request: FixtureRequest) -> Action: + db: Database = request.getfixturevalue(db_fixture_name) + f: Path = Path.cwd() / 'out' / request.module.__name__.replace('.', '/') + if _vars_['save-output'] and not f.parent.exists(): + f.parent.mkdir(parents=True) + f = f.with_name(f'{f.stem}-{request.function.__name__}.out') + result: Action = Action(db, '', substitutions, f, charset) + return result + + return python_act_fixture + +def temp_file(filename: str): + + @pytest.fixture + def temp_file_fixture(tmp_path): + tmp_file = tmp_path / filename + yield tmp_file + if tmp_file.is_file(): + tmp_file.unlink() + + return temp_file_fixture + +@dataclass +class User: + name: str + password: str + +def temp_user(*, name: str, password: str): + + @pytest.fixture + def temp_user_fixture(request: FixtureRequest) -> User: + with connect_server(_vars_['server'], user='SYSDBA', password=_vars_['password']) as srv: + if srv.user.get(name) is None: + srv.user.add(user_name=name, password=password) + else: + srv.user.update(user_name=name, password=password) + yield User(name, password) + srv.user.delete(user_name=name) + + return temp_user_fixture diff --git a/setup.cfg b/setup.cfg index 322c0be4..4be1cbf6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,7 +5,7 @@ all-files=True [metadata] name = firebird-qa -version = 0.2.0 +version = 0.3.0 description = pytest plugin for Firebird QA long_description = file: README.rst long_description_content_type = text/x-rst; charset=UTF-8