From b1aff1ccdf247d6a65d555cf025fbb10197f2c66 Mon Sep 17 00:00:00 2001 From: Pavel Cisar Date: Thu, 11 Nov 2021 17:59:04 +0100 Subject: [PATCH] Plugin enhancenemts --- README.rst | 1 + firebird/qa/__init__.py | 2 +- firebird/qa/plugin.py | 116 +++++++++++++++++++++++++++------------- 3 files changed, 81 insertions(+), 38 deletions(-) diff --git a/README.rst b/README.rst index 86047c9e..7c74fd70 100644 --- a/README.rst +++ b/README.rst @@ -33,6 +33,7 @@ Usage Guide --protocol={xnet,inet,inet4,wnet} Network protocol used for database attachments --runslow Run slow tests + --save-output Save output from isql and other executed utilities into ./out subdirectory To run all tests (except slow ones) against local server use next command:: diff --git a/firebird/qa/__init__.py b/firebird/qa/__init__.py index f3e1bee7..1c87ff09 100644 --- a/firebird/qa/__init__.py +++ b/firebird/qa/__init__.py @@ -37,4 +37,4 @@ """ from .plugin import db_factory, user_factory, isql_act, python_act, Database, Action, \ - temp_file, temp_user, User + temp_file, User diff --git a/firebird/qa/plugin.py b/firebird/qa/plugin.py index 4dee3574..a0b7ca2e 100644 --- a/firebird/qa/plugin.py +++ b/firebird/qa/plugin.py @@ -50,9 +50,9 @@ from pathlib import Path #from configparser import ConfigParser, ExtendedInterpolation from packaging.specifiers import SpecifierSet from packaging.version import parse -from dataclasses import dataclass from firebird.driver import connect, connect_server, create_database, driver_config, \ - NetProtocol, Server, CHARSET_MAP, Connection, DatabaseError + NetProtocol, Server, CHARSET_MAP, Connection, DatabaseError, Cursor, \ + DESCRIPTION_NAME, DESCRIPTION_DISPLAY_SIZE _vars_ = {'server': None, 'bin-dir': None, @@ -216,13 +216,14 @@ class Database: srv_conf = driver_config.get_server(_vars_['server']) self.user: str = srv_conf.user.value if user is None else user self.password: str = srv_conf.password.value if password is None else password - def _make_config(self, page_size: int=None, sql_dialect: int=None, charset: str=None) -> None: + def _make_config(self, *, page_size: int=None, sql_dialect: int=None, charset: str=None, + user: str=None, password: str=None) -> None: db_conf = driver_config.get_database('pytest') db_conf.clear() db_conf.server.value = _vars_['server'] db_conf.database.value = str(self.db_path) - db_conf.user.value = self.user - db_conf.password.value = self.password + db_conf.user.value = self.user if user is None else user + db_conf.password.value = self.password if password is None else password if sql_dialect is not None: db_conf.db_sql_dialect.value = sql_dialect if page_size is not None: @@ -233,7 +234,7 @@ class Database: db_conf.protocol.value = NetProtocol._member_map_[_vars_['protocol'].upper()] def create(self, page_size: int=None, sql_dialect: int=None, charset: str=None) -> None: #__tracebackhide__ = True - self._make_config(page_size, sql_dialect, charset) + self._make_config(page_size=page_size, sql_dialect=sql_dialect, charset=charset) print(f"Creating db: {self.db_path} [{page_size=}, {sql_dialect=}, {charset=}, user={self.user}, password={self.password}]") db = create_database('pytest') db.close() @@ -323,8 +324,8 @@ class Database: db = connect('pytest') #print(f"Removing db: {self.db_path}") db.drop_database() - def connect(self) -> Connection: - self._make_config() + def connect(self, *, user: str=None, password: str=None) -> Connection: + self._make_config(user=user, password=password) return connect('pytest') @@ -353,6 +354,8 @@ class User: def drop(self) -> None: self.server.user.delete(self.name) #print(f"User {self.name} dropped") + def update(self, **kwargs) -> None: + self.server.user.update(user_name=self.name, **kwargs) def user_factory(*, name: str, password: str) -> None: @@ -367,19 +370,21 @@ def user_factory(*, name: str, password: str) -> None: def db_factory(*, filename: str='test.fdb', init: str=None, from_backup: str=None, copy_of: str=None, page_size: int=None, sql_dialect: int=None, - charset: str=None, user: str=None, password: str=None): + charset: str=None, user: str=None, password: str=None, + do_not_create: bool=False): @pytest.fixture def database_fixture(request: FixtureRequest, db_path) -> Database: db = Database(db_path, filename, user, password) - if from_backup is None and copy_of is None: - db.create(page_size, sql_dialect, charset) - elif from_backup is not None: - db.restore(from_backup) - elif copy_of is not None: - db.copy(copy_of) - if init is not None: - db.init(init) + if not do_not_create: + if from_backup is None and copy_of is None: + db.create(page_size, sql_dialect, charset) + elif from_backup is not None: + db.restore(from_backup) + elif copy_of is not None: + db.copy(copy_of) + if init is not None: + db.init(init) yield db db.drop() @@ -442,9 +447,9 @@ class Action: # Store output if _vars_['save-output']: if self.stdout: - out_file.write_text(self.stdout, encoding=self.db.io_enc) + out_file.write_text(self.stdout, encoding='utf8') if self.stderr: - err_file.write_text(self.stderr, encoding=self.db.io_enc) + err_file.write_text(self.stderr, encoding='utf8') def extract_meta(self) -> None: __tracebackhide__ = True out_file: Path = self.outfile.with_suffix('.out') @@ -496,6 +501,42 @@ class Action: out_file.write_text(self.stdout, encoding=self.db.io_enc) if self.stderr: err_file.write_text(self.stderr, encoding=self.db.io_enc) + def gsec(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None, + input: str=None) -> None: + __tracebackhide__ = True + out_file: Path = self.outfile.with_suffix('.out') + err_file: Path = self.outfile.with_suffix('.err') + if out_file.is_file(): + out_file.unlink() + if err_file.is_file(): + err_file.unlink() + if charset is not None: + charset = charset.upper() + else: + charset = 'NONE' + if io_enc is None: + io_enc = CHARSET_MAP[charset] + params = [_vars_['gsec']] + if switches: + params.extend(switches) + params.extend(['-user', self.db.user, '-password', self.db.password]) + result: CompletedProcess = run(params, input=input, + encoding=io_enc, capture_output=True) + if result.returncode and not bool(self.expected_stderr): + print(f"-- gsec stdout {'-' * 20}") + print(result.stdout) + print(f"-- gsec stderr {'-' * 20}") + print(result.stderr) + raise Exception("gsec execution failed") + self.return_code: int = result.returncode + self.stdout: str = result.stdout + self.stderr: str = result.stderr + # Store output + if _vars_['save-output']: + if self.stdout: + out_file.write_text(self.stdout, encoding=io_enc) + if self.stderr: + err_file.write_text(self.stderr, encoding=io_enc) def isql(self, *, switches: List[str], charset: str='utf8', io_enc: str=None, input: str=None, input_file: Path=None, connect_db: bool=True) -> None: __tracebackhide__ = True @@ -551,6 +592,25 @@ class Action: self.expected_stderr = '' self.stdout = '' self.stderr = '' + def print_data(self, cursor: Cursor) -> None: + # Print a header. + for fieldDesc in cursor.description: + print (fieldDesc[DESCRIPTION_NAME].ljust(fieldDesc[DESCRIPTION_DISPLAY_SIZE]),end=' ') + print('') + for fieldDesc in cursor.description: + print ("-" * max((len(fieldDesc[DESCRIPTION_NAME]),fieldDesc[DESCRIPTION_DISPLAY_SIZE])),end=' ') + print('') + # For each row, print the value of each field left-justified within + # the maximum possible width of that field. + fieldIndices = range(len(cursor.description)) + for row in cursor: + for fieldIndex in fieldIndices: + fieldValue = row[fieldIndex] + if not isinstance(fieldValue, str): + fieldValue = str(fieldValue) + fieldMaxWidth = max((len(cursor.description[fieldIndex][DESCRIPTION_NAME]),cursor.description[fieldIndex][DESCRIPTION_DISPLAY_SIZE])) + print (fieldValue.ljust(fieldMaxWidth), end=' ') + print('') @property def clean_stdout(self) -> str: if self._clean_stdout is None: @@ -615,21 +675,3 @@ def temp_file(filename: str): return temp_file_fixture -@dataclass -class User: - name: str - password: str - -def temp_user(*, name: str, password: str): - - @pytest.fixture - def temp_user_fixture(request: FixtureRequest) -> User: - with connect_server(_vars_['server'], user='SYSDBA', password=_vars_['password']) as srv: - if srv.user.get(name) is None: - srv.user.add(user_name=name, password=password) - else: - srv.user.update(user_name=name, password=password) - yield User(name, password) - srv.user.delete(user_name=name) - - return temp_user_fixture