mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-23 22:13:05 +01:00
217 lines
6.5 KiB
Python
217 lines
6.5 KiB
Python
#coding:utf-8
|
|
|
|
"""
|
|
ID: syspriv.change-shutdown-mode
|
|
TITLE: Check ability to change database shutdown mode by non-sysdba user who is
|
|
granted with necessary system privileges
|
|
DESCRIPTION:
|
|
FBTEST: functional.syspriv.change_shutdown_mode
|
|
"""
|
|
|
|
import pytest
|
|
from firebird.qa import *
|
|
|
|
init_script = """
|
|
set wng off;
|
|
set bail on;
|
|
set list on;
|
|
set count on;
|
|
|
|
create or alter view v_check as
|
|
select
|
|
current_user as who_ami
|
|
,r.rdb$role_name
|
|
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
|
|
,r.rdb$system_privileges
|
|
from mon$database m cross join rdb$roles r;
|
|
commit;
|
|
|
|
create or alter user u01 password '123' revoke admin role;
|
|
revoke all on all from u01;
|
|
|
|
create or alter trigger trg_connect active on connect as
|
|
begin
|
|
end;
|
|
commit;
|
|
|
|
recreate table att_log (
|
|
att_id int,
|
|
att_name varchar(255),
|
|
att_user varchar(255),
|
|
att_addr varchar(255),
|
|
att_prot varchar(255),
|
|
att_dts timestamp default 'now'
|
|
);
|
|
|
|
commit;
|
|
|
|
grant select on v_check to public;
|
|
grant all on att_log to public;
|
|
commit;
|
|
|
|
set term ^;
|
|
execute block as
|
|
begin
|
|
execute statement 'drop role role_for_change_shutdown_mode';
|
|
when any do begin end
|
|
end
|
|
^
|
|
create or alter trigger trg_connect active on connect as
|
|
begin
|
|
if ( upper(current_user) <> upper('SYSDBA') ) then
|
|
in autonomous transaction do
|
|
insert into att_log(att_id, att_name, att_user, att_prot)
|
|
select
|
|
mon$attachment_id
|
|
,mon$attachment_name
|
|
,mon$user
|
|
,mon$remote_protocol
|
|
from mon$attachments
|
|
where mon$user = current_user
|
|
;
|
|
end
|
|
^
|
|
set term ;^
|
|
commit;
|
|
|
|
-- Shutdown DB and bring online
|
|
-- Add/change/delete non-system records in RDB$TYPES.
|
|
-- NB: Privilege 'IGNORE_DB_TRIGGERS' is needed when we return database to ONLINE
|
|
-- and this DB has DB-level trigger.
|
|
create role role_for_change_shutdown_mode
|
|
set system privileges to CHANGE_SHUTDOWN_MODE, USE_GFIX_UTILITY, IGNORE_DB_TRIGGERS;
|
|
commit;
|
|
grant default role_for_change_shutdown_mode to user u01;
|
|
commit;
|
|
"""
|
|
|
|
db = db_factory(init=init_script)
|
|
|
|
act = python_act('db')
|
|
|
|
expected_stdout = """
|
|
Records affected: 0
|
|
WHO_AMI U01
|
|
RDB$ROLE_NAME RDB$ADMIN
|
|
RDB_ROLE_IN_USE <false>
|
|
RDB$SYSTEM_PRIVILEGES FFFFFFFFFFFFFFFF
|
|
WHO_AMI U01
|
|
RDB$ROLE_NAME ROLE_FOR_CHANGE_SHUTDOWN_MODE
|
|
RDB_ROLE_IN_USE <true>
|
|
RDB$SYSTEM_PRIVILEGES 2060000000000000
|
|
Records affected: 2
|
|
DB HEADER: ATTRIBUTES FORCE WRITE, FULL SHUTDOWN
|
|
"""
|
|
|
|
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
|
|
@pytest.mark.version('>=4.0')
|
|
def test_1(act: Action):
|
|
pytest.fail("Not IMPLEMENTED")
|
|
|
|
# test_script_1
|
|
#---
|
|
#
|
|
# import os
|
|
# import subprocess
|
|
#
|
|
# db_file = db_conn.database_name
|
|
# db_conn.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def flush_and_close( file_handle ):
|
|
# # https://docs.python.org/2/library/os.html#os.fsync
|
|
# # If you're starting with a Python file object f,
|
|
# # first do f.flush(), and
|
|
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
|
# global os
|
|
#
|
|
# file_handle.flush()
|
|
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
|
|
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
|
|
# os.fsync(file_handle.fileno())
|
|
# file_handle.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def cleanup( f_names_list ):
|
|
# global os
|
|
# for f in f_names_list:
|
|
# if type(f) == file:
|
|
# del_name = f.name
|
|
# elif type(f) == str:
|
|
# del_name = f
|
|
# else:
|
|
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
|
|
# del_name = None
|
|
#
|
|
# if del_name and os.path.isfile( del_name ):
|
|
# os.remove( del_name )
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
#
|
|
# # Check that current non-sysdba user:
|
|
# # 1) can SKIP db-level trigger firing:
|
|
# # 2) IS granted with role 'role_for_change_shutdown_mode':
|
|
#
|
|
# runProgram('isql',[dsn,'-nod','-user','U01', '-pas', '123'], 'set list on; set count on; select * from att_log; select * from v_check;')
|
|
#
|
|
# f_shutdown_log = open( os.path.join(context['temp_directory'],'tmp_syspriv_dbshut.log'), 'w')
|
|
# subprocess.call( [context['fbsvcmgr_path'],"localhost:service_mgr",
|
|
# "user","U01", "password", "123",
|
|
# "action_properties",
|
|
# "dbname", db_file,
|
|
# "prp_shutdown_mode", "prp_sm_full", "prp_force_shutdown", "0"
|
|
# ],
|
|
# stdout=f_shutdown_log,
|
|
# stderr=subprocess.STDOUT
|
|
# )
|
|
# flush_and_close( f_shutdown_log )
|
|
#
|
|
# f_dbheader_log = open( os.path.join(context['temp_directory'],'tmp_syspriv_dbhead.log'), 'w')
|
|
# subprocess.call([context['fbsvcmgr_path'], "localhost:service_mgr",
|
|
# "user", "U01", "password" , "123",
|
|
# "action_db_stats", "sts_hdr_pages",
|
|
# "dbname", db_file
|
|
# ],
|
|
# stdout=f_dbheader_log,
|
|
# stderr=subprocess.STDOUT
|
|
# )
|
|
# flush_and_close( f_dbheader_log )
|
|
#
|
|
# f_ret2online_log = open( os.path.join(context['temp_directory'],'tmp_syspriv_dbonline.log'), 'w')
|
|
# subprocess.call( [context['fbsvcmgr_path'], "localhost:service_mgr",
|
|
# "user","U01", "password", "123",
|
|
# "action_properties", "prp_db_online",
|
|
# "dbname", db_file,
|
|
# ],
|
|
# stdout = f_ret2online_log,
|
|
# stderr = subprocess.STDOUT
|
|
# )
|
|
# flush_and_close( f_ret2online_log )
|
|
#
|
|
# # Must be EMPTY:
|
|
# with open( f_shutdown_log.name,'r') as f:
|
|
# for line in f:
|
|
# print('DB SHUTDOWN LOG: '+line.upper())
|
|
#
|
|
#
|
|
# # Must contain: "Attributes force write, full shutdown"
|
|
# with open( f_dbheader_log.name,'r') as f:
|
|
# for line in f:
|
|
# if 'Attributes' in line:
|
|
# print('DB HEADER: ' + ' '.join(line.split()).upper() )
|
|
#
|
|
#
|
|
# # Must be EMPTY:
|
|
# with open( f_ret2online_log.name,'r') as f:
|
|
# for line in f:
|
|
# print('DB ONLINE LOG: '+line.upper())
|
|
#
|
|
#
|
|
# # Cleanup:
|
|
# ##########
|
|
# cleanup( (f_shutdown_log, f_dbheader_log, f_ret2online_log) )
|
|
#---
|