6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-02-02 10:50:42 +01:00

Added/Updated bugs\core_5796_test.py: Checked on 4.0.1.2692, 3.0.8.33535 - both on Linux and Windows.

This commit is contained in:
zotov 2022-06-12 14:20:44 +03:00
parent 0afe893260
commit 83793b347a

View File

@ -5,185 +5,116 @@ ID: issue-6059
ISSUE: 6059
TITLE: gstat may produce faulty report about presence of some none-encrypted pages in database
DESCRIPTION:
We create new database ('tmp_core_5796.fdb') and try to encrypt it usng IBSurgeon Demo Encryption package
( https://ib-aid.com/download-demo-firebird-encryption-plugin/ ; https://ib-aid.com/download/crypt/CryptTest.zip )
License file plugins\\dbcrypt.conf with unlimited expiration was provided by IBSurgeon to Firebird Foundation (FF).
This file was preliminary stored in FF Test machine.
Test assumes that this file and all neccessary libraries already were stored into FB_HOME and %FB_HOME%\\plugins.
After test database will be created, we try to encrypt it using 'alter database encrypt with <plugin_name> ...' command
(where <plugin_name> = dbcrypt - name of .dll in FB_HOME\\plugins\\ folder that implements encryption).
Then we allow engine to complete this job - take delay about 1..2 seconds BEFORE detach from database.
Test has been fully re-implemented because its initial version was based on old gstat that did not know '-e' command switch.
We create two tables and add data into both of them. After this we DROP first table which must lead to deallocating some
valuable number of pages (of all types: data, index and blobs).
Also, we create lot of generators in order they occupy more than one generators page.
After this we detach from DB, run 'gstat -h' and filter its attributes and messages from 'Variable header' section.
In the output of gstat we check that its 'tail' will look like this:
===
Attributes force write, encrypted, plugin DBCRYPT
Crypt checksum: MUB2NTJqchh9RshmP6xFAiIc2iI=
Key hash: ask88tfWbinvC6b1JvS9Mfuh47c=
Encryption key name: RED
===
(concrete values for checksum and hash will be ignored - see 'substitutions' section).
Then we run 'ALTER DATABASE ENCRYPT ...' and wait until 'SHOW DATABASE' will display text that database encrypted.
Finally, we run 'gstat -e ...' and check its output for presence of lines with statistics for encrypred/non-encrypted pages.
Every such line must finish with '0' (this is number of NON-crypted pages).
Finally, we change this temp DB statee to full shutdown in order to have 100% ability to drop this file.
15.04.2021. Adapted for run both on Windows and Linux. Checked on:
Windows: 4.0.0.2416
Linux: 4.0.0.2416
JIRA: CORE-5796
FBTEST: bugs.core_5796
NOTES:
[12.06.2022] pzotov
Checked on 4.0.1.2692, 3.0.8.33535 - both on Linux and Windows.
"""
import re
import time
import datetime as py_dt
from datetime import timedelta
import pytest
from firebird.qa import *
from firebird.driver import DatabaseError
substitutions = [('ATTRIBUTES .* ENCRYPTED, PLUGIN .*', 'ATTRIBUTES ENCRYPTED'),
('CRYPT CHECKSUM.*', 'CRYPT CHECKSUM'), ('KEY HASH.*', 'KEY HASH'),
('ENCRYPTION KEY NAME.*', 'ENCRYPTION KEY')]
db = db_factory()
act = python_act('db', substitutions=substitutions)
expected_stdout = """
ATTRIBUTES FORCE WRITE, ENCRYPTED, PLUGIN DBCRYPT
CRYPT CHECKSUM: MUB2NTJQCHH9RSHMP6XFAIIC2II=
KEY HASH: ASK88TFWBINVC6B1JVS9MFUH47C=
ENCRYPTION KEY NAME: RED
N_ROWS = 25
init_script = f"""
create table test1(s varchar(784) unique, b blob);
create table test2(s varchar(784) unique, b blob);
commit;
insert into test1(s) select lpad('',784,uuid_to_char(gen_uuid())) from (select 1 i from rdb$types rows {N_ROWS}),(select 1 i from rdb$types rows {N_ROWS});
insert into test2(s) select lpad('',784,uuid_to_char(gen_uuid())) from (select 1 i from rdb$types rows {N_ROWS});
update test1 set b = (select list( (select gen_uuid() from rdb$database) ) from (select 1 i from rdb$types rows {N_ROWS}),(select 1 i from rdb$types rows {N_ROWS}));
update test2 set b = (select list( (select gen_uuid() from rdb$database) ) from (select 1 i from rdb$types rows {N_ROWS}),(select 1 i from rdb$types rows {N_ROWS}));
commit;
drop table test1;
commit;
"""
@pytest.mark.skip('FIXME: encryption plugin')
@pytest.mark.version('>=3.0.4')
def test_1(act: Action):
pytest.fail("Not IMPLEMENTED")
db = db_factory(init = init_script)
act = python_act('db')
# test_script_1
#---
#
# import os
# import time
# import subprocess
# import re
# import fdb
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
# engine = db_conn.engine_version
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
# if type(f_names_list[i]) == file:
# del_name = f_names_list[i].name
# elif type(f_names_list[i]) == str:
# del_name = f_names_list[i]
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# print('type(f_names_list[i])=',type(f_names_list[i]))
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
#
# tmpfdb='$(DATABASE_LOCATION)'+'tmp_core_5796.fdb'
#
# cleanup( (tmpfdb,) )
#
# con = fdb.create_database( dsn = 'localhost:'+tmpfdb )
# cur = con.cursor()
#
# # 14.04.2021.
# # Name of encryption plugin depends on OS:
# # * for Windows we (currently) use plugin by IBSurgeon, its name is 'dbcrypt';
# # * for Linux we use:
# # ** 'DbCrypt_example' for FB 3.x
# # ** 'fbSampleDbCrypt' for FB 4.x+
# #
# PLUGIN_NAME = 'dbcrypt' if os.name == 'nt' else ( '"fbSampleDbCrypt"' if engine >= 4.0 else '"DbCrypt_example"')
#
# ##############################################
# # WARNING! Do NOT use 'connection_obj.execute_immediate()' for ALTER DATABASE ENCRYPT... command!
# # There is bug in FB driver which leads this command to fail with 'token unknown' message
# # The reason is that execute_immediate() silently set client dialect = 0 and any encryption statement
# # can not be used for such value of client dialect.
# # One need to to use only cursor_obj.execute() for encryption!
# # See letter from Pavel Cisar, 20.01.20 10:36
# ##############################################
#
# cur.execute('alter database encrypt with %(PLUGIN_NAME)s key Red' % locals())
# con.commit()
# time.sleep(2)
# # ^
# # +-------- !! ALLOW BACKGROUND ENCRYPTION PROCESS TO COMPLETE ITS JOB !!
#
# con.close()
#
# #--------------------------------- get DB header info --------------------
#
# f_gstat_log = open( os.path.join(context['temp_directory'],'tmp_dbstat_5796.log'), 'w')
# f_gstat_err = open( os.path.join(context['temp_directory'],'tmp_dbstat_5796.err'), 'w')
#
# subprocess.call( [ context['gstat_path'], "-e", "localhost:"+tmpfdb],
# stdout = f_gstat_log,
# stderr = f_gstat_err
# )
#
#
# flush_and_close( f_gstat_log )
# flush_and_close( f_gstat_err )
#
# #--------------------------------- shutdown temp DB --------------------
#
# f_dbshut_log = open( os.path.join(context['temp_directory'],'tmp_dbshut_5796.log'), 'w')
# subprocess.call( [ context['gfix_path'], 'localhost:'+tmpfdb, "-shut", "full", "-force", "0" ],
# stdout = f_dbshut_log,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_dbshut_log )
#
# allowed_patterns = (
# re.compile( '\\s*Attributes\\.*', re.IGNORECASE)
# ,re.compile('crypt\\s+checksum:\\s+\\S+', re.IGNORECASE)
# ,re.compile('key\\s+hash:\\s+\\S+', re.IGNORECASE)
# ,re.compile('encryption\\s+key\\s+name:\\s+\\S+', re.IGNORECASE)
# )
#
# with open( f_gstat_log.name,'r') as f:
# for line in f:
# match2some = filter( None, [ p.search(line) for p in allowed_patterns ] )
# if match2some:
# print( (' '.join( line.split()).upper() ) )
#
# with open( f_gstat_err.name,'r') as f:
# for line in f:
# print("Unexpected STDERR: "+line)
#
# # cleanup:
# ##########
# time.sleep(1)
# cleanup( (f_gstat_log, f_gstat_err, f_dbshut_log, tmpfdb) )
#
#
#
#---
MAX_ENCRYPT_DECRYPT_MS = 5000
ENCRYPTION_PLUGIN = 'fbSampleDbCrypt'
ENCRYPTION_KEY = 'Red'
@pytest.mark.encryption
@pytest.mark.version('>=3.0.4')
def test_1(act: Action, capsys):
with act.db.connect() as con:
t1=py_dt.datetime.now()
d1 = t1-t1
sttm = f'alter database encrypt with "{ENCRYPTION_PLUGIN}" key "{ENCRYPTION_KEY}"'
try:
con.execute_immediate(sttm)
con.commit()
except DatabaseError as e:
print( e.__str__() )
act.expected_stdout = ''
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
while True:
t2=py_dt.datetime.now()
d1=t2-t1
if d1.seconds*1000 + d1.microseconds//1000 > MAX_ENCRYPT_DECRYPT_MS:
con.execute_immediate(f"select 'TIMEOUT EXPIRATION: encryption took {d1.seconds*1000 + d1.microseconds//1000} ms which exceeds limit = {MAX_ENCRYPT_DECRYPT_MS} ms.' as msg from rdb$database")
break
# Possible output:
# Database not encrypted
# Database encrypted, crypt thread not complete
act.isql(switches=['-q'], input = 'show database;', combine_output = True)
if 'Database encrypted' in act.stdout:
if 'not complete' in act.stdout:
pass
else:
break
act.reset()
if d1.seconds*1000 + d1.microseconds//1000 <= MAX_ENCRYPT_DECRYPT_MS:
act.reset()
act.gstat(switches=['-e'])
# Data pages: total 884803, encrypted 884803, non-crypted 0
# ...
pattern = re.compile('(data|index|blob|generator)\\s+pages[:]{0,1}\\s+total[:]{0,1}\\s+\\d+[,]{0,1}\\s+encrypted[:]{0,1}\\s+\\d+.*[,]{0,1}non-crypted[:]{0,1}\\s+\\d+.*', re.IGNORECASE)
for line in act.stdout.splitlines():
if pattern.match(line.strip()):
# We assume that every line finishes with number of NON-crypted pages, and this number must be 0:
words = line.split()
if words[-1] == '0':
print(words[0] + ': expected, ' + words[-1])
else:
print(words[0] + ': UNEXPECTED, ' + words[-1])
else:
print(f'TIMEOUT EXPIRATION: encryption took {d1.seconds*1000 + d1.microseconds//1000} ms which exceeds limit = {MAX_ENCRYPT_DECRYPT_MS} ms.')
act.expected_stdout = """
Data: expected, 0
Index: expected, 0
Blob: expected, 0
Generator: expected, 0
"""
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout