6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 13:33:07 +01:00

Added/Updated functional\syspriv\test_change_header_settings.py: checked on 4.0.1.2692, 5.0.0.489.

This commit is contained in:
zotov 2022-05-19 21:52:17 +03:00
parent fbf8caa8ba
commit 84481a4549

View File

@ -2,216 +2,123 @@
"""
ID: syspriv.change-header-settings
TITLE: Check ability to change some database header attributes by non-sysdba user who
is granted with necessary system privileges
TITLE: Check ability to change some database header attributes by non-sysdba user who is granted with necessary system privileges
DESCRIPTION:
NB: attributes should be changed one at a time, i.e. one fbsvcmgr call should change only ONE atribute.
NB: attributes should be changed one at a time, i.e. one Services API call should change only ONE atribute.
FBTEST: functional.syspriv.change_header_settings
NOTES:
[18.05.2022] pzotov: refactored to be used in firebird-qa suite.
Checked on 4.0.1.2692, 5.0.0.489.
"""
import pytest
from firebird.qa import *
from firebird.driver import DbSpaceReservation
from firebird.driver.types import DatabaseError
init_script = """
set wng off;
set bail on;
set list on;
set count on;
substitutions = [('[ \t]+', ' ')]
db = db_factory()
tmp_user = user_factory('db', name='tmp_syspriv_user', password='123')
tmp_role = role_factory('db', name='tmp_role_for_change_db_header')
create or alter view v_check as
select
current_user as who_ami
,r.rdb$role_name
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
,r.rdb$system_privileges
from mon$database m cross join rdb$roles r;
commit;
act = python_act('db', substitutions = substitutions)
create or alter user u01 password '123' revoke admin role;
revoke all on all from u01;
create or alter trigger trg_connect active on connect as
begin
end;
commit;
recreate table att_log (
att_id int,
att_name varchar(255),
att_user varchar(255),
att_addr varchar(255),
att_prot varchar(255),
att_dts timestamp default 'now'
);
commit;
grant select on v_check to public;
grant all on att_log to public;
commit;
set term ^;
execute block as
begin
execute statement 'drop role role_for_change_header_settings';
when any do begin end
end
^
create or alter trigger trg_connect active on connect as
begin
if ( upper(current_user) <> upper('SYSDBA') ) then
in autonomous transaction do
insert into att_log(att_id, att_name, att_user, att_prot)
select
mon$attachment_id
,mon$attachment_name
,mon$user
,mon$remote_protocol
from mon$attachments
where mon$user = current_user
;
end
^
set term ;^
commit;
-- NB: Privilege 'IGNORE_DB_TRIGGERS' is needed when we return database to ONLINE
-- and this DB has DB-level trigger.
create role role_for_change_header_settings
set system privileges to CHANGE_HEADER_SETTINGS, USE_GFIX_UTILITY, IGNORE_DB_TRIGGERS;
commit;
grant default role_for_change_header_settings to user u01;
commit;
"""
db = db_factory(init=init_script)
act = python_act('db')
expected_stdout = """
Records affected: 0
WHO_AMI U01
RDB$ROLE_NAME RDB$ADMIN
RDB_ROLE_IN_USE <false>
RDB$SYSTEM_PRIVILEGES FFFFFFFFFFFFFFFF
WHO_AMI U01
RDB$ROLE_NAME ROLE_FOR_CHANGE_HEADER_SETTINGS
RDB_ROLE_IN_USE <true>
RDB$SYSTEM_PRIVILEGES 00E0000000000000
Records affected: 2
expected_stdout_isql = """
MON$SWEEP_INTERVAL 54321
MON$SQL_DIALECT 1
MON$FORCED_WRITES 0
Records affected: 1
MON$RESERVE_SPACE 0
"""
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
@pytest.mark.version('>=4.0')
def test_1(act: Action):
pytest.fail("Not IMPLEMENTED")
# test_script_1
#---
#
# import os
# import subprocess
#
# db_file = db_conn.database_name
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for f in f_names_list:
# if type(f) == file:
# del_name = f.name
# elif type(f) == str:
# del_name = f
# else:
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
#
# # Check that current non-sysdba user:
# # 1) can SKIP db-level trigger firing:
# # 2) IS granted with role 'role_for_change_header_settings':
#
# runProgram('isql',[dsn,'-nod','-user','U01', '-pas', '123'], 'set list on; set count on; select * from att_log; select * from v_check;')
#
# f_hdr_props_log = open( os.path.join(context['temp_directory'],'tmp_syspriv_hdr_props.log'), 'w')
# subprocess.call( [context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01", "password", "123",
# "action_properties",
# "dbname", db_file,
# "prp_sweep_interval", "54321",
# ],
# stdout=f_hdr_props_log,
# stderr=subprocess.STDOUT
# )
#
# subprocess.call( [context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01", "password", "123",
# "action_properties",
# "dbname", db_file,
# "prp_set_sql_dialect", "1",
# ],
# stdout=f_hdr_props_log,
# stderr=subprocess.STDOUT
# )
#
# subprocess.call( [context['fbsvcmgr_path'],"localhost:service_mgr",
# "user","U01", "password", "123",
# "action_properties",
# "dbname", db_file,
# "prp_write_mode", "prp_wm_async"
# ],
# stdout=f_hdr_props_log,
# stderr=subprocess.STDOUT
# )
#
# flush_and_close( f_hdr_props_log )
#
# # Checks
# ########
#
# sql_chk='''
# set list on;
# set count on;
# select m.mon$sweep_interval, m.mon$sql_dialect, m.mon$forced_writes from mon$database m;
# '''
# runProgram('isql',[dsn,'-nod','-user','U01', '-pas', '123'], sql_chk)
#
#
# # Must be EMPTY:
# ################
# with open( f_hdr_props_log.name,'r') as f:
# for line in f:
# print('DB SHUTDOWN LOG: '+line.upper())
#
#
# # Cleanup:
# ##########
# cleanup( (f_hdr_props_log,) )
#
#---
@pytest.mark.version('>=4.0')
def test_1(act: Action, tmp_user: User, tmp_role:Role, capsys):
init_script = \
f'''
set wng off;
set bail on;
set list on;
set count on;
create or alter view v_check as
select
current_user as who_ami
,r.rdb$role_name
,rdb$role_in_use(r.rdb$role_name) as RDB_ROLE_IN_USE
,r.rdb$system_privileges
from mon$database m cross join rdb$roles r;
commit;
alter user {tmp_user.name} password '123' revoke admin role;
revoke all on all from {tmp_user.name};
create or alter trigger trg_connect active on connect as
begin
end;
commit;
recreate table att_log (
att_id int,
att_name varchar(255),
att_user varchar(255),
att_addr varchar(255),
att_prot varchar(255),
att_dts timestamp default 'now'
);
commit;
grant select on v_check to public;
grant all on att_log to public;
commit;
set term ^;
create or alter trigger trg_connect active on connect as
begin
if ( upper(current_user) <> upper('SYSDBA') ) then
in autonomous transaction do
insert into att_log(att_id, att_name, att_user, att_prot)
select
mon$attachment_id
,mon$attachment_name
,mon$user
,mon$remote_protocol
from mon$attachments
where mon$user = current_user
;
end
^
set term ;^
commit;
-- NB: Privilege 'IGNORE_DB_TRIGGERS' is needed when we return database to ONLINE
-- and this DB has DB-level trigger.
alter role {tmp_role.name}
set system privileges to CHANGE_HEADER_SETTINGS, USE_GFIX_UTILITY, IGNORE_DB_TRIGGERS;
commit;
grant default {tmp_role.name} to user {tmp_user.name};
commit;
'''
act.isql(switches=['-q'], input=init_script)
with act.connect_server(user = tmp_user.name, password = tmp_user.password, role = tmp_role.name) as srv_nondba:
# All subsequent actions must not issue any output:
try:
# change SWEEP interval:
srv_nondba.database.set_sweep_interval(database=act.db.db_path, interval = 54321)
# set SQL dialect to 1:
srv_nondba.database.set_sql_dialect(database=act.db.db_path, dialect = 1)
# change default DB cache size:
srv_nondba.database.set_space_reservation(database=act.db.db_path, mode = DbSpaceReservation.USE_FULL)
except DatabaseError as e:
print(e.__str__())
sql_check = "set list on; select m.mon$sweep_interval, m.mon$sql_dialect, m.mon$reserve_space from mon$database m;"
act.isql(switches=['-q'], input=sql_check)
act.expected_stdout = expected_stdout_isql
assert act.clean_stdout == act.clean_expected_stdout