6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-23 14:03:06 +01:00
firebird-qa/tests/functional/syspriv/test_access_shutdown_database.py

314 lines
10 KiB
Python

#coding:utf-8
"""
ID: syspriv.access-shutdown-database
TITLE: Check ability to access to database in shutdown single mode as non-sysdba
DESCRIPTION:
We create role with granting system privilege ACCESS_SHUTDOWN_DATABASE to it.
Then we create user and make this role as DEFAULT to him.
Then we check that user U01:
1. can NOT CHANGE database attribute, i.e. can NOT shutdown or bring online database;
2. CAN make attachment to DB in 'shutdown single maintenace' mode and select smth from there.
Also, we check that while U01 is connected, NO other attachment is possible.
This is done by trying to make ES EDS as SYSDBA - this should fail with "335544528 : database shutdown".
Checked on 4.0.0.267. See also letter from Alex 23.06.2016 11:46.
FBTEST: functional.syspriv.access_shutdown_database
"""
import pytest
from firebird.qa import *
substitutions = [('.* access to database.*', 'access to database'), ('.* -Some database.*', ''),
('335544528 : database.* shutdown', '335544528 : database shutdown'),
('Data source : Firebird::localhost:.*', 'Data source : Firebird::localhost:'),
('-At block line: [\\d]+, col: [\\d]+', '-At block line')]
init_script = """
set wng off;
create or alter view v_check as
select
current_user as who_ami
,r.rdb$role_name
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
,r.rdb$system_privileges
from mon$database m cross join rdb$roles r;
commit;
create or alter user u01 password '123' revoke admin role;
revoke all on all from u01;
create or alter trigger trg_connect active on connect as
begin
end;
commit;
recreate table att_log (
att_id int,
att_name varchar(255),
att_user varchar(255),
att_prot varchar(255)
);
commit;
grant select on v_check to public;
grant all on att_log to public;
commit;
set term ^;
execute block as
begin
execute statement 'drop role role_for_access_shutdown_db';
when any do begin end
end
^
create or alter trigger trg_connect active on connect as
begin
if ( upper(current_user) <> upper('SYSDBA') ) then
in autonomous transaction do
insert into att_log(att_name, att_user, att_prot)
select
mon$attachment_name
,mon$user
,left(mon$remote_protocol,3)
from mon$attachments
where mon$user = current_user
;
end
^
set term ;^
commit;
create role role_for_access_shutdown_db
set system privileges to ACCESS_SHUTDOWN_DATABASE; -- CHANGE_SHUTDOWN_MODE, USE_GFIX_UTILITY, IGNORE_DB_TRIGGERS;
commit;
grant default role_for_access_shutdown_db to user u01;
commit;
"""
db = db_factory(init=init_script)
act = python_act('db', substitutions=substitutions)
expected_stderr = """
Statement failed, SQLSTATE = 42000
Execute statement error at attach :
335544528 : database shutdown
Data source : Firebird::localhost:
-At block line: 4, col: 9
"""
expected_stdout = """
WHO_AMI U01
RDB$ROLE_NAME RDB$ADMIN
RDB_ROLE_IN_USE <false>
RDB$SYSTEM_PRIVILEGES FFFFFFFFFFFFFFFF
WHO_AMI U01
RDB$ROLE_NAME ROLE_FOR_ACCESS_SHUTDOWN_DB
RDB_ROLE_IN_USE <true>
RDB$SYSTEM_PRIVILEGES 0001000000000000
Records affected: 2
MON$SHUTDOWN_MODE 2
Records affected: 1
ATT_USER U01
ATT_PROT TCP
Records affected: 1
Records affected: 0
tmp_dbshut_u01.err : no permission for shutdown access to database
tmp_online_u01.err : no permission for bring online access to database
"""
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
@pytest.mark.version('>=4.0')
def test_1(act: Action):
pytest.fail("Not IMPLEMENTED")
# test_script_1
#---
#
# import os
# import subprocess
# from subprocess import Popen
#
# db_file=db_conn.database_name
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for f in f_names_list:
# if type(f) == file:
# del_name = f.name
# elif type(f) == str:
# del_name = f
# else:
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# # Change DB state to 'shutdown single maintenance':
# ##################################################
#
# f_dbshut_u01_log=open( os.path.join(context['temp_directory'],'tmp_dbshut_u01.log'), 'w')
# f_dbshut_u01_err=open( os.path.join(context['temp_directory'],'tmp_dbshut_u01.err'), 'w')
#
# # Must FAIL when we do this as non-sysdba:
#
# subprocess.call([context['fbsvcmgr_path'], "localhost:service_mgr",
# "user", "U01", "password", "123",
# "action_properties",
# "dbname", db_file,
# "prp_shutdown_mode", "prp_sm_single",
# "prp_force_shutdown", "0"
# ],
# stdout=f_dbshut_u01_log,
# stderr=f_dbshut_u01_err
# )
#
# flush_and_close( f_dbshut_u01_log )
# flush_and_close( f_dbshut_u01_err )
#
# #-------------------------------------------------
# # Must PASS because 2nd time se do this as SYSDBA:
#
# f_dbshut_sys_log=open( os.path.join(context['temp_directory'],'tmp_dbshut_sys.log'), 'w')
# f_dbshut_sys_err=open( os.path.join(context['temp_directory'],'tmp_dbshut_sys.err'), 'w')
#
# subprocess.call([context['fbsvcmgr_path'], "localhost:service_mgr",
# "user", user_name, "password", user_password,
# "action_properties",
# "dbname", db_file,
# "prp_shutdown_mode", "prp_sm_single",
# "prp_force_shutdown", "0"
# ],
# stdout=f_dbshut_sys_log,
# stderr=f_dbshut_sys_err
# )
# flush_and_close( f_dbshut_sys_log )
# flush_and_close( f_dbshut_sys_err )
#
# #---------------------------------------------------------------------------------------------------
#
# sql_chk='''
# set list on;
# set count on;
# commit;
# select v.* from v_check v;
# select m.mon$shutdown_mode from mon$database m;
# select a.att_user, att_prot from att_log a;
# set term ^;
# execute block returns( who_else_here rdb$user ) as
# declare another_user varchar(31);
# begin
# execute statement 'select current_user from rdb$database'
# on external 'localhost:' || rdb$get_context('SYSTEM','DB_NAME')
# as user 'SYSDBA' password 'masterkey'
# into who_else_here;
#
# suspend;
# end
# ^
# set term ;^
# '''
#
# # Check ability to connect as NON sysdba to database that is in 'shutdown single' mode (mon$shutdown_mode=2)
# ##########################
# runProgram('isql',[dsn, '-user','U01', '-pas', '123'], sql_chk)
#
#
# # Return database to online state:
# ##################################
#
# f_online_u01_log=open( os.path.join(context['temp_directory'],'tmp_online_u01.log'), 'w')
# f_online_u01_err=open( os.path.join(context['temp_directory'],'tmp_online_u01.err'), 'w')
#
# # Must FAIL when we do this as non-sysdba:
#
# subprocess.call([context['fbsvcmgr_path'], "localhost:service_mgr",
# "user", "U01", "password", "123",
# "action_properties",
# "dbname", db_file,
# "prp_online_mode", "prp_sm_normal"
# ],
# stdout=f_online_u01_log,
# stderr=f_online_u01_err
# )
# flush_and_close( f_online_u01_log )
# flush_and_close( f_online_u01_err )
#
# #-------------------------------------------------
# # Must PASS because 2nd time se do this as SYSDBA:
#
# f_online_sys_log=open( os.path.join(context['temp_directory'],'tmp_online_sys.log'), 'w')
# f_online_sys_err=open( os.path.join(context['temp_directory'],'tmp_online_sys.err'), 'w')
#
# subprocess.call([context['fbsvcmgr_path'], "localhost:service_mgr",
# "user", user_name, "password", user_password,
# "action_properties",
# "dbname", db_file,
# "prp_online_mode", "prp_sm_normal"
# ],
# stdout=f_online_sys_log,
# stderr=f_online_sys_err
# )
#
# flush_and_close( f_online_sys_log )
# flush_and_close( f_online_sys_err )
#
# # ------------------------------------------------------------------------
#
# # Check:
# ########
#
# f_list=(
# f_dbshut_u01_log,
# f_dbshut_u01_err,
# f_dbshut_sys_log,
# f_dbshut_sys_err,
# f_online_u01_log,
# f_online_u01_err,
# f_online_sys_log,
# f_online_sys_err
# )
#
# # Only f_dbshut_u01_err and f_online_u01_err must contain single message.
# # All other files from f_list must be EMPTY:
#
# for i in range(len(f_list)):
# with open( f_list[i].name,'r') as f:
# for line in f:
# print( os.path.basename( f_list[i].name ) + ' : ' + line)
# os.remove(f_list[i].name)
#
# # cleanup: drop user 'U01'
# ##########
# runProgram('isql',[dsn, '-user', user_name, '-pas', user_password], 'drop user u01; commit;')
#
#---