6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 13:33:07 +01:00

Plugin enhancenemts

This commit is contained in:
Pavel Císař 2021-11-11 17:59:04 +01:00
parent 7eb6992fd5
commit b1aff1ccdf
3 changed files with 81 additions and 38 deletions

View File

@ -33,6 +33,7 @@ Usage Guide
--protocol={xnet,inet,inet4,wnet}
Network protocol used for database attachments
--runslow Run slow tests
--save-output Save output from isql and other executed utilities into ./out subdirectory
To run all tests (except slow ones) against local server use next command::

View File

@ -37,4 +37,4 @@
"""
from .plugin import db_factory, user_factory, isql_act, python_act, Database, Action, \
temp_file, temp_user, User
temp_file, User

View File

@ -50,9 +50,9 @@ from pathlib import Path
#from configparser import ConfigParser, ExtendedInterpolation
from packaging.specifiers import SpecifierSet
from packaging.version import parse
from dataclasses import dataclass
from firebird.driver import connect, connect_server, create_database, driver_config, \
NetProtocol, Server, CHARSET_MAP, Connection, DatabaseError
NetProtocol, Server, CHARSET_MAP, Connection, DatabaseError, Cursor, \
DESCRIPTION_NAME, DESCRIPTION_DISPLAY_SIZE
_vars_ = {'server': None,
'bin-dir': None,
@ -216,13 +216,14 @@ class Database:
srv_conf = driver_config.get_server(_vars_['server'])
self.user: str = srv_conf.user.value if user is None else user
self.password: str = srv_conf.password.value if password is None else password
def _make_config(self, page_size: int=None, sql_dialect: int=None, charset: str=None) -> None:
def _make_config(self, *, page_size: int=None, sql_dialect: int=None, charset: str=None,
user: str=None, password: str=None) -> None:
db_conf = driver_config.get_database('pytest')
db_conf.clear()
db_conf.server.value = _vars_['server']
db_conf.database.value = str(self.db_path)
db_conf.user.value = self.user
db_conf.password.value = self.password
db_conf.user.value = self.user if user is None else user
db_conf.password.value = self.password if password is None else password
if sql_dialect is not None:
db_conf.db_sql_dialect.value = sql_dialect
if page_size is not None:
@ -233,7 +234,7 @@ class Database:
db_conf.protocol.value = NetProtocol._member_map_[_vars_['protocol'].upper()]
def create(self, page_size: int=None, sql_dialect: int=None, charset: str=None) -> None:
#__tracebackhide__ = True
self._make_config(page_size, sql_dialect, charset)
self._make_config(page_size=page_size, sql_dialect=sql_dialect, charset=charset)
print(f"Creating db: {self.db_path} [{page_size=}, {sql_dialect=}, {charset=}, user={self.user}, password={self.password}]")
db = create_database('pytest')
db.close()
@ -323,8 +324,8 @@ class Database:
db = connect('pytest')
#print(f"Removing db: {self.db_path}")
db.drop_database()
def connect(self) -> Connection:
self._make_config()
def connect(self, *, user: str=None, password: str=None) -> Connection:
self._make_config(user=user, password=password)
return connect('pytest')
@ -353,6 +354,8 @@ class User:
def drop(self) -> None:
self.server.user.delete(self.name)
#print(f"User {self.name} dropped")
def update(self, **kwargs) -> None:
self.server.user.update(user_name=self.name, **kwargs)
def user_factory(*, name: str, password: str) -> None:
@ -367,19 +370,21 @@ def user_factory(*, name: str, password: str) -> None:
def db_factory(*, filename: str='test.fdb', init: str=None, from_backup: str=None,
copy_of: str=None, page_size: int=None, sql_dialect: int=None,
charset: str=None, user: str=None, password: str=None):
charset: str=None, user: str=None, password: str=None,
do_not_create: bool=False):
@pytest.fixture
def database_fixture(request: FixtureRequest, db_path) -> Database:
db = Database(db_path, filename, user, password)
if from_backup is None and copy_of is None:
db.create(page_size, sql_dialect, charset)
elif from_backup is not None:
db.restore(from_backup)
elif copy_of is not None:
db.copy(copy_of)
if init is not None:
db.init(init)
if not do_not_create:
if from_backup is None and copy_of is None:
db.create(page_size, sql_dialect, charset)
elif from_backup is not None:
db.restore(from_backup)
elif copy_of is not None:
db.copy(copy_of)
if init is not None:
db.init(init)
yield db
db.drop()
@ -442,9 +447,9 @@ class Action:
# Store output
if _vars_['save-output']:
if self.stdout:
out_file.write_text(self.stdout, encoding=self.db.io_enc)
out_file.write_text(self.stdout, encoding='utf8')
if self.stderr:
err_file.write_text(self.stderr, encoding=self.db.io_enc)
err_file.write_text(self.stderr, encoding='utf8')
def extract_meta(self) -> None:
__tracebackhide__ = True
out_file: Path = self.outfile.with_suffix('.out')
@ -496,6 +501,42 @@ class Action:
out_file.write_text(self.stdout, encoding=self.db.io_enc)
if self.stderr:
err_file.write_text(self.stderr, encoding=self.db.io_enc)
def gsec(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None,
input: str=None) -> None:
__tracebackhide__ = True
out_file: Path = self.outfile.with_suffix('.out')
err_file: Path = self.outfile.with_suffix('.err')
if out_file.is_file():
out_file.unlink()
if err_file.is_file():
err_file.unlink()
if charset is not None:
charset = charset.upper()
else:
charset = 'NONE'
if io_enc is None:
io_enc = CHARSET_MAP[charset]
params = [_vars_['gsec']]
if switches:
params.extend(switches)
params.extend(['-user', self.db.user, '-password', self.db.password])
result: CompletedProcess = run(params, input=input,
encoding=io_enc, capture_output=True)
if result.returncode and not bool(self.expected_stderr):
print(f"-- gsec stdout {'-' * 20}")
print(result.stdout)
print(f"-- gsec stderr {'-' * 20}")
print(result.stderr)
raise Exception("gsec execution failed")
self.return_code: int = result.returncode
self.stdout: str = result.stdout
self.stderr: str = result.stderr
# Store output
if _vars_['save-output']:
if self.stdout:
out_file.write_text(self.stdout, encoding=io_enc)
if self.stderr:
err_file.write_text(self.stderr, encoding=io_enc)
def isql(self, *, switches: List[str], charset: str='utf8', io_enc: str=None,
input: str=None, input_file: Path=None, connect_db: bool=True) -> None:
__tracebackhide__ = True
@ -551,6 +592,25 @@ class Action:
self.expected_stderr = ''
self.stdout = ''
self.stderr = ''
def print_data(self, cursor: Cursor) -> None:
# Print a header.
for fieldDesc in cursor.description:
print (fieldDesc[DESCRIPTION_NAME].ljust(fieldDesc[DESCRIPTION_DISPLAY_SIZE]),end=' ')
print('')
for fieldDesc in cursor.description:
print ("-" * max((len(fieldDesc[DESCRIPTION_NAME]),fieldDesc[DESCRIPTION_DISPLAY_SIZE])),end=' ')
print('')
# For each row, print the value of each field left-justified within
# the maximum possible width of that field.
fieldIndices = range(len(cursor.description))
for row in cursor:
for fieldIndex in fieldIndices:
fieldValue = row[fieldIndex]
if not isinstance(fieldValue, str):
fieldValue = str(fieldValue)
fieldMaxWidth = max((len(cursor.description[fieldIndex][DESCRIPTION_NAME]),cursor.description[fieldIndex][DESCRIPTION_DISPLAY_SIZE]))
print (fieldValue.ljust(fieldMaxWidth), end=' ')
print('')
@property
def clean_stdout(self) -> str:
if self._clean_stdout is None:
@ -615,21 +675,3 @@ def temp_file(filename: str):
return temp_file_fixture
@dataclass
class User:
name: str
password: str
def temp_user(*, name: str, password: str):
@pytest.fixture
def temp_user_fixture(request: FixtureRequest) -> User:
with connect_server(_vars_['server'], user='SYSDBA', password=_vars_['password']) as srv:
if srv.user.get(name) is None:
srv.user.add(user_name=name, password=password)
else:
srv.user.update(user_name=name, password=password)
yield User(name, password)
srv.user.delete(user_name=name)
return temp_user_fixture