6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-23 05:53:06 +01:00
firebird-qa/tests/functional/syspriv/test_use_gstat_utility.py

387 lines
13 KiB
Python

#coding:utf-8
"""
ID: syspriv.use-gstat-utility
TITLE: Check ability to obtain database statistics
DESCRIPTION:
We create user and grant system privileges USE_GSTAT_UTILITY and IGNORE_DB_TRIGGERS to him.
NB: pivilege 'IGNORE_DB_TRIGGERS' is required for get full db statistics, otherwise we get:
Unable to perform operation: system privilege IGNORE_DB_TRIGGERS is missing
Then we check that this user can extract DB statistics in TWO ways:
1) common data except encryption info (it is called here 'base "sts_" output')
2) only encryption info (I don't know why "sts_encryption" can not be used together with other switches...)
Both these actions should not produce any error.
Also, logs of them should contain all needed 'check words' and patterns - and we check this.
Finally, we ensure that when user U01 gathered DB statistics then db-level trigger did NOT fire.
FBTEST: functional.syspriv.use_gstat_utility
NOTES:
[31.10.2019] added check for generator pages in encryption block.
[18.05.2022] refactored to be used in firebird-qa suite.
Trace must have following lines when NON-privileged user gathers DB statistics:
1) for common DB stat:
... START_SERVICE
service_mgr, (Service 000000000FE21DC0, TMP_SYSPRIV_USER:TMP_ROLE_FOR_USE_GSTAT_UTILITY, TCPv6:::1/55587, ...python.exe:16972)
"Database Stats"
-role TMP_ROLE_FOR_USE_GSTAT_UTILITY ...test.fdb -DATA -INDEX -SYSTEM -RECORD
2) for encryption-related:
... START_SERVICE
service_mgr, (Service 000000000FE21DC0, TMP_SYSPRIV_USER:TMP_ROLE_FOR_USE_GSTAT_UTILITY, TCPv6:::1/55588, ...python.exe:16972)
"Database Stats"
-role TMP_ROLE_FOR_USE_GSTAT_UTILITY ...test.fdb -ENCRYPTION
Checked on 4.0.1.2692, 5.0.0.489.
"""
import re
import pytest
from firebird.qa import *
from firebird.driver import SrvStatFlag as stf
substitutions = [('[ \t]+', ' ')]
db = db_factory()
tmp_user = user_factory('db', name='tmp_syspriv_user', password='123')
tmp_role = role_factory('db', name='tmp_role_for_use_gstat_utility')
act = python_act('db', substitutions=substitutions)
dbstat_common_expected_output="True"
dbstat_encryption_expected_output="True"
final_chk_expected_stdout = "ATT_LOG_COUNT 0"
@pytest.mark.version('>=4.0')
def test_1(act: Action, tmp_user: User, tmp_role:Role, capsys):
init_script = f"""
set wng off;
set bail on;
set list on;
set count on;
create or alter view v_check as
select
mon$database_name
,current_user as who_ami
,r.rdb$role_name
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
,r.rdb$system_privileges
from mon$database m cross join rdb$roles r;
commit;
alter user {tmp_user.name} revoke admin role;
revoke all on all from {tmp_user.name};
commit;
create or alter trigger trg_connect active on connect as
begin
end;
commit;
recreate table att_log (
att_user varchar(255),
att_prot varchar(255)
);
commit;
recreate table test(s char(1000) unique using index test_s_unq);
commit;
insert into test select rpad('', 1000, uuid_to_char(gen_uuid()) ) from rdb$types;
commit;
grant select on v_check to public;
grant select on att_log to public;
--------------------------------- [ !! ] -- do NOT: grant select on test to {tmp_user.name}; -- [ !! ]
commit;
set term ^;
create or alter trigger trg_connect active on connect as
begin
if ( upper(current_user) <> upper('SYSDBA') ) then
in autonomous transaction do
insert into att_log( att_user, att_prot )
select
mon$user
,mon$remote_protocol
from mon$attachments
where mon$user = current_user
;
end
^
set term ;^
commit;
-- Ability to get database statistics.
-- NB: 'IGNORE_DB_TRIGGERS' - required for get full db statistics, otherwise:
-- Unable to perform operation: system privilege IGNORE_DB_TRIGGERS is missing
alter role {tmp_role.name}
set system privileges to USE_GSTAT_UTILITY, IGNORE_DB_TRIGGERS;
commit;
grant default {tmp_role.name} to user {tmp_user.name};
commit;
"""
act.isql(switches=['-q'], input=init_script)
with act.connect_server(user = tmp_user.name, password = tmp_user.password, role = tmp_role.name) as srv:
try:
srv.database.get_statistics(database = act.db.db_path, flags = stf.DATA_PAGES | stf.IDX_PAGES | stf.RECORD_VERSIONS | stf.SYS_RELATIONS, callback=act.print_callback)
except DatabaseError as e:
print(e.__str__())
# Common DB statistics (WITHOUT encryption process details) must contain following phrases:
dbstat_common_mandatory_phrases=[
"rdb$database"
,"rdb$index"
,"primary pointer page"
,"index root page"
,"total formats"
,"total records"
,"total versions"
,"total fragments"
,"compression ratio"
,"pointer pages"
,"data pages"
,"primary pages"
,"empty pages"
,"blobs"
,"swept pages"
,"full pages"
,"fill distribution"
,"0 - 19%"
,"80 - 99%"
]
act.expected_stdout = dbstat_common_expected_output
out_lines = capsys.readouterr().out.lower()
all_found = True
for c in dbstat_common_mandatory_phrases:
if c not in out_lines:
all_found = False
print( f'ERROR. Phrase "{c}" not found in the statistics output' )
break
print(all_found)
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
# ----------------------------------------------------------------------------
with act.connect_server(user = tmp_user.name, password = tmp_user.password, role = tmp_role.name) as srv:
try:
srv.database.get_statistics(database = act.db.db_path, flags = stf.ENCRYPTION , callback=act.print_callback)
except DatabaseError as e:
print(e.__str__())
# Every of following PATTERNS must be found in the encryption-related DB statistics output:
dbstat_encryption_mandatory_patterns=[
"data pages: total \\d+, encrypted \\d+, non-crypted \\d+"
,"index pages: total \\d+, encrypted \\d+, non-crypted \\d+"
,"blob pages: total \\d+, encrypted \\d+, non-crypted \\d+"
,"generator pages: total \\d+, encrypted \\d+, non-crypted \\d+" # this was added in octomer-2019
]
dbstat_encryption_mandatory_patterns = [re.compile(s) for s in dbstat_encryption_mandatory_patterns]
act.expected_stdout = dbstat_encryption_expected_output
out_lines = capsys.readouterr().out.lower()
all_found = True
for p in dbstat_encryption_mandatory_patterns:
if not p.search(out_lines):
all_found = False
print(f'ERROR. Pattern {p} not found in the ENCRYPTION-related output:')
print(out_lines)
break
print(all_found)
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
# ----------------------------------------------------------------------------
# Finally, we check that connections of tmp_user were not logged
# because he has granted privilege IGNORE_DB_TRIGGERS:
act.isql(switches=['-q'], input='set list on;select count(*) as att_log_count from att_log;')
act.expected_stdout = final_chk_expected_stdout
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
# test_script_1
#---
#
# import os
# import subprocess
# import re
#
# db_file=db_conn.database_name
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for f in f_names_list:
# if type(f) == file:
# del_name = f.name
# elif type(f) == str:
# del_name = f
# else:
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# f_db_stat_log=open( os.path.join(context['temp_directory'],'tmp_dbstat.log'), 'w')
# f_db_stat_err=open( os.path.join(context['temp_directory'],'tmp_dbstat.err'), 'w')
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01","password","123",
# "action_db_stats",
# "dbname", db_file,
# "sts_record_versions",
# "sts_data_pages",
# "sts_idx_pages",
# "sts_sys_relations"
# ],
# stdout=f_db_stat_log,
# stderr=f_db_stat_err
# )
#
# flush_and_close( f_db_stat_log )
# flush_and_close( f_db_stat_err )
#
# # Separate call for get encryption statistics:
#
# f_db_encr_log=open( os.path.join(context['temp_directory'],'tmp_dbencr.log'), 'w')
# f_db_encr_err=open( os.path.join(context['temp_directory'],'tmp_dbencr.err'), 'w')
#
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01","password","123",
# "action_db_stats",
# "dbname", db_file,
# "sts_encryption"
# ],
# stdout=f_db_encr_log,
# stderr=f_db_encr_err
# )
#
# flush_and_close( f_db_encr_log )
# flush_and_close( f_db_encr_err )
#
# #-----------------------
#
#
# # Check content of logs:
# #######
#
# # Must be EMPTY:
# with open( f_db_stat_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED GSTAT STDERR: '+line.upper())
#
# # Pointer pages: 1, data page slots: 2
# # Data pages: 2, average fill: 8%
# # Primary pages: 1, secondary pages: 1, swept pages: 0
# # Empty pages: 0, full pages: 0
# # Blobs: 9, total length: 160, blob pages: 0
#
# # Must contain:
# check_words=[
# "rdb$database"
# ,"rdb$index"
# ,"primary pointer page"
# ,"index root page"
# ,"total formats"
# ,"total records"
# ,"total versions"
# ,"total fragments"
# ,"compression ratio"
# ,"pointer pages"
# ,"data pages"
# ,"primary pages"
# ,"empty pages"
# ,"blobs"
# ,"swept pages"
# ,"full pages"
# ,"fill distribution"
# ,"0 - 19%"
# ,"80 - 99%"
# ]
#
# f = open( f_db_stat_log.name, 'r')
# lines = f.read().lower()
# for i in range(len(check_words)):
# if check_words[i].lower() in lines:
# print( 'Found in base "sts_" output: ' + check_words[i].lower() )
# else:
# print( 'UNEXPECTEDLY NOT found in base "sts_" output: ' + check_words[i].lower() )
# flush_and_close( f )
#
#
# # Must be EMPTY:
# with open( f_db_encr_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED STS_ENCRYPTION STDERR: '+line.upper())
#
#
# # Encryption statistics should be like this:
# # ---------------------
# # Data pages: total NNN, encrypted 0, non-crypted NNN
# # Index pages: total MMM, encrypted 0, non-crypted MMM
# # Blob pages: total 0, encrypted 0, non-crypted 0
# # Generator pages: total PPP, encrypted 0, non-crypted PPP ------------- 31.10.2019 NB: THIS WAS ADDED RECENLTLY
#
# enc_pattern=re.compile(".*total[\\s]+[\\d]+,[\\s]+encrypted[\\s]+[\\d]+,[\\s]+non-crypted[\\s]+[\\d]+")
# with open( f_db_encr_log.name,'r') as f:
# for line in f:
# # if enc_pattern.match(line):
# if 'encrypted' in line:
# print('Found in "sts_encryption" output: ' + line.lower())
#
#
# # Cleanup:
# ##########
#
# sql_final='''
# set list on;
# set count on;
# select * from att_log; -- this should output: "Records affected: 0" because U01 must ignore DB-level trigger
# commit;
# drop user u01;
# commit;
# '''
# runProgram('isql',[dsn,'-user',user_name, '-pas', user_password], sql_final)
#
# cleanup( (f_db_stat_log, f_db_stat_err, f_db_encr_log, f_db_encr_err) )
#
#---