mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-22 21:43:06 +01:00
283 lines
12 KiB
Python
283 lines
12 KiB
Python
#coding:utf-8
|
|
|
|
"""
|
|
ID: issue-4842
|
|
ISSUE: 4842
|
|
TITLE: New gbak option to enable encryption during restore
|
|
DESCRIPTION:
|
|
Part of this test was copied from core_6071.fbt.
|
|
|
|
We create several generators in the test DB and get number of generators page using query to RDB$PAGES (page_type = 9).
|
|
Also we get page_size and using these data we can obtain binary content of generatord page.
|
|
We check that for *all* created sequences we can obtain their names and values if read DB file as binary file
|
|
(it must be possible until DB will not encrypted).
|
|
|
|
Then we encrypt DB and wait until encryption process will complete.
|
|
|
|
After this we:
|
|
* Make backup of this temp DB, using gbak utility and '-KEYHOLDER <name_of_key_holder>' command switch.
|
|
* Make restore from just created backup.
|
|
* Make validation of just restored database by issuing command "gfix -v -full ..."
|
|
(i.e. validate both data and metadata rather than online val which can check user data only).
|
|
* Open restored DB as binary file and attempt to read again generators names - this must fail, their names must be encrypted.
|
|
* Make connect to this DB and check that command 'SHOW SEQU' show all generatord and their values.
|
|
JIRA: CORE-4524
|
|
FBTEST: bugs.core_4524
|
|
NOTES:
|
|
[21.09.2022] pzotov
|
|
Test reads settings that are COMMON for all encryption-related tests and stored in act.files_dir/test_config.ini.
|
|
QA-plugin prepares this by defining dictionary with name QA_GLOBALS which reads settings via ConfigParser mechanism.
|
|
|
|
Checked on Linux and Windows: 3.0.8.33535 (SS/CS), 4.0.1.2692 (SS/CS)
|
|
"""
|
|
import os
|
|
import binascii
|
|
import re
|
|
import locale
|
|
import datetime as py_dt
|
|
import time
|
|
from pathlib import Path
|
|
from firebird.driver import DatabaseError
|
|
|
|
import pytest
|
|
from firebird.qa import *
|
|
|
|
db_to_be_encrypted = db_factory()
|
|
db_encrypt_restore = db_factory(filename = 'tmp_core_4524.restored.fdb')
|
|
tmp_fbk = temp_file('tmp_core_4524.encrypted.fbk')
|
|
|
|
act_src = python_act('db_to_be_encrypted')
|
|
act_res = python_act('db_encrypt_restore')
|
|
|
|
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
|
|
# from act_src.files_dir/'test_config.ini':
|
|
enc_settings = QA_GLOBALS['encryption']
|
|
|
|
# ACHTUNG: this must be carefully tuned on every new host:
|
|
#
|
|
MAX_WAITING_ENCR_FINISH = int(enc_settings['MAX_WAIT_FOR_ENCR_FINISH_WIN' if os.name == 'nt' else 'MAX_WAIT_FOR_ENCR_FINISH_NIX'])
|
|
assert MAX_WAITING_ENCR_FINISH > 0
|
|
|
|
ENCRYPTION_PLUGIN = enc_settings['encryption_plugin'] # fbSampleDbCrypt
|
|
ENCRYPTION_HOLDER = enc_settings['encryption_holder'] # fbSampleKeyHolder
|
|
ENCRYPTION_KEY = enc_settings['encryption_key'] # Red
|
|
|
|
SUCCESS_MSG = 'All sequences FOUND.'
|
|
|
|
def check_page_for_readable_values(dbname, gen_page_number, pg_size, check_sequence_values, is_encrypted, msg_prefix = ''):
|
|
|
|
db_handle = open( dbname, "rb")
|
|
db_handle.seek( gen_page_number * pg_size )
|
|
page_content = db_handle.read( pg_size )
|
|
db_handle.close()
|
|
page_as_hex=binascii.hexlify( page_content )
|
|
|
|
# Iterate for each sequence value:
|
|
not_found_lst = []
|
|
any_found_lst = []
|
|
for n in check_sequence_values:
|
|
|
|
# Get HEX representation of digital value.
|
|
# NOTE: format( 830624, 'x') is 'caca0' contains five (odd number!) characters.
|
|
hex_string = format(abs(n),'x')
|
|
|
|
# Here we 'pad' hex representation to EVEN number of digits in it,
|
|
# otherwise binascii.hexlify fails with "Odd-length string error":
|
|
hex_string = ''.join( ('0' * ( len(hex_string) % 2 ), hex_string ) )
|
|
|
|
# ::: NOTE :::
|
|
# Generator value is stored in REVERSED bytes order.
|
|
# dec 830624 --> hex 0x0caca0 --> 0c|ac|a0 --> stored in page as three bytes: {a0; ac; 0c}
|
|
|
|
# Decode string that is stored in variable 'hex_string' to HEX number,
|
|
# REVERSE its bytes and convert it to string again for further search
|
|
# in page content:
|
|
#n_as_reversed_hex = binascii.hexlify( hex_string.decode('hex')[::-1] )
|
|
n_as_reversed_hex = binascii.hexlify( bytes.fromhex(hex_string)[::-1] )
|
|
if not n_as_reversed_hex in page_as_hex:
|
|
not_found_lst.append([n, n_as_reversed_hex])
|
|
else:
|
|
any_found_lst.append([n, n_as_reversed_hex])
|
|
|
|
if (not is_encrypted) and len(not_found_lst) == 0 or is_encrypted and len(any_found_lst) == 0:
|
|
print(msg_prefix + SUCCESS_MSG)
|
|
else:
|
|
if not is_encrypted:
|
|
print(msg_prefix + 'UNEXPECTEDLY NOT found sequences:')
|
|
for p in not_found_lst:
|
|
print(p)
|
|
if is_encrypted:
|
|
print(msg_prefix + 'UNEXPECTEDLY FOUND sequences:')
|
|
for p in any_found_lst:
|
|
print(p)
|
|
|
|
#----------------------------------------------------------------------------------------------
|
|
|
|
@pytest.mark.encryption
|
|
@pytest.mark.version('>=4.0')
|
|
def test_1(act_src: Action, act_res: Action, tmp_fbk:Path, capsys):
|
|
|
|
init_sql = """
|
|
set bail on;
|
|
create sequence gen_ba0bab start with 12192683;
|
|
create sequence gen_badf00d start with 195948557;
|
|
create sequence gen_caca0 start with 830624;
|
|
create sequence gen_c0ffee start with 12648430;
|
|
create sequence gen_dec0de start with 14598366;
|
|
create sequence gen_decade start with 14600926;
|
|
create sequence gen_7FFFFFFF start with 2147483647;
|
|
commit;
|
|
"""
|
|
|
|
act_src.expected_stdout = ''
|
|
act_src.isql(switches = ['-q'], input = init_sql, combine_output = True, io_enc = locale.getpreferredencoding())
|
|
assert act_src.clean_stdout == act_src.clean_expected_stdout
|
|
act_src.reset()
|
|
|
|
check_sequence_values=[]
|
|
with act_src.db.connect() as con:
|
|
with con.cursor() as cur:
|
|
get_current_seq_values='''
|
|
execute block returns( gen_curr bigint) as
|
|
declare gen_name rdb$generator_name;
|
|
begin
|
|
for
|
|
select rdb$generator_name from rdb$generators where rdb$system_flag is distinct from 1 order by rdb$generator_id
|
|
into gen_name
|
|
do begin
|
|
execute statement 'execute block returns(g bigint) as begin g = gen_id('|| gen_name ||', 0); suspend; end' into gen_curr;
|
|
suspend;
|
|
end
|
|
end
|
|
'''
|
|
|
|
# Obtain current values of user generators:
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
cur.execute(get_current_seq_values)
|
|
for r in cur:
|
|
check_sequence_values += r[0],
|
|
|
|
|
|
# Obtain page size and ID of generators page:
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
cur.execute('select m.mon$page_size,min(rdb$page_number) from mon$database m cross join rdb$pages p where p.rdb$page_type = 9 group by 1')
|
|
pg_size, gen_page_number = -1,-1
|
|
for r in cur:
|
|
pg_size=r[0]
|
|
gen_page_number=r[1]
|
|
# print(r[0],r[1])
|
|
|
|
|
|
# Read gen page, convert it to hex and check whether generator values can be found there or no:
|
|
# Expected result: YES for all values because DB not encrypted now.
|
|
# ~~~~~~~~~~~~~~~
|
|
check_page_for_readable_values(act_src.db.db_path, gen_page_number, pg_size, check_sequence_values, False, 'INIT. ')
|
|
|
|
act_src.expected_stdout = 'INIT. ' + SUCCESS_MSG
|
|
act_src.stdout = capsys.readouterr().out
|
|
assert act_src.clean_stdout == act_src.clean_expected_stdout
|
|
act_src.reset()
|
|
|
|
######################################################
|
|
|
|
t1=py_dt.datetime.now()
|
|
d1 = t1-t1
|
|
sttm = f'alter database encrypt with "{ENCRYPTION_PLUGIN}" key "{ENCRYPTION_KEY}"'
|
|
try:
|
|
con.execute_immediate(sttm)
|
|
con.commit()
|
|
except DatabaseError as e:
|
|
print( e.__str__() )
|
|
|
|
act_src.expected_stdout = ''
|
|
act_src.stdout = capsys.readouterr().out
|
|
assert act_src.clean_stdout == act_src.clean_expected_stdout
|
|
act_src.reset()
|
|
|
|
while True:
|
|
t2=py_dt.datetime.now()
|
|
d1=t2-t1
|
|
if d1.seconds*1000 + d1.microseconds//1000 > MAX_WAITING_ENCR_FINISH:
|
|
con.execute_immediate(f"select 'TIMEOUT EXPIRATION: encryption took {d1.seconds*1000 + d1.microseconds//1000} ms which exceeds limit = {MAX_WAITING_ENCR_FINISH} ms.' as msg from rdb$database")
|
|
break
|
|
|
|
# Possible output:
|
|
# Database not encrypted
|
|
# Database encrypted, crypt thread not complete
|
|
act_src.isql(switches=['-q'], input = 'show database;', combine_output = True)
|
|
if 'Database encrypted' in act_src.stdout:
|
|
if 'not complete' in act_src.stdout:
|
|
pass
|
|
else:
|
|
break
|
|
act_src.reset()
|
|
|
|
if d1.seconds*1000 + d1.microseconds//1000 <= MAX_WAITING_ENCR_FINISH:
|
|
act_src.reset()
|
|
act_src.gstat(switches=['-e'])
|
|
|
|
# Data pages: total 884803, encrypted 884803, non-crypted 0
|
|
# ...
|
|
pattern = re.compile('(data|index|blob|generator)\\s+pages[:]{0,1}\\s+total[:]{0,1}\\s+\\d+[,]{0,1}\\s+encrypted[:]{0,1}\\s+\\d+.*[,]{0,1}non-crypted[:]{0,1}\\s+\\d+.*', re.IGNORECASE)
|
|
for line in act_src.stdout.splitlines():
|
|
if pattern.match(line.strip()):
|
|
# We assume that every line finishes with number of NON-crypted pages, and this number must be 0:
|
|
words = line.split()
|
|
if words[-1] == '0':
|
|
print(words[0] + ': expected, ' + words[-1])
|
|
else:
|
|
print(words[0] + ': UNEXPECTED, ' + words[-1])
|
|
|
|
expected_gstat_tail = """
|
|
Data: expected, 0
|
|
Index: expected, 0
|
|
Blob: expected, 0
|
|
Generator: expected, 0
|
|
"""
|
|
act_src.expected_stdout = expected_gstat_tail
|
|
act_src.stdout = capsys.readouterr().out
|
|
assert act_src.clean_stdout == act_src.clean_expected_stdout
|
|
act_src.reset()
|
|
|
|
else:
|
|
print(f'TIMEOUT EXPIRATION: encryption took {d1.seconds*1000 + d1.microseconds//1000} ms which exceeds limit = {MAX_WAITING_ENCR_FINISH} ms.')
|
|
|
|
######################################################
|
|
|
|
# see also core_6071_test.py:
|
|
act_src.gbak(switches=['-b', '-KEYHOLDER', ENCRYPTION_HOLDER, '-crypt', ENCRYPTION_PLUGIN, act_src.db.dsn, str(tmp_fbk)])
|
|
act_src.reset()
|
|
|
|
act_src.gbak(switches=['-rep', '-KEYHOLDER', ENCRYPTION_HOLDER, str(tmp_fbk), act_res.db.dsn ])
|
|
act_src.reset()
|
|
|
|
act_src.gfix(switches=['-v', '-full', str(act_res.db.db_path)])
|
|
act_src.reset()
|
|
|
|
# Read gen page in RESTORED database, convert it to hex and check whether generator values can be found there or no.
|
|
# Expected result: NOT for all values because DB was encrypted.
|
|
# ~~~~~~~~~~~~~~~~
|
|
check_page_for_readable_values(act_res.db.db_path, gen_page_number, pg_size, check_sequence_values, True, 'FINAL. ')
|
|
|
|
act_src.expected_stdout = 'FINAL. ' + SUCCESS_MSG
|
|
act_src.stdout = capsys.readouterr().out
|
|
assert act_src.clean_stdout == act_src.clean_expected_stdout
|
|
act_src.reset()
|
|
|
|
#< with act_src.db.connect()
|
|
|
|
# Final check: ensure that sequences have proper values:
|
|
##############
|
|
act_res.expected_stdout = """
|
|
Generator GEN_7FFFFFFF, current value: 2147483646, initial value: 2147483647, increment: 1
|
|
Generator GEN_BA0BAB, current value: 12192682, initial value: 12192683, increment: 1
|
|
Generator GEN_BADF00D, current value: 195948556, initial value: 195948557, increment: 1
|
|
Generator GEN_C0FFEE, current value: 12648429, initial value: 12648430, increment: 1
|
|
Generator GEN_CACA0, current value: 830623, initial value: 830624, increment: 1
|
|
Generator GEN_DEC0DE, current value: 14598365, initial value: 14598366, increment: 1
|
|
Generator GEN_DECADE, current value: 14600925, initial value: 14600926, increment: 1
|
|
"""
|
|
act_res.isql(switches = ['-q'], input = 'show sequ;', combine_output = True, io_enc = locale.getpreferredencoding())
|
|
assert act_res.clean_stdout == act_res.clean_expected_stdout
|
|
act_res.reset()
|