6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-23 05:53:06 +01:00
firebird-qa/tests/functional/syspriv/test_use_gstat_utility.py

312 lines
10 KiB
Python

#coding:utf-8
#
# id: functional.syspriv.use_gstat_utility
# title: Check ability to obtain database statistics.
# decription:
# We create user and grant system privileges USE_GSTAT_UTILITY and IGNORE_DB_TRIGGERS to him.
# Then we check that this user can extract DB statistics in TWO ways:
# 1) common data except encryption info (it is called here 'base "sts_" output')
# 2) only encryption info (I don't know why "sts_encryption" can not be used together with other switches...)
# Both these actions should not produce any error.
# Also, logs of them should contain all needed 'check words' and patterns - and we check this.
# Finally, we ensure that when user U01 gathered DB statistics then db-level trigger did NOT fire.
#
# Checked on 4.0.0.267.
# 31.10.2019: added check for generator pages in encryption block.
# Checked on:
# 4.0.0.1635 SS: 2.660s.
# 4.0.0.1633 CS: 3.164s.
#
# tracker_id:
# min_versions: ['4.0.0']
# versions: 4.0
# qmid: None
import pytest
from firebird.qa import db_factory, python_act, Action
# version: 4.0
# resources: None
substitutions_1 = [('[ \t]+', ' '), ('.* data pages: total [\\d]+[,]{0,1} encrypted [\\d]+[,]{0,1} non-crypted [\\d]+', 'data pages total encrypted non-crypted'), ('.* index pages: total [\\d]+[,]{0,1} encrypted [\\d]+[,]{0,1} non-crypted [\\d]+', 'index pages total encrypted non-crypted'), ('.* blob pages: total [\\d]+[,]{0,1} encrypted [\\d]+[,]{0,1} non-crypted [\\d]+', 'blob pages total encrypted non-crypted'), ('.* generator pages: total [\\d]+[,]{0,1} encrypted [\\d]+[,]{0,1} non-crypted [\\d]+', 'generator pages total encrypted non-crypted')]
init_script_1 = """
set wng off;
set bail on;
set list on;
set count on;
create or alter view v_check as
select
mon$database_name
,current_user as who_ami
,r.rdb$role_name
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
,r.rdb$system_privileges
from mon$database m cross join rdb$roles r;
commit;
create or alter user u01 password '123' revoke admin role;
revoke all on all from u01;
commit;
create or alter trigger trg_connect active on connect as
begin
end;
commit;
recreate table att_log (
att_user varchar(255),
att_prot varchar(255)
);
commit;
recreate table test(s char(1000) unique using index test_s_unq);
commit;
insert into test select rpad('', 1000, uuid_to_char(gen_uuid()) ) from rdb$types;
commit;
grant select on v_check to public;
grant select on att_log to public;
--------------------------------- [ !! ] -- do NOT: grant select on test to u01; -- [ !! ]
commit;
set term ^;
execute block as
begin
execute statement 'drop role role_for_use_gstat_utility';
when any do begin end
end
^
create or alter trigger trg_connect active on connect as
begin
if ( upper(current_user) <> upper('SYSDBA') ) then
in autonomous transaction do
insert into att_log( att_user, att_prot )
select
mon$user
,mon$remote_protocol
from mon$attachments
where mon$user = current_user
;
end
^
set term ;^
commit;
-- Ability to get database statistics.
-- NB: 'IGNORE_DB_TRIGGERS' - required for get full db statistics, otherwise:
-- Unable to perform operation: system privilege IGNORE_DB_TRIGGERS is missing
create role role_for_use_gstat_utility
set system privileges to USE_GSTAT_UTILITY, IGNORE_DB_TRIGGERS;
commit;
grant default role_for_use_gstat_utility to user u01;
commit;
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
# test_script_1
#---
#
# import os
# import subprocess
# import re
#
# db_file=db_conn.database_name
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for f in f_names_list:
# if type(f) == file:
# del_name = f.name
# elif type(f) == str:
# del_name = f
# else:
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# f_db_stat_log=open( os.path.join(context['temp_directory'],'tmp_dbstat.log'), 'w')
# f_db_stat_err=open( os.path.join(context['temp_directory'],'tmp_dbstat.err'), 'w')
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01","password","123",
# "action_db_stats",
# "dbname", db_file,
# "sts_record_versions",
# "sts_data_pages",
# "sts_idx_pages",
# "sts_sys_relations"
# ],
# stdout=f_db_stat_log,
# stderr=f_db_stat_err
# )
#
# flush_and_close( f_db_stat_log )
# flush_and_close( f_db_stat_err )
#
# # Separate call for get encryption statistics:
#
# f_db_encr_log=open( os.path.join(context['temp_directory'],'tmp_dbencr.log'), 'w')
# f_db_encr_err=open( os.path.join(context['temp_directory'],'tmp_dbencr.err'), 'w')
#
# subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01","password","123",
# "action_db_stats",
# "dbname", db_file,
# "sts_encryption"
# ],
# stdout=f_db_encr_log,
# stderr=f_db_encr_err
# )
#
# flush_and_close( f_db_encr_log )
# flush_and_close( f_db_encr_err )
#
# #-----------------------
#
#
# # Check content of logs:
# #######
#
# # Must be EMPTY:
# with open( f_db_stat_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED GSTAT STDERR: '+line.upper())
#
# # Pointer pages: 1, data page slots: 2
# # Data pages: 2, average fill: 8%
# # Primary pages: 1, secondary pages: 1, swept pages: 0
# # Empty pages: 0, full pages: 0
# # Blobs: 9, total length: 160, blob pages: 0
#
# # Must contain:
# check_words=[
# "rdb$database"
# ,"rdb$index"
# ,"primary pointer page"
# ,"index root page"
# ,"total formats"
# ,"total records"
# ,"total versions"
# ,"total fragments"
# ,"compression ratio"
# ,"pointer pages"
# ,"data pages"
# ,"primary pages"
# ,"empty pages"
# ,"blobs"
# ,"swept pages"
# ,"full pages"
# ,"fill distribution"
# ,"0 - 19%"
# ,"80 - 99%"
# ]
#
# f = open( f_db_stat_log.name, 'r')
# lines = f.read().lower()
# for i in range(len(check_words)):
# if check_words[i].lower() in lines:
# print( 'Found in base "sts_" output: ' + check_words[i].lower() )
# else:
# print( 'UNEXPECTEDLY NOT found in base "sts_" output: ' + check_words[i].lower() )
# flush_and_close( f )
#
#
# # Must be EMPTY:
# with open( f_db_encr_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED STS_ENCRYPTION STDERR: '+line.upper())
#
#
# # Encryption statistics should be like this:
# # ---------------------
# # Data pages: total NNN, encrypted 0, non-crypted NNN
# # Index pages: total MMM, encrypted 0, non-crypted MMM
# # Blob pages: total 0, encrypted 0, non-crypted 0
# # Generator pages: total PPP, encrypted 0, non-crypted PPP ------------- 31.10.2019 NB: THIS WAS ADDED RECENLTLY
#
# enc_pattern=re.compile(".*total[\\s]+[\\d]+,[\\s]+encrypted[\\s]+[\\d]+,[\\s]+non-crypted[\\s]+[\\d]+")
# with open( f_db_encr_log.name,'r') as f:
# for line in f:
# # if enc_pattern.match(line):
# if 'encrypted' in line:
# print('Found in "sts_encryption" output: ' + line.lower())
#
#
# # Cleanup:
# ##########
#
# sql_final='''
# set list on;
# set count on;
# select * from att_log; -- this should output: "Records affected: 0" because U01 must ignore DB-level trigger
# commit;
# drop user u01;
# commit;
# '''
# runProgram('isql',[dsn,'-user',user_name, '-pas', user_password], sql_final)
#
# cleanup( (f_db_stat_log, f_db_stat_err, f_db_encr_log, f_db_encr_err) )
#
#---
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Found in base "sts_" output: rdb$database
Found in base "sts_" output: rdb$index
Found in base "sts_" output: primary pointer page
Found in base "sts_" output: index root page
Found in base "sts_" output: total formats
Found in base "sts_" output: total records
Found in base "sts_" output: total versions
Found in base "sts_" output: total fragments
Found in base "sts_" output: compression ratio
Found in base "sts_" output: pointer pages
Found in base "sts_" output: data pages
Found in base "sts_" output: primary pages
Found in base "sts_" output: empty pages
Found in base "sts_" output: blobs
Found in base "sts_" output: swept pages
Found in base "sts_" output: full pages
Found in base "sts_" output: fill distribution
Found in base "sts_" output: 0 - 19%
Found in base "sts_" output: 80 - 99%
Found in "sts_encryption" output: data pages: total 131, encrypted 0, non-crypted 131
Found in "sts_encryption" output: index pages: total 150, encrypted 0, non-crypted 150
Found in "sts_encryption" output: blob pages: total 0, encrypted 0, non-crypted 0
Found in "sts_encryption" output: generator pages: total 1, encrypted 0, non-crypted 1
Records affected: 0
"""
@pytest.mark.version('>=4.0')
def test_1(act_1: Action):
pytest.fail("Test not IMPLEMENTED")