6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-02-02 02:40:42 +01:00

Added/Updated tests\functional\replication\test_blob_access_when_no_grant_for_select.py: Checked on 6.0.0.395 (SS/CS), 5.0.1.1439, 4.0.5.3127

This commit is contained in:
pavel-zotov 2024-07-12 10:08:47 +03:00
parent 01cf8c917f
commit 34f48ef945

View File

@ -0,0 +1,479 @@
#coding:utf-8
"""
ID: replication.test_blob_access_when_no_grant_for_select
ISSUE: None.
TITLE: Replicator must have access to the table with blob regardless SELECT grant on this table to the user who created blob.
DESCRIPTION:
We create table and user ('tmp_nonpriv_user'), with revoking all grants from him except only one: allow INSERT to that table (w/o select!).
The last DDL that we do is creating table with name 't_completed'. It serves as 'flag' to be checked that all DDL actions
on master finished.
After this we wait until replica becomes actual to master, and this delay will last no more then threshold that
is defined by MAX_TIME_FOR_WAIT_DATA_IN_REPLICA variable (measured in seconds), see QA_ROOT/files/test_config.ini
Then we connect to master as <tmp_nonpriv_user> and run DML against table 'test': insert one record.
Normally it must NOT raise any error (see 'dml_err') on master, but before fix replicator thread failed at this point with access problems
to blob data and replication.log was filled with messages:
ERROR: no permission for SELECT access to ...
Effective user is <tmp_nonpriv_user>
ERROR: Replication is stopped due to critical error(s)
We must check here that replica DB eventually will have appropriate blob in the test table (see 'isql_expected_out').
NOTE. We have to avoid query of replication log - not only verbose can be disabled, but also because code is too complex.
Further, we invoke ISQL with executing auxiliary script for drop all DB objects on master (with '-nod' command switch).
After all objects will be dropped, we have to wait again until replica becomes actual with master.
Check that both DB have no custom objects is performed (see UNION-ed query to rdb$ tables + filtering on rdb$system_flag).
Finally, we extract metadata for master and replica and make comparison.
The only difference in metadata must be 'CREATE DATABASE' statement with different DB names - we suppress it,
thus metadata difference must not be issued.
NOTES:
There is no ticket for this test.
Frontported fix was 30.06.2024:
* for 5.x: https://github.com/FirebirdSQL/firebird/commit/97358d012c798aa4382c863404f3dd22befe1af6
* for 6.x: https://github.com/FirebirdSQL/firebird/commit/caf7a6bcf8dd13790c4c0ffa52ff1bec9e46f07f
Log message: Frontported bugfix for blob access vs replicator
We use 'assert' only at the final point of test, with printing detalization about encountered problem(s).
During all previous steps, we only store unexpected output to variables, e.g.: out_main = capsys.readouterr().out etc.
Confirmed bug on 6.0.0.387, 5.0.1.1428.
Checked on 6.0.0.395 (SS/CS), 5.0.1.1439, 4.0.5.3127
Thanks to dimitr for suggestions.
"""
import os
import shutil
from difflib import unified_diff
from pathlib import Path
import time
import pytest
from firebird.qa import *
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode, ShutdownMode, ShutdownMethod, DatabaseError
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
# from act.files_dir/'test_config.ini':
repl_settings = QA_GLOBALS['replication']
MAX_TIME_FOR_WAIT_DATA_IN_REPLICA = int(repl_settings['max_time_for_wait_data_in_replica'])
MAIN_DB_ALIAS = repl_settings['main_db_alias']
REPL_DB_ALIAS = repl_settings['repl_db_alias']
RUN_SWEEP_AT_END = int(repl_settings['run_sweep_at_end'])
db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True)
db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True)
tmp_nonpriv_user = user_factory('db_main', name='tmp_replicator_blob_access', password='123')
substitutions = [('Start removing objects in:.*', 'Start removing objects'),
('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'),
('.* CREATE DATABASE .*', ''),
('[\t ]+', ' '),
('FOUND message about replicated segment N .*', 'FOUND message about replicated segment')]
act_db_main = python_act('db_main', substitutions=substitutions)
act_db_repl = python_act('db_repl', substitutions=substitutions)
#--------------------------------------------
def cleanup_folder(p):
# Removed all files and subdirs in the folder <p>
# Used for cleanup <repl_journal> and <repl_archive> when replication must be reset
# in case when any error occurred during test execution.
assert os.path.dirname(p) != p, f"@@@ ABEND @@@ CAN NOT operate in the file system root directory. Check your code!"
for root, dirs, files in os.walk(p):
for f in files:
# ::: NB ::: 22.12.2023.
# We have to expect that attempt to delete of GUID and (maybe) archived segments can FAIL with
# PermissionError: [WinError 32] The process cannot ... used by another process: /path/to/{GUID}
# Also, we have to skip exception if file (segment) was just deleted by engine
try:
Path(root +'/' + f).unlink(missing_ok = True)
except PermissionError as x:
pass
for d in dirs:
shutil.rmtree(os.path.join(root, d), ignore_errors = True)
return os.listdir(p)
#--------------------------------------------
def reset_replication(act_db_main, act_db_repl, db_main_file, db_repl_file):
out_reset = ''
failed_shutdown_db_map = {} # K = 'db_main', 'db_repl'; V = error that occurred when we attempted to change DB state to full shutdown (if it occurred)
with act_db_main.connect_server() as srv:
# !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
# DO NOT use 'a.db.db_path' for ALIASED database!
# It will return '.' rather than full path+filename.
repl_root_path = Path(db_main_file).parent
repl_jrn_sub_dir = repl_settings['journal_sub_dir']
repl_arc_sub_dir = repl_settings['archive_sub_dir']
for f in (db_main_file, db_repl_file):
# Method db.drop() changes LINGER to 0, issues 'delete from mon$att' with suppressing exceptions
# and calls 'db.drop_database()' (also with suppressing exceptions).
# We change DB state to FULL SHUTDOWN instead of call action.db.drop() because
# this is more reliable (it kills all attachments in all known cases and does not use mon$ table)
#
try:
srv.database.shutdown(database = f, mode = ShutdownMode.FULL, method = ShutdownMethod.FORCED, timeout = 0)
# REMOVE db file from disk: we can safely assume that this can be done because DB in full shutdown state.
###########################
os.unlink(f)
except DatabaseError as e:
failed_shutdown_db_map[ f ] = e.__str__()
# Clean folders repl_journal and repl_archive: remove all files from there.
# NOTE: test must NOT raise unrecoverable error if some of files in these folders can not be deleted.
# Rather, this must be displayed as diff and test must be considered as just failed.
for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
remained_files = cleanup_folder(repl_root_path/p)
if remained_files:
out_reset += '\n'.join( (f"Directory '{str(repl_root_path/p)}' remains non-empty. Could not delete file(s):", '\n'.join(remained_files)) )
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxx r e c r e a t e d b _ m a i n a n d d b _ r e p l xxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
for a in (act_db_main,act_db_repl):
d = a.db.db_path
failed_shutdown_msg = failed_shutdown_db_map.get( str(d), '' )
if failed_shutdown_msg:
# we could NOT change state of this database to full shutdown --> we must NOT recreate it.
# Accumulate error messages in OUT arg (for displaying as diff):
#
out_reset += '\n'.join( failed_shutdown_msg )
else:
try:
dbx = create_database(str(d), user = a.db.user)
dbx.close()
with a.connect_server() as srv:
srv.database.set_write_mode(database = d, mode = DbWriteMode.ASYNC)
srv.database.set_sweep_interval(database = d, interval = 0)
if a == act_db_repl:
srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
else:
with a.db.connect() as con:
con.execute_immediate('alter database enable publication')
con.execute_immediate('alter database include all to publication')
con.commit()
except DatabaseError as e:
out_reset += e.__str__()
# Must remain EMPTY:
####################
return out_reset
#--------------------------------------------
def watch_replica( a: Action, max_allowed_time_for_wait, ddl_ready_query = '', isql_check_script = '', replica_expected_out = ''):
retcode = 1;
ready_to_check = False
if ddl_ready_query:
with a.db.connect(no_db_triggers = True) as con:
with con.cursor() as cur:
for i in range(0,max_allowed_time_for_wait):
cur.execute(ddl_ready_query)
count_actual = cur.fetchone()
if count_actual:
ready_to_check = True
break
else:
con.rollback()
time.sleep(1)
else:
ready_to_check = True
if not ready_to_check:
print( f'UNEXPECTED. Initial check query did not return any rows for {max_allowed_time_for_wait} seconds.' )
print('Initial check query:')
print(ddl_ready_query)
return
final_check_pass = False
if isql_check_script:
retcode = 0
for i in range(max_allowed_time_for_wait):
a.reset()
a.expected_stdout = replica_expected_out
a.isql(switches=['-q', '-nod'], input = isql_check_script, combine_output = True)
if a.return_code:
# "Token unknown", "Name longer than database column size" etc: we have to
# immediately break from this loop because isql_check_script is incorrect!
break
if a.clean_stdout == a.clean_expected_stdout:
final_check_pass = True
break
if i < max_allowed_time_for_wait-1:
time.sleep(1)
if not final_check_pass:
print(f'UNEXPECTED. Final check query did not return expected dataset for {max_allowed_time_for_wait} seconds.')
print('Final check query:')
print(isql_check_script)
print('Expected output:')
print(a.clean_expected_stdout)
print('Actual output:')
print(a.clean_stdout)
print(f'ISQL return_code={a.return_code}')
print(f'Waited for {i} seconds')
a.reset()
else:
final_check_pass = True
return
#--------------------------------------------
def drop_db_objects(act_db_main: Action, act_db_repl: Action, capsys):
# return initial state of master DB:
# remove all DB objects (tables, views, ...):
#
db_main_meta, db_repl_meta = '', ''
for a in (act_db_main,act_db_repl):
if a == act_db_main:
sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
a.expected_stdout = """
Start removing objects
Finish. Total objects removed
"""
a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
if a.clean_stdout == a.clean_expected_stdout:
a.reset()
else:
print(a.clean_expected_stdout)
a.reset()
break
# NB: one need to remember that rdb$system_flag can be NOT ONLY 1 for system used objects!
# For example, it has value =3 for triggers that are created to provide CHECK-constraints,
# Custom DB objects always have rdb$system_flag = 0 (or null for some very old databases).
# We can be sure that there are no custom DB objects if following query result is NON empty:
#
ddl_ready_query = """
select 1
from rdb$database
where NOT exists (
select custom_db_object_flag
from (
select rt.rdb$system_flag as custom_db_object_flag from rdb$triggers rt
UNION ALL
select rt.rdb$system_flag from rdb$relations rt
UNION ALL
select rt.rdb$system_flag from rdb$functions rt
UNION ALL
select rt.rdb$system_flag from rdb$procedures rt
UNION ALL
select rt.rdb$system_flag from rdb$exceptions rt
UNION ALL
select rt.rdb$system_flag from rdb$fields rt
UNION ALL
select rt.rdb$system_flag from rdb$collations rt
UNION ALL
select rt.rdb$system_flag from rdb$generators rt
UNION ALL
select rt.rdb$system_flag from rdb$roles rt
UNION ALL
select rt.rdb$system_flag from rdb$auth_mapping rt
UNION ALL
select 1 from sec$users s
where upper(s.sec$user_name) <> 'SYSDBA'
) t
where coalesce(t.custom_db_object_flag,0) = 0
)
"""
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
watch_replica( act_db_repl, MAX_TIME_FOR_WAIT_DATA_IN_REPLICA, ddl_ready_query)
# Must be EMPTY:
print(capsys.readouterr().out)
db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
else:
db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
if RUN_SWEEP_AT_END:
# Following sweep was mandatory during 2021...2022. Problem was fixed:
# * for FB 4.x: 26-jan-2023, commit 2ed48a62c60c029cd8cb2b0c914f23e1cb56580a
# * for FB 5.x: 20-apr-2023, commit 5af209a952bd2ec3723d2c788f2defa6b740ff69
# (log message: 'Avoid random generation of field IDs, respect the user-specified order instead').
# Until this problem was solved, subsequent runs of this test caused to fail with:
# 'ERROR: Record format with length NN is not found for table TEST'
#
a.gfix(switches=['-sweep', a.db.dsn])
# Final point: metadata must become equal:
#
diff_meta = ''.join(unified_diff( \
[x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
[x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
)
# Must be EMPTY:
print(diff_meta)
#--------------------------------------------
@pytest.mark.replication
@pytest.mark.version('>=4.0.5')
def test_1(act_db_main: Action, act_db_repl: Action, tmp_nonpriv_user: User, capsys):
out_prep, out_main, out_drop, dml_err = '', '', '', ''
# Obtain full path + filename for DB_MAIN and DB_REPL aliases.
# NOTE: we must NOT use 'a.db.db_path' for ALIASED databases!
# It will return '.' rather than full path+filename.
# Use only con.info.name for that!
#
db_info = {}
for a in (act_db_main, act_db_repl):
with a.db.connect() as con:
db_info[a, 'db_full_path'] = con.info.name
# Must be EMPTY:
out_prep = capsys.readouterr().out
if out_prep:
# Some problem raised during change DB header(s)
pass
else:
sql_init = f"""
set bail on;
set wng off;
recreate table test (
id int generated by default as identity constraint test_pk primary key
,who varchar(31) default current_user
,dts timestamp default 'now'
-- NB: we have to specify 'sub_tupe' here, otherwise attempt to insert data in this blob will fail with
-- "TypeError: String value is not acceptable type for a non-textual BLOB column."
-- (in cur.execute() with parametrized expr, see below).
-- It looks strange but this error does NOT appear in case if we use con.execute_immediate().
,bdata blob sub_type 1
);
commit;
recreate view v_test as select id, who, bdata from test;
revoke all on all from {tmp_nonpriv_user.name};
grant insert on v_test to {tmp_nonpriv_user.name};
commit;
recreate table t_completed(id int primary key);
commit;
"""
act_db_main.isql(switches=['-q'], input = sql_init, combine_output = True)
out_prep = act_db_main.clean_stdout
act_db_main.reset()
if out_prep:
# Some problem raised during init_sql execution
pass
else:
# Query to be used for check that all DB objects present in replica (after last DML statement completed on master DB):
ddl_ready_query = "select 1 from rdb$relations where rdb$relation_name = upper('t_completed')"
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
watch_replica( act_db_repl, MAX_TIME_FOR_WAIT_DATA_IN_REPLICA, ddl_ready_query)
# Must be EMPTY:
out_prep = capsys.readouterr().out
test_blob_data = 'test blob data'
if out_prep:
# Some problem raised with delivering DDL changes to replica
pass
else:
with act_db_main.db.connect(user = tmp_nonpriv_user.name, password = tmp_nonpriv_user.password) as con: # , charset = 'utf-8') as con:
cur = con.cursor()
try:
# ERROR: no permission for SELECT access to ...
# Effective user is ...
# ERROR: Replication is stopped due to critical error(s)
#
cur.execute("insert into v_test(bdata) values(?)", (test_blob_data,))
con.commit()
except DatabaseError as e:
dml_err = e.__str__()
if dml_err:
# Some problem raised with writing blob into master DB
pass
else:
# No errors must be now. We have to wait now until blob from MASTER be delivered to REPLICA.
# Query to be used that replica DB contains all expected data (after last DML statement completed on master DB):
isql_check_script = """
set bail on;
set list on;
set count on;
select
rdb$get_context('SYSTEM','REPLICA_MODE') replica_mode
,who
,cast(bdata as varchar(32760)) as bdata
from v_test;
"""
isql_expected_out = f"""
REPLICA_MODE READ-ONLY
WHO {tmp_nonpriv_user.name.upper()}
BDATA {test_blob_data}
Records affected: 1
"""
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
watch_replica( act_db_repl, MAX_TIME_FOR_WAIT_DATA_IN_REPLICA, '', isql_check_script, isql_expected_out)
# Must be EMPTY:
out_main = capsys.readouterr().out
drop_db_objects(act_db_main, act_db_repl, capsys)
# Must be EMPTY:
out_drop = capsys.readouterr().out
if [ x for x in (out_prep, out_main, dml_err, out_drop) if x.strip() ]:
# We have a problem either with DDL/DML or with dropping DB objects.
# First, we have to RECREATE both master and slave databases
# (otherwise further execution of this test or other replication-related tests most likely will fail):
out_reset = reset_replication(act_db_main, act_db_repl, db_info[act_db_main,'db_full_path'], db_info[act_db_repl,'db_full_path'])
# Next, we display out_main, out_drop and out_reset:
#
print('Problem(s) detected:')
if out_prep.strip():
print('out_prep:')
print(out_prep)
if out_main.strip():
print('out_main:')
print(out_main)
if dml_err.strip():
print('dml_err:')
print(dml_err)
if out_drop.strip():
print('out_drop:')
print(out_drop)
if out_reset.strip():
print('out_reset:')
print(out_reset)
assert '' == capsys.readouterr().out