6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-02-02 02:40:42 +01:00

Added/Updated tests\functional\replication\test_permission_error_on_ddl_issued_by_non_sysdba.py: Reimplemented. We have to query the replica database for the presence of data that we know must appear there. The replication log must NOT be scanned every N seconds for that task.

This commit is contained in:
pavel-zotov 2023-04-17 18:59:09 +03:00
parent ac4a58b2fb
commit 313cb0b42d

View File

@ -2,7 +2,7 @@
"""
ID: replication.permission_error_on_ddl_issued_by_non_sysdba
ISSUE: 6856
ISSUE: https://github.com/FirebirdSQL/firebird/issues/6856
TITLE: Permission error with replication
DESCRIPTION:
Test runs the actions specified in the ticket (create a user and grant admin access to him, etc).
@ -39,27 +39,25 @@ NOTES:
"""
import os
import shutil
import re
from difflib import unified_diff
from pathlib import Path
import time
import pytest
from firebird.qa import *
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode, ShutdownMode, ShutdownMethod, DatabaseError
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
# from act.files_dir/'test_config.ini':
repl_settings = QA_GLOBALS['replication']
MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = int(repl_settings['max_time_for_wait_segment_in_log'])
MAX_TIME_FOR_WAIT_DATA_IN_REPLICA = int(repl_settings['max_time_for_wait_data_in_replica'])
MAIN_DB_ALIAS = repl_settings['main_db_alias']
REPL_DB_ALIAS = repl_settings['repl_db_alias']
db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True)
db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True)
#create or alter user dba_helper password '123' using plugin Srp grant admin role;
tmp_dba = user_factory('db_main', name='tmp_user_gh_6856', password='123', plugin = 'Srp', admin = True)
substitutions = [('Start removing objects in:.*', 'Start removing objects'),
@ -88,227 +86,311 @@ def cleanup_folder(p):
#--------------------------------------------
def wait_for_data_in_replica( act_db_main: Action, max_allowed_time_for_wait, prefix_msg = '' ):
def reset_replication(act_db_main, act_db_repl, db_main_file, db_repl_file):
out_reset = ''
replication_log = act_db_main.home_dir / 'replication.log'
with act_db_main.connect_server() as srv:
replold_lines = []
with open(replication_log, 'r') as f:
replold_lines = f.readlines()
# !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
# DO NOT use 'a.db.db_path' for ALIASED database!
# It will return '.' rather than full path+filename.
with act_db_main.db.connect(no_db_triggers = True) as con:
with con.cursor() as cur:
cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
last_generated_repl_segment = cur.fetchone()[0]
repl_root_path = Path(db_main_file).parent
repl_jrn_sub_dir = repl_settings['journal_sub_dir']
repl_arc_sub_dir = repl_settings['archive_sub_dir']
# VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
# VERBOSE: Segment 2 (200 bytes) is replicated in 82 ms, deleting the file
p_successfully_replicated = re.compile( f'\\+\\s+verbose:\\s+segment\\s+{last_generated_repl_segment}\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting', re.IGNORECASE)
for f in (db_main_file, db_repl_file):
# Method db.drop() changes LINGER to 0, issues 'delete from mon$att' with suppressing exceptions
# and calls 'db.drop_database()' (also with suppressing exceptions).
# We change DB state to FULL SHUTDOWN instead of call action.db.drop() because
# this is more reliable (it kills all attachments in all known cases and does not use mon$ table)
#
try:
srv.database.shutdown(database = f, mode = ShutdownMode.FULL, method = ShutdownMethod.FORCED, timeout = 0)
except DatabaseError as e:
out_reset += e.__str__()
# VERBOSE: Segment 16 replication failure at offset 33628
p_replication_failure = re.compile('segment\\s+\\d+\\s+replication\\s+failure', re.IGNORECASE)
# REMOVE db file from disk:
###########################
os.unlink(f)
found_required_message = False
found_replfail_message = False
found_common_error_msg = False
# Clean folders repl_journal and repl_archive: remove all files from there.
for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
if cleanup_folder(repl_root_path / p) > 0:
out_reset += f"Directory {str(p)} remains non-empty.\n"
for i in range(0,max_allowed_time_for_wait):
if out_reset == '':
for a in (act_db_main,act_db_repl):
d = a.db.db_path
time.sleep(1)
try:
dbx = create_database(str(d), user = a.db.user)
dbx.close()
with a.connect_server() as srv:
srv.database.set_write_mode(database = d, mode = DbWriteMode.ASYNC)
srv.database.set_sweep_interval(database = d, interval = 0)
if a == act_db_repl:
srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
else:
with a.db.connect() as con:
con.execute_immediate('alter database enable publication')
con.execute_immediate('alter database include all to publication')
con.commit()
except DatabaseError as e:
out_reset += e.__str__()
# Must remain EMPTY:
return out_reset
# Get content of fb_home replication.log _after_ isql finish:
with open(replication_log, 'r') as f:
diff_data = unified_diff(
replold_lines,
f.readlines()
)
#--------------------------------------------
for k,d in enumerate(diff_data):
if p_successfully_replicated.search(d):
# We FOUND phrase "VERBOSE: Segment <last_generated_repl_segment> ... is replicated ..." in the replication log.
# This is expected success, break!
print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about replicated segment N {last_generated_repl_segment}.' )
found_required_message = True
def watch_replica( a: Action, max_allowed_time_for_wait, ddl_ready_query = '', isql_check_script = '', replica_expected_out = ''):
    """Poll the replica database until replicated data becomes visible, then optionally
    verify its content with an ISQL script.

    Phase 1: if `ddl_ready_query` is given, execute it once per second (up to
    `max_allowed_time_for_wait` seconds) until it returns at least one row,
    i.e. until the expected DDL has been applied on the replica.
    Phase 2: if `isql_check_script` is given, run it repeatedly (same time budget)
    until its output equals `replica_expected_out`.

    On timeout, diagnostic details are printed to stdout (the caller is expected to
    capture stdout and treat any output as a failure); nothing is returned.

    NOTE(review): `a` presumably is a firebird-qa `Action` bound to the REPLICA
    database — confirm against callers.
    """
    retcode = 1;
    ready_to_check = False
    if ddl_ready_query:
        # Phase 1: poll once per second until the readiness query yields a row.
        with a.db.connect(no_db_triggers = True) as con:
            with con.cursor() as cur:
                for i in range(0,max_allowed_time_for_wait):
                    cur.execute(ddl_ready_query)
                    count_actual = cur.fetchone()
                    if count_actual:
                        ready_to_check = True
                        break
                    else:
                        # Rollback so the next poll sees a fresh transaction snapshot.
                        con.rollback()
                        time.sleep(1)
    else:
        # No readiness query supplied: proceed straight to the content check.
        ready_to_check = True
    if not ready_to_check:
        # Timed out waiting for DDL to appear on the replica; report and bail out.
        print( f'UNEXPECTED. Initial check query did not return any rows for {max_allowed_time_for_wait} seconds.' )
        print('Initial check query:')
        print(ddl_ready_query)
        return
    final_check_pass = False
    if isql_check_script:
        retcode = 0
        # Phase 2: re-run the check script until its output matches expectations.
        for i in range(max_allowed_time_for_wait):
            a.reset()
            a.expected_stdout = replica_expected_out
            a.isql(switches=['-q', '-nod'], input = isql_check_script, combine_output = True)
            if a.return_code:
                # "Token unknown", "Name longer than database column size" etc: we have to
                # immediately break from this loop because isql_check_script is incorrect!
                break
            if a.clean_stdout == a.clean_expected_stdout:
                final_check_pass = True
                break
            # Sleep only if another iteration remains.
            if i < max_allowed_time_for_wait-1:
                time.sleep(1)
        if not final_check_pass:
            # Timed out (or ISQL failed): dump everything needed to diagnose the mismatch.
            print(f'UNEXPECTED. Final check query did not return expected dataset for {max_allowed_time_for_wait} seconds.')
            print('Final check query:')
            print(isql_check_script)
            print('Expected output:')
            print(a.clean_expected_stdout)
            print('Actual output:')
            print(a.clean_stdout)
            print(f'ISQL return_code={a.return_code}')
            print(f'Waited for {i} seconds')
        a.reset()
    else:
        final_check_pass = True
    return
#--------------------------------------------
def drop_db_objects(act_db_main: Action, act_db_repl: Action, capsys):
# return initial state of master DB:
# remove all DB objects (tables, views, ...):
#
db_main_meta, db_repl_meta = '', ''
for a in (act_db_main,act_db_repl):
if a == act_db_main:
sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
a.expected_stdout = """
Start removing objects
Finish. Total objects removed
"""
a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
if a.clean_stdout == a.clean_expected_stdout:
a.reset()
else:
print(a.clean_expected_stdout)
a.reset()
break
if p_replication_failure.search(d):
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ SEGMENT REPLICATION FAILURE @@@ ' + d )
found_replfail_message = True
break
# NB: one need to remember that rdb$system_flag can be NOT ONLY 1 for system used objects!
# For example, it has value =3 for triggers that are created to provide CHECK-constraints,
# Custom DB objects always have rdb$system_flag = 0 (or null for some very old databases).
# We can be sure that there are no custom DB objects if following query result is NON empty:
#
ddl_ready_query = """
select 1
from rdb$database
where NOT exists (
select custom_db_object_flag
from (
select rt.rdb$system_flag as custom_db_object_flag from rdb$triggers rt
UNION ALL
select rt.rdb$system_flag from rdb$relations rt
UNION ALL
select rt.rdb$system_flag from rdb$functions rt
UNION ALL
select rt.rdb$system_flag from rdb$procedures rt
UNION ALL
select rt.rdb$system_flag from rdb$exceptions rt
UNION ALL
select rt.rdb$system_flag from rdb$fields rt
UNION ALL
select rt.rdb$system_flag from rdb$collations rt
UNION ALL
select rt.rdb$system_flag from rdb$generators rt
UNION ALL
select rt.rdb$system_flag from rdb$roles rt
UNION ALL
select rt.rdb$system_flag from rdb$auth_mapping rt
UNION ALL
select 1 from sec$users s
where upper(s.sec$user_name) <> 'SYSDBA'
) t
where coalesce(t.custom_db_object_flag,0) = 0
)
"""
if 'ERROR:' in d:
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ REPLICATION ERROR @@@ ' + d )
found_common_error_msg = True
break
if found_required_message or found_replfail_message or found_common_error_msg:
break
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
watch_replica( act_db_repl, MAX_TIME_FOR_WAIT_DATA_IN_REPLICA, ddl_ready_query)
if not found_required_message:
print('UNEXPECTED RESULT: no message about replicating segment N {last_generated_repl_segment} for {max_allowed_time_for_wait} seconds.')
# Must be EMPTY:
print(capsys.readouterr().out)
db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
else:
db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
######################
### A C H T U N G ###
######################
# MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
# WITH 'ERROR: Record format with length NN is not found for table TEST':
a.gfix(switches=['-sweep', a.db.dsn])
# Final point: metadata must become equal:
#
diff_meta = ''.join(unified_diff( \
[x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
[x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
)
# Must be EMPTY:
print(diff_meta)
#--------------------------------------------
@pytest.mark.version('>=4.0.1')
def test_1(act_db_main: Action, act_db_repl: Action, tmp_dba: User, capsys):
#assert '' == capsys.readouterr().out
###################
somewhat_failed = 1
###################
try:
out_prep, out_main, out_drop = '', '', ''
sql_init = f'''
set bail on;
# Obtain full path + filename for DB_MAIN and DB_REPL aliases.
# NOTE: we must NOT use 'a.db.db_path' for ALIASED databases!
# It will return '.' rather than full path+filename.
# Use only con.info.name for that!
#
db_info = {}
for a in (act_db_main, act_db_repl):
with a.db.connect(no_db_triggers = True) as con:
if a == act_db_main and a.vars['server-arch'] == 'Classic' and os.name != 'nt':
pytest.skip("Waiting for FIX: 'Engine is shutdown' in replication log for CS. Linux only.")
db_info[a, 'db_full_path'] = con.info.name
sql_init = f"""
execute block as
begin
execute statement 'create or alter view v_test as select 1 x from rdb$database'
with autonomous transaction
as user '{tmp_dba.name}' password '{tmp_dba.password}' role 'rdb$admin';
end
"""
with act_db_main.db.connect(no_db_triggers = True) as con:
con.execute_immediate(f'grant rdb$admin to user {tmp_dba.name}')
con.commit()
con.execute_immediate(sql_init)
con.commit()
# Must be EMPTY:
out_prep = capsys.readouterr().out
if out_prep:
# Some problem raised during initial DDL and/or blob writing
pass
else:
# Query to be used for check that all DB objects present in replica (after last DML statement completed on master DB):
ddl_ready_query = "select 1 from rdb$relations where rdb$relation_name = upper('v_test')"
# Query to be used that replica DB contains all expected data (after last DML statement completed on master DB):
isql_check_script = """
set list on;
set count on;
select
rdb$get_context('SYSTEM','REPLICA_MODE') replica_mode
,v.x
,r.rdb$owner_name
from v_test v
cross join rdb$relations r
where coalesce(r.rdb$system_flag,0) = 0 and r.rdb$relation_name = upper('v_test')
;
"""
grant rdb$admin to user {tmp_dba.name};
set term ^;
execute block as
begin
execute statement 'create or alter view v_test as select 1 x from rdb$database'
with autonomous transaction
as user '{tmp_dba.name}' password '{tmp_dba.password}' role 'rdb$admin';
end
^
set term ;^
commit;
'''
act_db_main.expected_stderr = ''
act_db_main.isql(switches=['-q'], input = sql_init)
assert act_db_main.clean_stderr == act_db_main.clean_expected_stderr
act_db_main.reset()
act_db_main.expected_stdout = 'POINT-1 FOUND message about replicated segment'
isql_expected_out = f"""
REPLICA_MODE READ-ONLY
X 1
RDB$OWNER_NAME {tmp_dba.name.upper()}
Records affected: 1
"""
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-1' )
watch_replica( act_db_repl, MAX_TIME_FOR_WAIT_DATA_IN_REPLICA, ddl_ready_query, isql_check_script, isql_expected_out)
# Must be EMPTY:
out_main = capsys.readouterr().out
act_db_main.stdout = capsys.readouterr().out
assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
act_db_main.reset()
drop_db_objects(act_db_main, act_db_repl, capsys)
#-----------------------------------------------------------------------
for a in (act_db_main,act_db_repl):
db_name = a.db.db_path.upper()
# Must be EMPTY:
out_drop = capsys.readouterr().out
sql_chk = f'''
set list on;
set blob all;
set count on;
select
'{db_name}' as db_name
,v.*
from v_test v;
if [ x for x in (out_prep, out_main, out_drop) if x.strip() ]:
# We have a problem either with DDL/DML or with dropping DB objects.
# First, we have to RECREATE both master and slave databases
# (otherwise further execution of this test or other replication-related tests most likely will fail):
out_reset = reset_replication(act_db_main, act_db_repl, db_info[act_db_main,'db_full_path'], db_info[act_db_repl,'db_full_path'])
select rdb$owner_name
from rdb$relations
where rdb$system_flag is distinct from 1 and rdb$relation_name = upper('v_test');
commit;
'''
for a in (act_db_main,act_db_repl):
a.expected_stdout = f"""
DB_NAME {db_name}
X 1
Records affected: 1
RDB$OWNER_NAME {tmp_dba.name.upper()}
Records affected: 1
"""
a.isql(switches=['-q', '-nod'], input = sql_chk)
assert a.clean_stdout == a.clean_expected_stdout
a.reset()
#---------------------------------------------------
# return initial state of master DB:
# remove all DB objects (tables, views, ...):
# Next, we display out_main, out_drop and out_reset:
#
db_main_meta, db_repl_meta = '', ''
for a in (act_db_main,act_db_repl):
if a == act_db_main:
sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
a.expected_stdout = """
Start removing objects
Finish. Total objects removed
"""
a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
assert a.clean_stdout == a.clean_expected_stdout
a.reset()
print('Problem(s) detected:')
if out_prep.strip():
print('out_prep:\n', out_prep)
if out_main.strip():
print('out_main:\n', out_main)
if out_drop.strip():
print('out_drop:\n', out_drop)
if out_reset.strip():
print('out_reset:\n', out_reset)
a.expected_stdout = 'POINT-2 FOUND message about replicated segment'
##############################################################################
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
##############################################################################
wait_for_data_in_replica( a, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-2' )
a.stdout = capsys.readouterr().out
assert a.clean_stdout == a.clean_expected_stdout
a.reset()
db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
else:
db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
######################
### A C H T U N G ###
######################
# MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
# WITH 'ERROR: Record format with length NN is not found for table TEST':
a.gfix(switches=['-sweep', a.db.dsn])
# Final point: metadata must become equal:
#
diff_meta = ''.join(unified_diff( \
[x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
[x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
)
assert diff_meta == ''
###################
somewhat_failed = 0
###################
except Exception as e:
pass
if somewhat_failed:
# If any previous assert failed, we have to RECREATE both master and slave databases.
# Otherwise further execution of this test or other replication-related tests most likely will fail.
for a in (act_db_main,act_db_repl):
d = a.db.db_path
a.db.drop()
dbx = create_database(str(d), user = a.db.user)
dbx.close()
with a.connect_server() as srv:
srv.database.set_write_mode(database = d, mode=DbWriteMode.ASYNC)
srv.database.set_sweep_interval(database = d, interval = 0)
if a == act_db_repl:
srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
else:
with a.db.connect() as con:
# !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
# DO NOT use 'a.db.db_path' for ALIASED database!
# Its '.parent' property will be '.' rather than full path.
# Use only con.info.name for that:
repl_root_path = Path(con.info.name).parent
repl_jrn_sub_dir = repl_settings['journal_sub_dir']
repl_arc_sub_dir = repl_settings['archive_sub_dir']
# Clean folders repl_journal and repl_archive (remove all files from there):
for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty."
con.execute_immediate('alter database enable publication')
con.execute_immediate('alter database include all to publication')
con.commit()
assert somewhat_failed == 0
assert '' == capsys.readouterr().out