mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-23 05:53:06 +01:00
387 lines
13 KiB
Python
387 lines
13 KiB
Python
#coding:utf-8
|
|
|
|
"""
|
|
ID: syspriv.use-gstat-utility
|
|
TITLE: Check ability to obtain database statistics
|
|
DESCRIPTION:
|
|
We create user and grant system privileges USE_GSTAT_UTILITY and IGNORE_DB_TRIGGERS to him.
|
|
NB: privilege 'IGNORE_DB_TRIGGERS' is required to get full db statistics, otherwise we get:
|
|
Unable to perform operation: system privilege IGNORE_DB_TRIGGERS is missing
|
|
|
|
Then we check that this user can extract DB statistics in TWO ways:
|
|
1) common data except encryption info (it is called here 'base "sts_" output')
|
|
2) only encryption info (I don't know why "sts_encryption" can not be used together with other switches...)
|
|
Both these actions should not produce any error.
|
|
Also, logs of them should contain all needed 'check words' and patterns - and we check this.
|
|
Finally, we ensure that when the test user (tmp_syspriv_user; it was 'U01' in the legacy script) gathered DB statistics, the db-level trigger did NOT fire.
|
|
|
|
FBTEST: functional.syspriv.use_gstat_utility
|
|
|
|
NOTES:
|
|
[31.10.2019] added check for generator pages in encryption block.
|
|
|
|
[18.05.2022] refactored to be used in firebird-qa suite.
|
|
Trace must have following lines when NON-privileged user gathers DB statistics:
|
|
1) for common DB stat:
|
|
... START_SERVICE
|
|
service_mgr, (Service 000000000FE21DC0, TMP_SYSPRIV_USER:TMP_ROLE_FOR_USE_GSTAT_UTILITY, TCPv6:::1/55587, ...python.exe:16972)
|
|
"Database Stats"
|
|
-role TMP_ROLE_FOR_USE_GSTAT_UTILITY ...test.fdb -DATA -INDEX -SYSTEM -RECORD
|
|
2) for encryption-related:
|
|
... START_SERVICE
|
|
service_mgr, (Service 000000000FE21DC0, TMP_SYSPRIV_USER:TMP_ROLE_FOR_USE_GSTAT_UTILITY, TCPv6:::1/55588, ...python.exe:16972)
|
|
"Database Stats"
|
|
-role TMP_ROLE_FOR_USE_GSTAT_UTILITY ...test.fdb -ENCRYPTION
|
|
|
|
Checked on 4.0.1.2692, 5.0.0.489.
|
|
"""
|
|
|
|
import re
|
|
import pytest
|
|
from firebird.qa import *
|
|
from firebird.driver import SrvStatFlag as stf
|
|
|
|
# Expected outcome of each of the three checks performed by the test.
# The first two are the text produced by print(all_found); the third is
# the isql output of the final 'select count(*) ... from att_log' query.
dbstat_common_expected_output = "True"
dbstat_encryption_expected_output = "True"
final_chk_expected_stdout = "ATT_LOG_COUNT 0"

# Regex -> replacement pairs handed to python_act() below; this one turns
# every run of spaces/tabs into a single space.
# NOTE(review): presumably applied by the framework when it builds the
# "clean" stdout values that the test compares - confirm in firebird-qa docs.
substitutions = [('[ \t]+', ' ')]

# Fixtures: test database, temporary user and temporary role bound to it.
db = db_factory()
tmp_user = user_factory('db', name='tmp_syspriv_user', password='123')
tmp_role = role_factory('db', name='tmp_role_for_use_gstat_utility')

act = python_act('db', substitutions=substitutions)
|
|
|
|
def _print_db_statistics(act, user, role, flags):
    """Request database statistics via the services API as `user` with `role`.

    Output lines are delivered to ``act.print_callback`` (presumably printed
    to stdout, where the caller captures them with ``capsys``).  A
    ``DatabaseError`` raised by the call is printed rather than propagated,
    so its text ends up in the same captured output.

    Args:
        act: test action object providing ``connect_server``, ``db`` and
            ``print_callback``.
        user: object with ``name`` and ``password`` used for the connection.
        role: object with ``name`` used as the connection role.
        flags: ``SrvStatFlag`` value selecting which statistics to gather.
    """
    # Imported here because the `except` clause needs the name and none of the
    # imports visible at the top of this module bring it into scope explicitly;
    # without it a failing service call could surface as NameError instead.
    from firebird.driver import DatabaseError

    with act.connect_server(user=user.name, password=user.password, role=role.name) as srv:
        try:
            srv.database.get_statistics(database=act.db.db_path, flags=flags,
                                        callback=act.print_callback)
        except DatabaseError as e:
            print(e)


@pytest.mark.version('>=4.0')
def test_1(act: Action, tmp_user: User, tmp_role: Role, capsys):
    """Check that USE_GSTAT_UTILITY + IGNORE_DB_TRIGGERS allow gathering DB statistics.

    Steps:
      1. Prepare the database: helper view, ON CONNECT trigger that logs every
         non-SYSDBA attachment into ATT_LOG, a populated table, and a role
         with the two system privileges granted by default to the test user.
      2. Gather common statistics (data/index/record versions/system relations)
         and verify that every mandatory phrase is present.
      3. Gather encryption statistics and verify every mandatory pattern.
      4. Verify that ATT_LOG is empty, i.e. the ON CONNECT trigger did not
         fire for the test user.
    """

    init_script = f"""
        set wng off;
        set bail on;
        set list on;
        set count on;

        create or alter view v_check as
        select
             mon$database_name
            ,current_user as who_ami
            ,r.rdb$role_name
            ,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
            ,r.rdb$system_privileges
        from mon$database m cross join rdb$roles r;
        commit;

        alter user {tmp_user.name} revoke admin role;
        revoke all on all from {tmp_user.name};
        commit;

        create or alter trigger trg_connect active on connect as
        begin
        end;
        commit;

        recreate table att_log (
            att_user varchar(255),
            att_prot varchar(255)
        );

        commit;


        recreate table test(s char(1000) unique using index test_s_unq);
        commit;

        insert into test select rpad('', 1000, uuid_to_char(gen_uuid()) ) from rdb$types;
        commit;

        grant select on v_check to public;
        grant select on att_log to public;
        --------------------------------- [ !! ] -- do NOT: grant select on test to {tmp_user.name}; -- [ !! ]
        commit;

        set term ^;
        create or alter trigger trg_connect active on connect as
        begin
            if ( upper(current_user) <> upper('SYSDBA') ) then
                in autonomous transaction do
                    insert into att_log( att_user, att_prot )
                    select
                         mon$user
                        ,mon$remote_protocol
                    from mon$attachments
                    where mon$user = current_user
                    ;
        end
        ^
        set term ;^
        commit;

        -- Ability to get database statistics.
        -- NB: 'IGNORE_DB_TRIGGERS' - required for get full db statistics, otherwise:
        -- Unable to perform operation: system privilege IGNORE_DB_TRIGGERS is missing
        alter role {tmp_role.name}
            set system privileges to USE_GSTAT_UTILITY, IGNORE_DB_TRIGGERS;
        commit;
        grant default {tmp_role.name} to user {tmp_user.name};
        commit;
    """

    act.isql(switches=['-q'], input=init_script)

    # ----------------------------------------------------------------------------
    # 1) Common DB statistics (everything except encryption details).

    _print_db_statistics(
        act, tmp_user, tmp_role,
        stf.DATA_PAGES | stf.IDX_PAGES | stf.RECORD_VERSIONS | stf.SYS_RELATIONS,
    )

    # Common DB statistics (WITHOUT encryption process details) must contain following phrases:
    dbstat_common_mandatory_phrases = [
        "rdb$database",
        "rdb$index",
        "primary pointer page",
        "index root page",
        "total formats",
        "total records",
        "total versions",
        "total fragments",
        "compression ratio",
        "pointer pages",
        "data pages",
        "primary pages",
        "empty pages",
        "blobs",
        "swept pages",
        "full pages",
        "fill distribution",
        "0 - 19%",
        "80 - 99%",
    ]

    act.expected_stdout = dbstat_common_expected_output

    # First read drains the statistics text; phrases are lower-case, so is the text.
    out_lines = capsys.readouterr().out.lower()
    # Only the first missing phrase is reported, then the search stops.
    missing_phrase = next((c for c in dbstat_common_mandatory_phrases if c not in out_lines), None)
    if missing_phrase is not None:
        print(f'ERROR. Phrase "{missing_phrase}" not found in the statistics output')
    print(missing_phrase is None)

    # Second read contains only what was printed by the check above.
    act.stdout = capsys.readouterr().out
    assert act.clean_stdout == act.clean_expected_stdout
    act.reset()

    # ----------------------------------------------------------------------------
    # 2) Encryption-related statistics (gathered by a separate service call).

    _print_db_statistics(act, tmp_user, tmp_role, stf.ENCRYPTION)

    # Every of following PATTERNS must be found in the encryption-related DB statistics output:
    dbstat_encryption_mandatory_patterns = [
        re.compile(s)
        for s in (
            r"data pages: total \d+, encrypted \d+, non-crypted \d+",
            r"index pages: total \d+, encrypted \d+, non-crypted \d+",
            r"blob pages: total \d+, encrypted \d+, non-crypted \d+",
            r"generator pages: total \d+, encrypted \d+, non-crypted \d+",  # this was added in october-2019
        )
    ]

    act.expected_stdout = dbstat_encryption_expected_output
    out_lines = capsys.readouterr().out.lower()
    # Only the first missing pattern is reported, followed by the whole output.
    missing_pattern = next((p for p in dbstat_encryption_mandatory_patterns if not p.search(out_lines)), None)
    if missing_pattern is not None:
        print(f'ERROR. Pattern {missing_pattern} not found in the ENCRYPTION-related output:')
        print(out_lines)
    print(missing_pattern is None)

    act.stdout = capsys.readouterr().out
    assert act.clean_stdout == act.clean_expected_stdout
    act.reset()

    # ----------------------------------------------------------------------------

    # Finally, we check that connections of tmp_user were not logged
    # because he has granted privilege IGNORE_DB_TRIGGERS:

    act.isql(switches=['-q'], input='set list on;select count(*) as att_log_count from att_log;')
    act.expected_stdout = final_chk_expected_stdout
    assert act.clean_stdout == act.clean_expected_stdout
    act.reset()
|
|
|
|
|
|
|
|
|
|
|
|
# test_script_1
|
|
#---
|
|
#
|
|
# import os
|
|
# import subprocess
|
|
# import re
|
|
#
|
|
# db_file=db_conn.database_name
|
|
# db_conn.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def flush_and_close( file_handle ):
|
|
# # https://docs.python.org/2/library/os.html#os.fsync
|
|
# # If you're starting with a Python file object f,
|
|
# # first do f.flush(), and
|
|
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
|
# global os
|
|
#
|
|
# file_handle.flush()
|
|
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
|
|
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
|
|
# os.fsync(file_handle.fileno())
|
|
# file_handle.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def cleanup( f_names_list ):
|
|
# global os
|
|
# for f in f_names_list:
|
|
# if type(f) == file:
|
|
# del_name = f.name
|
|
# elif type(f) == str:
|
|
# del_name = f
|
|
# else:
|
|
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
|
|
# del_name = None
|
|
#
|
|
# if del_name and os.path.isfile( del_name ):
|
|
# os.remove( del_name )
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# f_db_stat_log=open( os.path.join(context['temp_directory'],'tmp_dbstat.log'), 'w')
|
|
# f_db_stat_err=open( os.path.join(context['temp_directory'],'tmp_dbstat.err'), 'w')
|
|
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
|
|
# "user","U01","password","123",
|
|
# "action_db_stats",
|
|
# "dbname", db_file,
|
|
# "sts_record_versions",
|
|
# "sts_data_pages",
|
|
# "sts_idx_pages",
|
|
# "sts_sys_relations"
|
|
# ],
|
|
# stdout=f_db_stat_log,
|
|
# stderr=f_db_stat_err
|
|
# )
|
|
#
|
|
# flush_and_close( f_db_stat_log )
|
|
# flush_and_close( f_db_stat_err )
|
|
#
|
|
# # Separate call for get encryption statistics:
|
|
#
|
|
# f_db_encr_log=open( os.path.join(context['temp_directory'],'tmp_dbencr.log'), 'w')
|
|
# f_db_encr_err=open( os.path.join(context['temp_directory'],'tmp_dbencr.err'), 'w')
|
|
#
|
|
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
|
|
# "user","U01","password","123",
|
|
# "action_db_stats",
|
|
# "dbname", db_file,
|
|
# "sts_encryption"
|
|
# ],
|
|
# stdout=f_db_encr_log,
|
|
# stderr=f_db_encr_err
|
|
# )
|
|
#
|
|
# flush_and_close( f_db_encr_log )
|
|
# flush_and_close( f_db_encr_err )
|
|
#
|
|
# #-----------------------
|
|
#
|
|
#
|
|
# # Check content of logs:
|
|
# #######
|
|
#
|
|
# # Must be EMPTY:
|
|
# with open( f_db_stat_err.name,'r') as f:
|
|
# for line in f:
|
|
# print('UNEXPECTED GSTAT STDERR: '+line.upper())
|
|
#
|
|
# # Pointer pages: 1, data page slots: 2
|
|
# # Data pages: 2, average fill: 8%
|
|
# # Primary pages: 1, secondary pages: 1, swept pages: 0
|
|
# # Empty pages: 0, full pages: 0
|
|
# # Blobs: 9, total length: 160, blob pages: 0
|
|
#
|
|
# # Must contain:
|
|
# check_words=[
|
|
# "rdb$database"
|
|
# ,"rdb$index"
|
|
# ,"primary pointer page"
|
|
# ,"index root page"
|
|
# ,"total formats"
|
|
# ,"total records"
|
|
# ,"total versions"
|
|
# ,"total fragments"
|
|
# ,"compression ratio"
|
|
# ,"pointer pages"
|
|
# ,"data pages"
|
|
# ,"primary pages"
|
|
# ,"empty pages"
|
|
# ,"blobs"
|
|
# ,"swept pages"
|
|
# ,"full pages"
|
|
# ,"fill distribution"
|
|
# ,"0 - 19%"
|
|
# ,"80 - 99%"
|
|
# ]
|
|
#
|
|
# f = open( f_db_stat_log.name, 'r')
|
|
# lines = f.read().lower()
|
|
# for i in range(len(check_words)):
|
|
# if check_words[i].lower() in lines:
|
|
# print( 'Found in base "sts_" output: ' + check_words[i].lower() )
|
|
# else:
|
|
# print( 'UNEXPECTEDLY NOT found in base "sts_" output: ' + check_words[i].lower() )
|
|
# flush_and_close( f )
|
|
#
|
|
#
|
|
# # Must be EMPTY:
|
|
# with open( f_db_encr_err.name,'r') as f:
|
|
# for line in f:
|
|
# print('UNEXPECTED STS_ENCRYPTION STDERR: '+line.upper())
|
|
#
|
|
#
|
|
# # Encryption statistics should be like this:
|
|
# # ---------------------
|
|
# # Data pages: total NNN, encrypted 0, non-crypted NNN
|
|
# # Index pages: total MMM, encrypted 0, non-crypted MMM
|
|
# # Blob pages: total 0, encrypted 0, non-crypted 0
|
|
# # Generator pages: total PPP, encrypted 0, non-crypted PPP ------------- 31.10.2019 NB: THIS WAS ADDED RECENTLY
|
|
#
|
|
# enc_pattern=re.compile(".*total[\\s]+[\\d]+,[\\s]+encrypted[\\s]+[\\d]+,[\\s]+non-crypted[\\s]+[\\d]+")
|
|
# with open( f_db_encr_log.name,'r') as f:
|
|
# for line in f:
|
|
# # if enc_pattern.match(line):
|
|
# if 'encrypted' in line:
|
|
# print('Found in "sts_encryption" output: ' + line.lower())
|
|
#
|
|
#
|
|
# # Cleanup:
|
|
# ##########
|
|
#
|
|
# sql_final='''
|
|
# set list on;
|
|
# set count on;
|
|
# select * from att_log; -- this should output: "Records affected: 0" because U01 must ignore DB-level trigger
|
|
# commit;
|
|
# drop user u01;
|
|
# commit;
|
|
# '''
|
|
# runProgram('isql',[dsn,'-user',user_name, '-pas', user_password], sql_final)
|
|
#
|
|
# cleanup( (f_db_stat_log, f_db_stat_err, f_db_encr_log, f_db_encr_err) )
|
|
#
|
|
#---
|