From 607cea07be8f887988ab060b5f1966d95363d814 Mon Sep 17 00:00:00 2001 From: Pavel Cisar Date: Mon, 15 Nov 2021 19:00:27 +0100 Subject: [PATCH] Plugin extensions --- firebird/qa/plugin.py | 124 +++++++++++++++++++++++++++++++++++++++++- setup.cfg | 2 +- 2 files changed, 123 insertions(+), 3 deletions(-) diff --git a/firebird/qa/plugin.py b/firebird/qa/plugin.py index d09400ce..27443cf3 100644 --- a/firebird/qa/plugin.py +++ b/firebird/qa/plugin.py @@ -326,9 +326,9 @@ class Database: db = connect('pytest') #print(f"Removing db: {self.db_path}") db.drop_database() - def connect(self, *, user: str=None, password: str=None) -> Connection: + def connect(self, *, user: str=None, password: str=None, role: str=None) -> Connection: self._make_config(user=user, password=password) - return connect('pytest') + return connect('pytest', role=role) @pytest.fixture @@ -370,6 +370,19 @@ def user_factory(*, name: str, password: str) -> None: return user_fixture +class Role: + def __init__(self, name: str, database: Database): + self.name: str = name + self.db: Connection = database.connect() + def __enter__(self) -> Server: + self.db.execute_immediate(f'create role {self.name}') + self.db.commit() + return self + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.db.execute_immediate(f'drop role {self.name}') + self.db.commit() + self.db.close() + def db_factory(*, filename: str='test.fdb', init: str=None, from_backup: str=None, copy_of: str=None, page_size: int=None, sql_dialect: int=None, charset: str=None, user: str=None, password: str=None, @@ -539,6 +552,111 @@ class Action: out_file.write_text(self.stdout, encoding=io_enc) if self.stderr: err_file.write_text(self.stderr, encoding=io_enc) + def gbak(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None, + input: str=None) -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + if charset is not None: + charset = charset.upper() + else: + charset = 'NONE' + if io_enc is None: + io_enc = CHARSET_MAP[charset] + params = [_vars_['gbak']] + if switches: + params.extend(switches) + params.extend(['-user', self.db.user, '-password', self.db.password]) + result: CompletedProcess = run(params, input=input, + encoding=io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- gbak stdout {'-' * 20}") + print(result.stdout) + print(f"-- gbak stderr {'-' * 20}") + print(result.stderr) + raise Exception("gbak execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=io_enc) + def nbackup(self, *, switches: List[str], charset: str='utf8') -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + if charset: + charset = charset.upper() + else: + charset = 'NONE' + self.db.io_enc = CHARSET_MAP[charset] + params = [_vars_['nbackup']] + params.extend(switches) + params.extend(['-user', self.db.user, '-password', self.db.password]) + result: CompletedProcess = run(params, + encoding=self.db.io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- nbackup stdout {'-' * 20}") + print(result.stdout) + print(f"-- nbackup stderr {'-' * 20}") + print(result.stderr) + raise Exception("nbackup execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=self.db.io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=self.db.io_enc) + def gfix(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None, + input: str=None) -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + if charset is not None: + charset = charset.upper() + else: + charset = 'NONE' + if io_enc is None: + io_enc = CHARSET_MAP[charset] + params = [_vars_['gfix']] + if switches: + params.extend(switches) + params.extend(['-user', self.db.user, '-password', self.db.password]) + result: CompletedProcess = run(params, input=input, + encoding=io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- gfix stdout {'-' * 20}") + print(result.stdout) + print(f"-- gfix stderr {'-' * 20}") + print(result.stderr) + raise Exception("gfix execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=io_enc) def isql(self, *, switches: List[str], charset: str='utf8', io_enc: str=None, input: str=None, input_file: Path=None, connect_db: bool=True) -> None: __tracebackhide__ = True @@ -613,6 +731,8 @@ class Action: fieldMaxWidth = max((len(cursor.description[fieldIndex][DESCRIPTION_NAME]),cursor.description[fieldIndex][DESCRIPTION_DISPLAY_SIZE])) print (fieldValue.ljust(fieldMaxWidth), end=' ') print('') + def test_role(self, name: str) -> Role: + return Role(name, self.db) @property def clean_stdout(self) -> str: if self._clean_stdout is None: diff --git a/setup.cfg b/setup.cfg index 4be1cbf6..512f4e7b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,7 +5,7 @@ all-files=True [metadata] name = firebird-qa -version = 0.3.0 +version = 0.4.0 description = pytest plugin for Firebird QA long_description = file: README.rst long_description_content_type = text/x-rst; charset=UTF-8