mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-22 21:43:06 +01:00
Added/Updated bugs\core_5077_test.py: Checked on 4.0.1.2692, 3.0.8.33535 - both on Linux and Windows.
This commit is contained in:
parent
205706fb7b
commit
32e9b1f04b
@ -5,168 +5,91 @@ ID: issue-5364
|
||||
ISSUE: 5364
|
||||
TITLE: ISQL does not show encryption status of database
|
||||
DESCRIPTION:
|
||||
We create new database and try to encrypt it usng IBSurgeon Demo Encryption package
|
||||
( https://ib-aid.com/download-demo-firebird-encryption-plugin/ ; https://ib-aid.com/download/crypt/CryptTest.zip )
|
||||
License file plugins\\dbcrypt.conf with unlimited expiration was provided by IBSurgeon to Firebird Foundation (FF).
|
||||
This file was preliminary stored in FF Test machine.
|
||||
Test assumes that this file and all neccessary libraries already were stored into FB_HOME and %FB_HOME%\\plugins.
|
||||
Test uses Firebird built-in encryption plugin wich actually does encryption using trivial algorithm.
|
||||
Before running this test following prerequisites must be met:
|
||||
1. Files fbSampleDbCrypt.conf and libfbSampleDbCrypt.so/fbSampleDbCrypt.dll must present in the $FB_HOME/plugins folder;
|
||||
2. File fbSampleDbCrypt.conf must contain line: Auto = yes
|
||||
3. File $QA_HOME/pytest.ini must contain line with 'encryption' marker declaration.
|
||||
### ACHTUNG ###
|
||||
Unfortunately, there is no files fbSampleDbCrypt.* in FB 3.x snapshots (at least for June-2022).
|
||||
One need to take both thiese files from any FB 4.x snapshot (they have backward compatibility).
|
||||
On FB 4.x these files can be found here: $FB_HOME/examples/prebuilt/plugins/
|
||||
###############
|
||||
|
||||
After test database will be created, we try to encrypt it using 'alter database encrypt with <plugin_name> ...' command
|
||||
(where <plugin_name> = dbcrypt - name of .dll in FB_HOME\\plugins\\ folder that implements encryption).
|
||||
Then we allow engine to complete this job - take delay about 1..2 seconds BEFORE detach from database.
|
||||
We open connection to DB and run 'ALTER DATABASE ENCRYPT.../DECRYPT'.
|
||||
One need to keep connection opened for several seconds in order to give encryption thread be fully completed.
|
||||
Duration of this delay depends on concurrent workload, usually it is almost zero.
|
||||
But in this test it can be tuned - see variable 'MAX_ENCRYPT_DECRYPT_MS'.
|
||||
Immediately after launch encryption/decryption, we run isql and ask it to give result of 'SHOW DATABASE' command.
|
||||
If this output contains text 'Database [not] encrypted' and *not* contains phrase 'not complete' then we can assume
|
||||
that encryption/decryption thread completed. Otherwise we loop until such conditions will raise or timeout expired.
|
||||
|
||||
After this we run ISQL with 'SHOW DATABASE' command. Its output has to contain string 'Database encrypted'.
|
||||
|
||||
Finally, we change this temp DB state to full shutdown in order to have 100% ability to drop this file.
|
||||
|
||||
Checked on: 4.0.0.1629: OK, 6.264s; 3.0.5.33179: OK, 4.586s.
|
||||
|
||||
13.04.2021. Adapted for run both on Windows and Linux. Checked on:
|
||||
Windows: 4.0.0.2416
|
||||
Linux: 4.0.0.2416
|
||||
Note: different names for encryption plugin and keyholde rare used for Windows vs Linux:
|
||||
PLUGIN_NAME = 'dbcrypt' if os.name == 'nt' else '"fbSampleDbCrypt"'
|
||||
KHOLDER_NAME = 'KeyHolder' if os.name == 'nt' else "fbSampleKeyHolder"
|
||||
JIRA: CORE-5077
|
||||
FBTEST: bugs.core_5077
|
||||
NOTES:
|
||||
[06.06.2022] pzotov
|
||||
Checked on 4.0.1.2692, 3.0.8.33535 - both on Linux and Windows.
|
||||
"""
|
||||
|
||||
import time
|
||||
import datetime as py_dt
|
||||
from datetime import timedelta
|
||||
|
||||
import pytest
|
||||
from firebird.qa import *
|
||||
from firebird.driver import DatabaseError
|
||||
|
||||
MAX_ENCRYPT_DECRYPT_MS = 5000
|
||||
db = db_factory()
|
||||
|
||||
act = python_act('db')
|
||||
|
||||
expected_stdout = """
|
||||
DATABASE ENCRYPTED
|
||||
"""
|
||||
|
||||
@pytest.mark.skip('FIXME: encryption plugin')
|
||||
@pytest.mark.version('>=3.0')
|
||||
def test_1(act: Action):
|
||||
pytest.fail("Not IMPLEMENTED")
|
||||
@pytest.mark.encryption
|
||||
def test_1(act: Action, capsys):
|
||||
for m in ('encryption', 'decryption'):
|
||||
expected_stdout_console = f"""
|
||||
Expected: {m} status presents in the 'SHOW DATABASE' output.
|
||||
"""
|
||||
|
||||
# test_script_1
|
||||
#---
|
||||
#
|
||||
# import os
|
||||
# import time
|
||||
# import subprocess
|
||||
# import re
|
||||
# import fdb
|
||||
# from fdb import services
|
||||
#
|
||||
# os.environ["ISC_USER"] = user_name
|
||||
# os.environ["ISC_PASSWORD"] = user_password
|
||||
# engine = db_conn.engine_version
|
||||
# db_conn.close()
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def flush_and_close( file_handle ):
|
||||
# # https://docs.python.org/2/library/os.html#os.fsync
|
||||
# # If you're starting with a Python file object f,
|
||||
# # first do f.flush(), and
|
||||
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
||||
# global os
|
||||
#
|
||||
# file_handle.flush()
|
||||
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
|
||||
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
|
||||
# os.fsync(file_handle.fileno())
|
||||
# file_handle.close()
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def cleanup( f_names_list ):
|
||||
# global os
|
||||
# for i in range(len( f_names_list )):
|
||||
# if type(f_names_list[i]) == file:
|
||||
# del_name = f_names_list[i].name
|
||||
# elif type(f_names_list[i]) == str:
|
||||
# del_name = f_names_list[i]
|
||||
# else:
|
||||
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
|
||||
# print('type(f_names_list[i])=',type(f_names_list[i]))
|
||||
# del_name = None
|
||||
#
|
||||
# if del_name and os.path.isfile( del_name ):
|
||||
# os.remove( del_name )
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# tmpfdb='$(DATABASE_LOCATION)'+'tmp_core_5077.fdb'
|
||||
#
|
||||
# cleanup( (tmpfdb,) )
|
||||
#
|
||||
# con = fdb.create_database( dsn = 'localhost:'+tmpfdb )
|
||||
#
|
||||
# # 14.04.2021.
|
||||
# # Name of encryption plugin depends on OS:
|
||||
# # * for Windows we (currently) use plugin by IBSurgeon, its name is 'dbcrypt';
|
||||
# # * for Linux we use:
|
||||
# # ** 'DbCrypt_example' for FB 3.x
|
||||
# # ** 'fbSampleDbCrypt' for FB 4.x+
|
||||
# #
|
||||
# PLUGIN_NAME = 'dbcrypt' if os.name == 'nt' else ( '"fbSampleDbCrypt"' if engine >= 4.0 else '"DbCrypt_example"')
|
||||
# KHOLDER_NAME = 'KeyHolder' if os.name == 'nt' else "fbSampleKeyHolder"
|
||||
#
|
||||
# cur = con.cursor()
|
||||
# cur.execute('alter database encrypt with %(PLUGIN_NAME)s key Red' % locals())
|
||||
# ### DOES NOT WORK ON LINUX! ISSUES 'TOKEN UNKNOWN' !! >>> con.execute_immediate('alter database encrypt with %(PLUGIN_NAME)s key Red' % locals()) // sent letter to Alex and dimitr, 14.04.2021
|
||||
# con.commit()
|
||||
# time.sleep(2)
|
||||
# # ^
|
||||
# # +-------- !! ALLOW BACKGROUND ENCRYPTION PROCESS TO COMPLETE ITS JOB !!
|
||||
#
|
||||
# con.close()
|
||||
#
|
||||
# ########################################
|
||||
# # run ISQL with 'SHOW DATABASE' command:
|
||||
# ########################################
|
||||
# f_isql_cmd=open( os.path.join(context['temp_directory'],'tmp_5077.sql'), 'w')
|
||||
# f_isql_cmd.write('show database;')
|
||||
# flush_and_close( f_isql_cmd )
|
||||
#
|
||||
# f_isql_log=open( os.path.join(context['temp_directory'], 'tmp_5077.log'), 'w')
|
||||
# f_isql_err=open( os.path.join(context['temp_directory'], 'tmp_5077.err'), 'w')
|
||||
# subprocess.call( [context['isql_path'], 'localhost:' + tmpfdb, '-q', '-n', '-i', f_isql_cmd.name ], stdout=f_isql_log, stderr = f_isql_err)
|
||||
# flush_and_close( f_isql_log )
|
||||
# flush_and_close( f_isql_err )
|
||||
#
|
||||
# #---------------------------- shutdown temp DB --------------------
|
||||
#
|
||||
# f_dbshut_log = open( os.path.join(context['temp_directory'],'tmp_dbshut_5077.log'), 'w')
|
||||
# subprocess.call( [ context['gfix_path'], 'localhost:'+tmpfdb, "-shut", "full", "-force", "0" ],
|
||||
# stdout = f_dbshut_log,
|
||||
# stderr = subprocess.STDOUT
|
||||
# )
|
||||
# flush_and_close( f_dbshut_log )
|
||||
#
|
||||
# allowed_patterns = (
|
||||
# re.compile( 'Database(\\s+not){0,1}\\s+encrypted\\.*', re.IGNORECASE),
|
||||
# )
|
||||
#
|
||||
# with open( f_isql_log.name,'r') as f:
|
||||
# for line in f:
|
||||
# match2some = filter( None, [ p.search(line) for p in allowed_patterns ] )
|
||||
# if match2some:
|
||||
# print( (' '.join( line.split()).upper() ) )
|
||||
#
|
||||
# with open( f_isql_err.name,'r') as f:
|
||||
# for line in f:
|
||||
# print("Unexpected error when doing 'SHOW DATABASE': "+line)
|
||||
#
|
||||
# with open( f_dbshut_log.name,'r') as f:
|
||||
# for line in f:
|
||||
# print("Unexpected error on SHUTDOWN temp database: "+line)
|
||||
#
|
||||
#
|
||||
# # CLEANUP
|
||||
# #########
|
||||
# time.sleep(1)
|
||||
#
|
||||
# cleanup( ( f_isql_log, f_isql_err, f_isql_cmd, f_dbshut_log,tmpfdb ) )
|
||||
#
|
||||
#---
|
||||
with act.db.connect() as con:
|
||||
t1=py_dt.datetime.now()
|
||||
d1 = t1-t1
|
||||
sttm = 'alter database ' + ('encrypt with "fbSampleDbCrypt" key "Red"' if m == 'encryption' else 'decrypt')
|
||||
try:
|
||||
con.execute_immediate(sttm)
|
||||
con.commit()
|
||||
except DatabaseError as e:
|
||||
print( e.__str__() )
|
||||
|
||||
act.expected_stdout = ''
|
||||
act.stdout = capsys.readouterr().out
|
||||
assert act.clean_stdout == act.clean_expected_stdout
|
||||
act.reset()
|
||||
|
||||
|
||||
while True:
|
||||
t2=py_dt.datetime.now()
|
||||
d1=t2-t1
|
||||
if d1.seconds*1000 + d1.microseconds//1000 > MAX_ENCRYPT_DECRYPT_MS:
|
||||
break
|
||||
|
||||
# Possible output:
|
||||
# Database not encrypted
|
||||
# Database encrypted, crypt thread not complete
|
||||
act.isql(switches=['-q'], input = 'show database;', combine_output = True)
|
||||
if m == 'encryption' and 'Database encrypted' in act.stdout or m == 'decryption' and 'Database not encrypted' in act.stdout:
|
||||
if 'not complete' in act.stdout:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
act.reset()
|
||||
|
||||
if d1.seconds*1000 + d1.microseconds//1000 <= MAX_ENCRYPT_DECRYPT_MS:
|
||||
print(expected_stdout_console)
|
||||
else:
|
||||
print(f'BREAK ON TIMEOUT EXPIRATION: {m.upper()} took {d1.seconds*1000 + d1.microseconds//1000} ms which exceeds limit = {MAX_ENCRYPT_DECRYPT_MS} ms.')
|
||||
|
||||
act.expected_stdout = expected_stdout_console
|
||||
act.stdout = capsys.readouterr().out
|
||||
assert act.clean_stdout == act.clean_expected_stdout
|
||||
act.reset()
|
||||
|
Loading…
Reference in New Issue
Block a user