6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00
firebird-qa/tests/bugs/core_4524_test.py

283 lines
12 KiB
Python
Raw Normal View History

2021-04-26 20:07:00 +02:00
#coding:utf-8
2022-01-23 20:41:55 +01:00
"""
ID: issue-4842
ISSUE: 4842
TITLE: New gbak option to enable encryption during restore
DESCRIPTION:
Part of this test was copied from core_6071.fbt.
We create several generators in the test DB and get the number of the generators page by querying RDB$PAGES (page_type = 9).
We also get page_size; using these two values we can read the binary content of the generators page.
We check that for *all* created sequences we can obtain their names and values when the DB file is read as a binary file
(this must be possible as long as the DB is not encrypted).
2022-01-23 20:41:55 +01:00
Then we encrypt DB and wait until encryption process will complete.
2022-01-23 20:41:55 +01:00
After this we:
* Make backup of this temp DB, using gbak utility and '-KEYHOLDER <name_of_key_holder>' command switch.
* Make restore from just created backup.
* Make validation of just restored database by issuing command "gfix -v -full ..."
(i.e. validate both data and metadata rather than online val which can check user data only).
* Open the restored DB as a binary file and attempt to read the generator names again - this must fail, their names must be encrypted.
* Connect to this DB and check that the 'SHOW SEQU' command shows all generators and their values.
2022-01-23 20:41:55 +01:00
JIRA: CORE-4524
FBTEST: bugs.core_4524
NOTES:
[21.09.2022] pzotov
Test reads settings that are COMMON for all encryption-related tests and stored in act.files_dir/test_config.ini.
QA-plugin prepares this by defining dictionary with name QA_GLOBALS which reads settings via ConfigParser mechanism.
Checked on Linux and Windows: 3.0.8.33535 (SS/CS), 4.0.1.2692 (SS/CS)
2022-01-23 20:41:55 +01:00
"""
import os
import binascii
import re
import locale
import datetime as py_dt
import time
from pathlib import Path
from firebird.driver import DatabaseError
2021-04-26 20:07:00 +02:00
import pytest
2022-01-23 20:41:55 +01:00
from firebird.qa import *
2021-04-26 20:07:00 +02:00
# Fixtures: the source database (it gets encrypted by the test), the database
# that is restored from its backup, and the temporary backup file itself.
db_to_be_encrypted = db_factory()
db_encrypt_restore = db_factory(filename = 'tmp_core_4524.restored.fdb')
tmp_fbk = temp_file('tmp_core_4524.encrypted.fbk')

# One action per database; the names passed here refer to the fixtures above.
act_src = python_act('db_to_be_encrypted')
act_res = python_act('db_encrypt_restore')

# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
# from act_src.files_dir/'test_config.ini':
enc_settings = QA_GLOBALS['encryption']

# ACHTUNG: this must be carefully tuned on every new host.
# The limit (compared against elapsed milliseconds in the test) is stored
# under a different key for Windows and for all other platforms.
_wait_limit_key = 'MAX_WAIT_FOR_ENCR_FINISH_WIN' if os.name == 'nt' else 'MAX_WAIT_FOR_ENCR_FINISH_NIX'
MAX_WAITING_ENCR_FINISH = int(enc_settings[_wait_limit_key])
assert MAX_WAITING_ENCR_FINISH > 0

ENCRYPTION_PLUGIN = enc_settings['encryption_plugin'] # fbSampleDbCrypt
ENCRYPTION_HOLDER = enc_settings['encryption_holder'] # fbSampleKeyHolder
ENCRYPTION_KEY = enc_settings['encryption_key'] # Red

# Message printed by check_page_for_readable_values() when the page content
# matches the expectation.
SUCCESS_MSG = 'All sequences FOUND.'
def check_page_for_readable_values(dbname, gen_page_number, pg_size, check_sequence_values, is_encrypted, msg_prefix = ''):
    """Print whether sequence values are readable in a raw database page.

    Reads ``pg_size`` bytes starting at offset ``gen_page_number * pg_size``
    of the file ``dbname`` and searches that page for every value from
    ``check_sequence_values``. Nothing is returned; the verdict is printed:

    * ``is_encrypted`` is false: success means that ALL values were found;
    * ``is_encrypted`` is true: success means that NO value was found.

    On success ``msg_prefix + SUCCESS_MSG`` is printed. Otherwise a header
    line is printed, followed by one ``[value, hex_of_searched_bytes]`` line
    per offending value.

    :param dbname: path of the database file to be read as binary.
    :param gen_page_number: number of the page to inspect.
    :param pg_size: page size in bytes.
    :param check_sequence_values: iterable of integer sequence values.
    :param is_encrypted: which outcome is considered a success (see above).
    :param msg_prefix: text put in front of every header/success message.
    """
    # The context manager closes the handle even if seek()/read() raises.
    with open(dbname, "rb") as db_handle:
        db_handle.seek(gen_page_number * pg_size)
        page_content = db_handle.read(pg_size)

    not_found_lst = []
    any_found_lst = []
    for n in check_sequence_values:
        # ::: NOTE :::
        # Generator value is stored in REVERSED bytes order.
        # dec 830624 --> hex 0x0caca0 --> 0c|ac|a0 --> stored in page as three bytes: {a0; ac; 0c}
        # Build that byte sequence from the absolute value, using the minimal
        # number of bytes (at least one, so that 0 becomes b'\x00').
        magnitude = abs(n)
        byte_count = max(1, (magnitude.bit_length() + 7) // 8)
        reversed_bytes = magnitude.to_bytes(byte_count, 'little')

        # Hex form is kept only for the diagnostic output below.
        n_as_reversed_hex = binascii.hexlify(reversed_bytes)

        # Search the raw bytes rather than the hex text of the page: a hex
        # substring can match across a nibble boundary (bytes 0a 0c give the
        # text '0a0c' which contains 'a0'), i.e. report a false 'found'.
        if reversed_bytes in page_content:
            any_found_lst.append([n, n_as_reversed_hex])
        else:
            not_found_lst.append([n, n_as_reversed_hex])

    if is_encrypted:
        as_expected = len(any_found_lst) == 0
    else:
        as_expected = len(not_found_lst) == 0

    if as_expected:
        print(msg_prefix + SUCCESS_MSG)
    elif is_encrypted:
        print(msg_prefix + 'UNEXPECTEDLY FOUND sequences:')
        for p in any_found_lst:
            print(p)
    else:
        print(msg_prefix + 'UNEXPECTEDLY NOT found sequences:')
        for p in not_found_lst:
            print(p)
#----------------------------------------------------------------------------------------------
2022-01-23 20:41:55 +01:00
@pytest.mark.encryption
@pytest.mark.version('>=4.0')
def test_1(act_src: Action, act_res: Action, tmp_fbk:Path, capsys):
    """Encrypt a database, backup/restore it using a key holder and verify the result.

    Steps performed:
      1. Create several sequences in the source DB and check that their values
         can be found in the raw content of the generators page.
      2. Issue ALTER DATABASE ENCRYPT and poll 'show database' until encryption
         is reported as complete or MAX_WAITING_ENCR_FINISH ms have elapsed.
      3. Parse 'gstat -e' output and check that the non-crypted page count is 0.
      4. Backup the source DB with gbak (-KEYHOLDER, -crypt), restore it into
         act_res.db and run 'gfix -v -full' against the restored file.
      5. Check that sequence values can NOT be found in the raw page of the
         restored DB and that 'show sequ' lists all sequences.
    """
    init_sql = """
set bail on;
create sequence gen_ba0bab start with 12192683;
create sequence gen_badf00d start with 195948557;
create sequence gen_caca0 start with 830624;
create sequence gen_c0ffee start with 12648430;
create sequence gen_dec0de start with 14598366;
create sequence gen_decade start with 14600926;
create sequence gen_7FFFFFFF start with 2147483647;
commit;
"""
    act_src.expected_stdout = ''
    act_src.isql(switches = ['-q'], input = init_sql, combine_output = True, io_enc = locale.getpreferredencoding())
    assert act_src.clean_stdout == act_src.clean_expected_stdout
    act_src.reset()

    check_sequence_values=[]
    with act_src.db.connect() as con:
        with con.cursor() as cur:
            get_current_seq_values='''
execute block returns( gen_curr bigint) as
declare gen_name rdb$generator_name;
begin
for
select rdb$generator_name from rdb$generators where rdb$system_flag is distinct from 1 order by rdb$generator_id
into gen_name
do begin
execute statement 'execute block returns(g bigint) as begin g = gen_id('|| gen_name ||', 0); suspend; end' into gen_curr;
suspend;
end
end
'''
            # Obtain current values of user generators:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            cur.execute(get_current_seq_values)
            for r in cur:
                check_sequence_values.append(r[0])

            # Obtain page size and ID of generators page:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            cur.execute('select m.mon$page_size,min(rdb$page_number) from mon$database m cross join rdb$pages p where p.rdb$page_type = 9 group by 1')
            pg_size, gen_page_number = -1,-1
            for r in cur:
                pg_size=r[0]
                gen_page_number=r[1]
                # print(r[0],r[1])

        # Without a row from the query above the -1 placeholders would be used
        # as a file offset and read size; fail early with a clear message instead.
        assert pg_size > 0 and gen_page_number >= 0, 'Could not obtain page size and generators page number.'

        # Read gen page, convert it to hex and check whether generator values can be found there or no:
        # Expected result: YES for all values because DB not encrypted now.
        # ~~~~~~~~~~~~~~~
        check_page_for_readable_values(act_src.db.db_path, gen_page_number, pg_size, check_sequence_values, False, 'INIT. ')
        act_src.expected_stdout = 'INIT. ' + SUCCESS_MSG
        act_src.stdout = capsys.readouterr().out
        assert act_src.clean_stdout == act_src.clean_expected_stdout
        act_src.reset()

        ######################################################
        # Elapsed time is tracked as a whole number of milliseconds. Dividing
        # the timedelta by a 1 ms timedelta takes days into account as well,
        # unlike 'seconds*1000 + microseconds//1000'.
        one_ms = py_dt.timedelta(milliseconds = 1)
        t1=py_dt.datetime.now()
        elapsed_ms = 0
        sttm = f'alter database encrypt with "{ENCRYPTION_PLUGIN}" key "{ENCRYPTION_KEY}"'
        try:
            con.execute_immediate(sttm)
            con.commit()
        except DatabaseError as e:
            # Anything printed here makes the 'empty output' check below fail.
            print(e)

        act_src.expected_stdout = ''
        act_src.stdout = capsys.readouterr().out
        assert act_src.clean_stdout == act_src.clean_expected_stdout
        act_src.reset()

        while True:
            elapsed_ms = (py_dt.datetime.now() - t1) // one_ms
            if elapsed_ms > MAX_WAITING_ENCR_FINISH:
                # NOTE(review): the result of this SELECT is not read anywhere in this test;
                # the timeout is actually reported by print() further below. Confirm whether
                # this statement is still needed.
                con.execute_immediate(f"select 'TIMEOUT EXPIRATION: encryption took {elapsed_ms} ms which exceeds limit = {MAX_WAITING_ENCR_FINISH} ms.' as msg from rdb$database")
                break

            # Possible output:
            # Database not encrypted
            # Database encrypted, crypt thread not complete
            act_src.isql(switches=['-q'], input = 'show database;', combine_output = True)
            if 'Database encrypted' in act_src.stdout and 'not complete' not in act_src.stdout:
                break
            act_src.reset()

        if elapsed_ms <= MAX_WAITING_ENCR_FINISH:
            act_src.reset()
            act_src.gstat(switches=['-e'])

            # Data pages: total 884803, encrypted 884803, non-crypted 0
            # ...
            pattern = re.compile('(data|index|blob|generator)\\s+pages[:]{0,1}\\s+total[:]{0,1}\\s+\\d+[,]{0,1}\\s+encrypted[:]{0,1}\\s+\\d+.*[,]{0,1}non-crypted[:]{0,1}\\s+\\d+.*', re.IGNORECASE)
            for line in act_src.stdout.splitlines():
                if pattern.match(line.strip()):
                    # We assume that every line finishes with number of NON-crypted pages, and this number must be 0:
                    words = line.split()
                    if words[-1] == '0':
                        print(words[0] + ': expected, ' + words[-1])
                    else:
                        print(words[0] + ': UNEXPECTED, ' + words[-1])

            expected_gstat_tail = """
Data: expected, 0
Index: expected, 0
Blob: expected, 0
Generator: expected, 0
"""
            act_src.expected_stdout = expected_gstat_tail
            act_src.stdout = capsys.readouterr().out
            assert act_src.clean_stdout == act_src.clean_expected_stdout
            act_src.reset()
        else:
            # Not consumed here: this text stays in captured output and is read
            # by the next capsys.readouterr() call below.
            print(f'TIMEOUT EXPIRATION: encryption took {elapsed_ms} ms which exceeds limit = {MAX_WAITING_ENCR_FINISH} ms.')

        ######################################################
        # see also core_6071_test.py:
        act_src.gbak(switches=['-b', '-KEYHOLDER', ENCRYPTION_HOLDER, '-crypt', ENCRYPTION_PLUGIN, act_src.db.dsn, str(tmp_fbk)])
        act_src.reset()

        act_src.gbak(switches=['-rep', '-KEYHOLDER', ENCRYPTION_HOLDER, str(tmp_fbk), act_res.db.dsn ])
        act_src.reset()

        act_src.gfix(switches=['-v', '-full', str(act_res.db.db_path)])
        act_src.reset()

        # Read gen page in RESTORED database, convert it to hex and check whether generator values can be found there or no.
        # Expected result: NOT for all values because DB was encrypted.
        # ~~~~~~~~~~~~~~~~
        check_page_for_readable_values(act_res.db.db_path, gen_page_number, pg_size, check_sequence_values, True, 'FINAL. ')
        act_src.expected_stdout = 'FINAL. ' + SUCCESS_MSG
        act_src.stdout = capsys.readouterr().out
        assert act_src.clean_stdout == act_src.clean_expected_stdout
        act_src.reset()
    #< with act_src.db.connect()

    # Final check: ensure that sequences have proper values:
    ##############
    act_res.expected_stdout = """
Generator GEN_7FFFFFFF, current value: 2147483646, initial value: 2147483647, increment: 1
Generator GEN_BA0BAB, current value: 12192682, initial value: 12192683, increment: 1
Generator GEN_BADF00D, current value: 195948556, initial value: 195948557, increment: 1
Generator GEN_C0FFEE, current value: 12648429, initial value: 12648430, increment: 1
Generator GEN_CACA0, current value: 830623, initial value: 830624, increment: 1
Generator GEN_DEC0DE, current value: 14598365, initial value: 14598366, increment: 1
Generator GEN_DECADE, current value: 14600925, initial value: 14600926, increment: 1
"""
    act_res.isql(switches = ['-q'], input = 'show sequ;', combine_output = True, io_enc = locale.getpreferredencoding())
    assert act_res.clean_stdout == act_res.clean_expected_stdout
    act_res.reset()