mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-22 13:33:07 +01:00
Plugin extensions
This commit is contained in:
parent
515fbfb7f8
commit
607cea07be
@ -326,9 +326,9 @@ class Database:
|
||||
db = connect('pytest')
|
||||
#print(f"Removing db: {self.db_path}")
|
||||
db.drop_database()
|
||||
def connect(self, *, user: str=None, password: str=None) -> Connection:
|
||||
def connect(self, *, user: str=None, password: str=None, role: str=None) -> Connection:
|
||||
self._make_config(user=user, password=password)
|
||||
return connect('pytest')
|
||||
return connect('pytest', role=role)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -370,6 +370,19 @@ def user_factory(*, name: str, password: str) -> None:
|
||||
|
||||
return user_fixture
|
||||
|
||||
class Role:
|
||||
def __init__(self, name: str, database: Database):
|
||||
self.name: str = name
|
||||
self.db: Connection = database.connect()
|
||||
def __enter__(self) -> Server:
|
||||
self.db.execute_immediate(f'create role {self.name}')
|
||||
self.db.commit()
|
||||
return self
|
||||
def __exit__(self, exc_type, exc_value, traceback) -> None:
|
||||
self.db.execute_immediate(f'drop role {self.name}')
|
||||
self.db.commit()
|
||||
self.db.close()
|
||||
|
||||
def db_factory(*, filename: str='test.fdb', init: str=None, from_backup: str=None,
|
||||
copy_of: str=None, page_size: int=None, sql_dialect: int=None,
|
||||
charset: str=None, user: str=None, password: str=None,
|
||||
@ -539,6 +552,111 @@ class Action:
|
||||
out_file.write_text(self.stdout, encoding=io_enc)
|
||||
if self.stderr:
|
||||
err_file.write_text(self.stderr, encoding=io_enc)
|
||||
def gbak(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None,
|
||||
input: str=None) -> None:
|
||||
__tracebackhide__ = True
|
||||
out_file: Path = self.outfile.with_suffix('.out')
|
||||
err_file: Path = self.outfile.with_suffix('.err')
|
||||
if out_file.is_file():
|
||||
out_file.unlink()
|
||||
if err_file.is_file():
|
||||
err_file.unlink()
|
||||
if charset is not None:
|
||||
charset = charset.upper()
|
||||
else:
|
||||
charset = 'NONE'
|
||||
if io_enc is None:
|
||||
io_enc = CHARSET_MAP[charset]
|
||||
params = [_vars_['gbak']]
|
||||
if switches:
|
||||
params.extend(switches)
|
||||
params.extend(['-user', self.db.user, '-password', self.db.password])
|
||||
result: CompletedProcess = run(params, input=input,
|
||||
encoding=io_enc, capture_output=True)
|
||||
if result.returncode and not bool(self.expected_stderr):
|
||||
print(f"-- gbak stdout {'-' * 20}")
|
||||
print(result.stdout)
|
||||
print(f"-- gbak stderr {'-' * 20}")
|
||||
print(result.stderr)
|
||||
raise Exception("gbak execution failed")
|
||||
self.return_code: int = result.returncode
|
||||
self.stdout: str = result.stdout
|
||||
self.stderr: str = result.stderr
|
||||
# Store output
|
||||
if _vars_['save-output']:
|
||||
if self.stdout:
|
||||
out_file.write_text(self.stdout, encoding=io_enc)
|
||||
if self.stderr:
|
||||
err_file.write_text(self.stderr, encoding=io_enc)
|
||||
def nbackup(self, *, switches: List[str], charset: str='utf8') -> None:
|
||||
__tracebackhide__ = True
|
||||
out_file: Path = self.outfile.with_suffix('.out')
|
||||
err_file: Path = self.outfile.with_suffix('.err')
|
||||
if out_file.is_file():
|
||||
out_file.unlink()
|
||||
if err_file.is_file():
|
||||
err_file.unlink()
|
||||
if charset:
|
||||
charset = charset.upper()
|
||||
else:
|
||||
charset = 'NONE'
|
||||
self.db.io_enc = CHARSET_MAP[charset]
|
||||
params = [_vars_['nbackup']]
|
||||
params.extend(switches)
|
||||
params.extend(['-user', self.db.user, '-password', self.db.password])
|
||||
result: CompletedProcess = run(params,
|
||||
encoding=self.db.io_enc, capture_output=True)
|
||||
if result.returncode and not bool(self.expected_stderr):
|
||||
print(f"-- nbackup stdout {'-' * 20}")
|
||||
print(result.stdout)
|
||||
print(f"-- nbackup stderr {'-' * 20}")
|
||||
print(result.stderr)
|
||||
raise Exception("nbackup execution failed")
|
||||
self.return_code: int = result.returncode
|
||||
self.stdout: str = result.stdout
|
||||
self.stderr: str = result.stderr
|
||||
# Store output
|
||||
if _vars_['save-output']:
|
||||
if self.stdout:
|
||||
out_file.write_text(self.stdout, encoding=self.db.io_enc)
|
||||
if self.stderr:
|
||||
err_file.write_text(self.stderr, encoding=self.db.io_enc)
|
||||
def gfix(self, *, switches: List[str]=None, charset: str='utf8', io_enc: str=None,
|
||||
input: str=None) -> None:
|
||||
__tracebackhide__ = True
|
||||
out_file: Path = self.outfile.with_suffix('.out')
|
||||
err_file: Path = self.outfile.with_suffix('.err')
|
||||
if out_file.is_file():
|
||||
out_file.unlink()
|
||||
if err_file.is_file():
|
||||
err_file.unlink()
|
||||
if charset is not None:
|
||||
charset = charset.upper()
|
||||
else:
|
||||
charset = 'NONE'
|
||||
if io_enc is None:
|
||||
io_enc = CHARSET_MAP[charset]
|
||||
params = [_vars_['gfix']]
|
||||
if switches:
|
||||
params.extend(switches)
|
||||
params.extend(['-user', self.db.user, '-password', self.db.password])
|
||||
result: CompletedProcess = run(params, input=input,
|
||||
encoding=io_enc, capture_output=True)
|
||||
if result.returncode and not bool(self.expected_stderr):
|
||||
print(f"-- gfix stdout {'-' * 20}")
|
||||
print(result.stdout)
|
||||
print(f"-- gfix stderr {'-' * 20}")
|
||||
print(result.stderr)
|
||||
raise Exception("gfix execution failed")
|
||||
self.return_code: int = result.returncode
|
||||
self.stdout: str = result.stdout
|
||||
self.stderr: str = result.stderr
|
||||
# Store output
|
||||
if _vars_['save-output']:
|
||||
if self.stdout:
|
||||
out_file.write_text(self.stdout, encoding=io_enc)
|
||||
if self.stderr:
|
||||
err_file.write_text(self.stderr, encoding=io_enc)
|
||||
def isql(self, *, switches: List[str], charset: str='utf8', io_enc: str=None,
|
||||
input: str=None, input_file: Path=None, connect_db: bool=True) -> None:
|
||||
__tracebackhide__ = True
|
||||
@ -613,6 +731,8 @@ class Action:
|
||||
fieldMaxWidth = max((len(cursor.description[fieldIndex][DESCRIPTION_NAME]),cursor.description[fieldIndex][DESCRIPTION_DISPLAY_SIZE]))
|
||||
print (fieldValue.ljust(fieldMaxWidth), end=' ')
|
||||
print('')
|
||||
def test_role(self, name: str) -> Role:
|
||||
return Role(name, self.db)
|
||||
@property
|
||||
def clean_stdout(self) -> str:
|
||||
if self._clean_stdout is None:
|
||||
|
Loading…
Reference in New Issue
Block a user