Added/Updated functional\replication\test_blob_segments_longer_32kb_are_not_replicated.py. Checked on 5.0.0.623, 4.0.1.2692 - both on Windows and Linux. A PytestAssertRewriteWarning is raised; the reason is currently unknown.
This commit is contained in:
parent
65e8c0c432
commit
f34dcf3cb5
@@ -5,336 +5,304 @@ ID: replication.blob_segments_longer_32kb_are_not_replicated
ISSUE: 6893
TITLE: Problem with replication of BLOB segments longer than 32KB
DESCRIPTION:
Test creates a table with a blob column and loads binary data into this table (using the stream API).
We store crypt_hash() of this blob in a variable that will further be used to check the correctness
of the replication results (i.e. when the blob data has been completely transferred to db_repl).
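The hash is obtained directly from the INSERT statement (this is the same statement that
the test code below executes):
    insert into test(id, b) values(1, ?) returning crypt_hash(b using sha512);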

After this we connect and query the ID of the last generated segment by reading the REPLICATION_SEQUENCE
variable from the SYSTEM context namespace.
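For reference, the query used for this (it appears again in the test code below):
    select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database;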

Then we wait until the replica becomes actual to master; this delay will last no more than the threshold
that is defined by the MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG variable (measured in seconds).
During this delay, we check the replication log every second and search it for a line with the number
of the last generated segment (which was replicated and finally deleted).
We can assume that replication finished OK only when such a line is found (see 'POINT-1').
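The line being searched for looks like this (a sample quoted again in the code below):
    VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file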
Then we check that both databases have the same data in the user table 'test'.

Further, we invoke ISQL, executing an auxiliary script that drops all DB objects on master (with the '-nod' command switch).
After all objects have been dropped, we have to wait again until the replica becomes actual with master (see 'POINT-2').

Finally, we extract metadata for master and replica and compare them (see 'f_meta_diff').
The only difference in metadata must be the 'CREATE DATABASE' statement with different DB names - we suppress it,
thus no metadata difference must be issued.

################
### N O T E ###
################
Test assumes that master and replica databases have been created beforehand.
Also, it assumes that %FB_HOME%/replication.conf has been prepared with appropriate parameters for replication.
Particularly, the names of directories and databases must contain info about the checked FB major version and ServerMode:
* verbose = true // in order to find the line with the message that the required segment was replicated
* section for master database with specified parameters:
      journal_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.journal"
      journal_archive_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
      journal_archive_command = "copy $(pathname) $(archivepathname)"
      journal_archive_timeout = 10
* section for replica database with specified parameter:
      journal_source_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
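
For example, a minimal replication.conf sketch combining these settings could look like this
(section syntax as in the Firebird documentation; the database file names below are placeholders
built from the naming scheme described further on):

    verbose = true

    database = !fbt_repo!/tmp/fbt-main.!fb_major!.!server_mode!.fdb
    {
        journal_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.journal"
        journal_archive_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
        journal_archive_command = "copy $(pathname) $(archivepathname)"
        journal_archive_timeout = 10
    }

    database = !fbt_repo!/tmp/fbt-repl.!fb_major!.!server_mode!.fdb
    {
        journal_source_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
    }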

Master and replica databases must be created in the "%FBT_REPO%\tmp" directory and have names like these:
    'fbt-main.fb40.SS.fdb'; 'fbt-repl.fb40.SS.fdb' - for FB 4.x ('SS' = Super; 'CS' = Classic)
    'fbt-main.fb50.SS.fdb'; 'fbt-repl.fb50.SS.fdb' - for FB 5.x ('SS' = Super; 'CS' = Classic)
NB: the fixed numeric value ('40' or '50') must be used for any minor FB version (4.0; 4.0.1; 4.1; 5.0; 5.1 etc).

These two databases must NOT be dropped in any of the tests related to replication!
They are created and dropped by the batch scenario which prepares the FB instance to be checked for each ServerMode
and performs cleanup afterwards, i.e. when all tests have completed.

NB. Currently this task exists only as a Windows batch, thus the test has attribute platform = 'Windows'.

Temporary comment. For debug purposes:
1) find out the SUFFIX of the name of the FB service which is to be tested (e.g. 'DefaultInstance', '40SS' etc);
2) copy the file %fbt-repo%/tests/functional/tabloid/batches/setup-fb-for-replication.bat.txt
   to some place and rename it to "*.bat";
3) open this .bat in an editor and adjust the value of the 'fbt_repo' variable;
4) run: setup-fb-for-replication.bat [SUFFIX_OF_FB_SERVICE],
   where SUFFIX_OF_FB_SERVICE is the ending part of the name of the FB service which you want to check:
   DefaultInstance ; 40ss ; 40cs ; 50ss ; 50cs etc;
5) the batch 'setup-fb-for-replication.bat' will:
   * stop the selected FB instance;
   * create test databases (in !fbt_repo!/tmp);
   * prepare journal/archive sub-folders for replication (also in !fbt_repo!/tmp);
   * replace %fb_home%/replication.conf with an appropriate one;
   * start the selected FB instance;
6) run this test (the FB instance will already be launched by setup-fb-for-replication.bat):
   %fbt_repo%/fbt-run2.bat dblevel-triggers-must-not-fire-on-replica.fbt 50ss, etc.

Confirmed bug on 5.0.0.88, 4.0.1.2523: the record appears on the replica but the blob is NULL.
Checked on: 5.0.0.120, 4.0.1.2547 -- all OK.

FBTEST: functional.replication.blob_segments_longer_32kb_are_not_replicated

NOTES:
[23.08.2022] pzotov
1. In case of any errors (somewhat_failed <> 0) the test will re-create db_main and db_repl, and then perform all needed
   actions to resume replication (set the 'replica' flag on db_repl, enable publishing in db_main, remove all files
   from the subdirectories <repl_journal> and <repl_archive>, which must be present in the same folder as <db_main>).
2. A warning is raised on Windows and Linux:
       ../../../usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126
       /usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126:
       PytestAssertRewriteWarning: Module already imported so cannot be rewritten: __editable___firebird_qa_0_17_0_finder
         self._mark_plugins_for_rewrite(hook)
   The reason is currently unknown.

Checked on 5.0.0.623, 4.0.1.2692 - both CS and SS, both on Windows and Linux.
"""

import os
import shutil
import re
from difflib import unified_diff
from pathlib import Path
import time

import pytest
from firebird.qa import *
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode

# QA_GLOBALS -- dict, defined in qa/plugin.py; obtains settings
# from act.files_dir/'test_config.ini':
repl_settings = QA_GLOBALS['replication']

MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = int(repl_settings['max_time_for_wait_segment_in_log'])
MAIN_DB_ALIAS = repl_settings['main_db_alias']
REPL_DB_ALIAS = repl_settings['repl_db_alias']
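
# A hypothetical sketch of the '[replication]' section in test_config.ini -- only the key
# names are taken from this test (journal_sub_dir / archive_sub_dir are read in the recovery
# branch below); concrete values are environment-specific:
#
#   [replication]
#   max_time_for_wait_segment_in_log = 65
#   main_db_alias = ...
#   repl_db_alias = ...
#   journal_sub_dir = ...
#   archive_sub_dir = ...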

db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True)
db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True)

substitutions = [('Start removing objects in:.*', 'Start removing objects'),
                 ('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'),
                 ('.* CREATE DATABASE .*', ''),
                 ('FOUND message about replicated segment N .*', 'FOUND message about replicated segment')]

act_db_main = python_act('db_main', substitutions=substitutions)
act_db_repl = python_act('db_repl', substitutions=substitutions)

# Length of generated blob:
DATA_LEN = 65 * 1024 * 1024
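# (i.e. 65 MiB -- far beyond the 32 KB blob-segment boundary named in the ticket title)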

tmp_data = temp_file(filename = 'tmp_blob_for_replication.dat')

#--------------------------------------------

def cleanup_folder(p):
    # Remove all files and subdirectories in folder <p>.
    # Used to clean <repl_journal> and <repl_archive> when replication must be reset
    # after an error occurred during test execution.
    assert os.path.dirname(p) != p, "@@@ ABEND @@@ CAN NOT operate in the file system root directory. Check your code!"
    for root, dirs, files in os.walk(p):
        for f in files:
            os.unlink(os.path.join(root, f))
        for d in dirs:
            shutil.rmtree(os.path.join(root, d))
    return len(os.listdir(p))
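
# A usage sketch (mirrors the call in the recovery branch at the end of test_1, where
# 'repl_root_path' and 'p' are defined):
#   assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty."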

#--------------------------------------------

# test_script_1
#---
#
# import os
# import subprocess
# import re
# import difflib
# import shutil
# import time
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
# #####################################
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 65
# #####################################
#
# svc = fdb.services.connect(host='localhost', user=user_name, password=user_password)
# FB_HOME = svc.get_home_directory()
# svc.close()
#
# engine = db_conn.engine_version    # 4.0; 4.1; 5.0 etc -- type: float
# fb_major = 'fb' + str(engine)[:1] + '0'    # 'fb40'; 'fb50'
#
# cur = db_conn.cursor()
# cur.execute("select rdb$config_value from rdb$config where upper(rdb$config_name) = upper('ServerMode')")
# server_mode = 'XX'
# for r in cur:
#     if r[0] == 'Super':
#         server_mode = 'SS'
#     elif r[0] == 'SuperClassic':
#         server_mode = 'SC'
#     elif r[0] == 'Classic':
#         server_mode = 'CS'
# cur.close()
#
# # 'fbt-main.fb50.ss.fdb' etc:
# db_main = os.path.join( context['temp_directory'], 'fbt-main.' + fb_major + '.' + server_mode + '.fdb' )
# db_repl = db_main.replace( 'fbt-main.', 'fbt-repl.')
#
# # Folders for journalling and archiving segments:
# repl_journal_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.journal' )
# repl_archive_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.archive' )
#
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
#     # https://docs.python.org/2/library/os.html#os.fsync
#     # If you're starting with a Python file object f,
#     # first do f.flush(), and
#     # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
#     global os
#
#     file_handle.flush()
#     if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
#         # otherwise: "OSError: [Errno 9] Bad file descriptor"!
#         os.fsync(file_handle.fileno())
#     file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
#     global os
#     for i in range(len( f_names_list )):
#         if type(f_names_list[i]) == file:
#             del_name = f_names_list[i].name
#         elif type(f_names_list[i]) == str:
#             del_name = f_names_list[i]
#         else:
#             print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
#             del_name = None
#
#         if del_name and os.path.isfile( del_name ):
#             os.remove( del_name )
#
# #--------------------------------------------
#
# def wait_for_data_in_replica( fb_home, max_allowed_time_for_wait, db_main, prefix_msg = '' ):
#
#     global re
#     global difflib
#     global time
#
#     replold_lines = []
#     with open( os.path.join(fb_home,'replication.log'), 'r') as f:
#         replold_lines = f.readlines()
#
#     con = fdb.connect( dsn = 'localhost:' + db_main, no_db_triggers = 1)
#     cur = con.cursor()
#     cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
#     for r in cur:
#         last_generated_repl_segment = r[0]
#     cur.close()
#     con.close()
#
#     #print('last_generated_repl_segment:', last_generated_repl_segment)
#
#     # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
#     p=re.compile( '\\+\\s+verbose:\\s+segment\\s+%(last_generated_repl_segment)s\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting' % locals(), re.IGNORECASE)
#
#     found_required_message = False
#     for i in range(0,max_allowed_time_for_wait):
#         time.sleep(1)
#
#         # Get content of fb_home replication.log _after_ isql finishes:
#         f_repllog_new = open( os.path.join(fb_home,'replication.log'), 'r')
#         diff_data = difflib.unified_diff(
#             replold_lines,
#             f_repllog_new.readlines()
#         )
#         f_repllog_new.close()
#
#         for k,d in enumerate(diff_data):
#             if p.search(d):
#                 print( (prefix_msg + ' ' if prefix_msg else '') + 'FOUND message about replicated segment.' )
#                 found_required_message = True
#                 break
#
#         if found_required_message:
#             break
#
#     if not found_required_message:
#         print('UNEXPECTED RESULT: no message about replicated segment for %d seconds.' % max_allowed_time_for_wait)
#
# #--------------------------------------------
#
# sql_ddl = ''' set bail on;
#     set list on;
#
#     recreate table test(id int primary key, b blob);
#     insert into test(id, b) values(1, lpad('',65533,gen_uuid()));
#     commit;
#
#     select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') as last_generated_repl_segment from rdb$database;
#     quit;
# ''' % locals()
#
#
# f_sql_chk = open( os.path.join(context['temp_directory'],'tmp_gh_6893_test.sql'), 'w')
# f_sql_chk.write(sql_ddl)
# flush_and_close( f_sql_chk )
#
# f_sql_log = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.log' ) ), 'w')
# f_sql_err = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.err' ) ), 'w')
# subprocess.call( [ context['isql_path'], 'localhost:' + db_main, '-i', f_sql_chk.name ], stdout = f_sql_log, stderr = f_sql_err)
# flush_and_close( f_sql_log )
# flush_and_close( f_sql_err )
#
# last_generated_repl_segment = 0
#
# with open(f_sql_err.name,'r') as f:
#     for line in f:
#         print('UNEXPECTED STDERR in initial SQL: ' + line)
#         MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0
#
# if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> initial SQL script finished w/o errors
#
#     ##############################################################################
#     ###   W A I T   U N T I L    R E P L I C A    B E C O M E S   A C T U A L  ###
#     ##############################################################################
#     wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-1' )
#
#     # runProgram('isql', ['localhost:' + db_repl, '-nod'], "set list on; select count(*) as long_blob_on_replica from test where octet_length(b) > 32768 and id = 1;")
#     runProgram('isql', ['localhost:' + db_repl, '-nod'], "set list on; select octet_length(t.b) as replicated_blob_octet_len from rdb$database r left join test t on t.id = 1;")
#
#     # return initial state of master DB:
#     # remove all DB objects (tables, views, ...):
#     # --------------------------------------------
#     sql_clean_ddl = os.path.join(context['files_location'],'drop-all-db-objects.sql')
#
#     f_clean_log=open( os.path.join(context['temp_directory'],'tmp_gh_6893_drop-all-db-objects.log'), 'w')
#     f_clean_err=open( ''.join( ( os.path.splitext(f_clean_log.name)[0], '.err') ), 'w')
#     subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-i', sql_clean_ddl], stdout=f_clean_log, stderr=f_clean_err )
#     flush_and_close(f_clean_log)
#     flush_and_close(f_clean_err)
#
#     with open(f_clean_err.name,'r') as f:
#         for line in f:
#             print('UNEXPECTED STDERR in cleanup SQL: ' + line)
#             MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0
#
#     with open(f_clean_log.name,'r') as f:
#         for line in f:
#             # show number of dropped objects
#             print(line)
#
#     if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> cleanup SQL script also finished w/o errors
#
#         ##############################################################################
#         ###   W A I T   U N T I L    R E P L I C A    B E C O M E S   A C T U A L  ###
#         ##############################################################################
#         wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-2' )
#
#         f_main_meta_sql=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_main_meta.sql'), 'w')
#         subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_main_meta_sql, stderr=subprocess.STDOUT )
#         flush_and_close( f_main_meta_sql )
#
#         f_repl_meta_sql=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_repl_meta.sql'), 'w')
#         subprocess.call( [context['isql_path'], 'localhost:' + db_repl, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_repl_meta_sql, stderr=subprocess.STDOUT )
#         flush_and_close( f_repl_meta_sql )
#
#         db_main_meta=open(f_main_meta_sql.name, 'r')
#         db_repl_meta=open(f_repl_meta_sql.name, 'r')
#
#         diffmeta = ''.join(difflib.unified_diff(
#             db_main_meta.readlines(),
#             db_repl_meta.readlines()
#         ))
#         db_main_meta.close()
#         db_repl_meta.close()
#
#         f_meta_diff=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_meta_diff.txt'), 'w', buffering = 0)
#         f_meta_diff.write(diffmeta)
#         flush_and_close( f_meta_diff )
#
#         # Following must issue only TWO rows:
#         #     UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_main]' ... */
#         #     UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_repl]' ... */
#         # Only these lines will be suppressed further (see subst. section):
#         with open(f_meta_diff.name, 'r') as f:
#             for line in f:
#                 if line[:1] in ('-', '+') and line[:3] not in ('---','+++'):
#                     print('UNEXPECTED METADATA DIFF.: ' + line)
#
#
# ######################
# ### A C H T U N G ###
# ######################
# # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
# # WITH 'ERROR: Record format with length 68 is not found for table TEST':
# runProgram('gfix', ['-sweep', 'localhost:' + db_repl])
# runProgram('gfix', ['-sweep', 'localhost:' + db_main])
# #######################
#
# # cleanup:
# ##########
# cleanup( (f_sql_chk, f_sql_log, f_sql_err,f_clean_log,f_clean_err,f_main_meta_sql,f_repl_meta_sql,f_meta_diff) )
#
#---

def wait_for_data_in_replica( act_db_main: Action, max_allowed_time_for_wait, prefix_msg = '' ):

    replication_log = act_db_main.home_dir / 'replication.log'

    replold_lines = []
    with open(replication_log, 'r') as f:
        replold_lines = f.readlines()

    with act_db_main.db.connect(no_db_triggers = True) as con:
        with con.cursor() as cur:
            cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
            last_generated_repl_segment = cur.fetchone()[0]

    # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
    # VERBOSE: Segment 2 (200 bytes) is replicated in 82 ms, deleting the file
    p_successfully_replicated = re.compile( f'\\+\\s+verbose:\\s+segment\\s+{last_generated_repl_segment}\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting', re.IGNORECASE)

    # VERBOSE: Segment 16 replication failure at offset 33628
    p_replication_failure = re.compile('segment\\s+\\d+\\s+replication\\s+failure', re.IGNORECASE)

    found_required_message = False
    found_replfail_message = False
    found_common_error_msg = False

    for i in range(0,max_allowed_time_for_wait):

        time.sleep(1)

        # Get content of replication.log _after_ isql finishes:
        with open(replication_log, 'r') as f:
            diff_data = unified_diff(
                replold_lines,
                f.readlines()
            )

        for k,d in enumerate(diff_data):
            if p_successfully_replicated.search(d):
                # We FOUND the phrase "VERBOSE: Segment <last_generated_repl_segment> ... is replicated ..." in the replication log.
                # This is the expected success, break!
                print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about replicated segment N {last_generated_repl_segment}.' )
                found_required_message = True
                break

            if p_replication_failure.search(d):
                print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ SEGMENT REPLICATION FAILURE @@@ ' + d )
                found_replfail_message = True
                break

            if 'ERROR:' in d:
                print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ REPLICATION ERROR @@@ ' + d )
                found_common_error_msg = True
                break

        if found_required_message or found_replfail_message or found_common_error_msg:
            break

    if not found_required_message:
        print(f'UNEXPECTED RESULT: no message about replicated segment N {last_generated_repl_segment} for {max_allowed_time_for_wait} seconds.')

#--------------------------------------------

@pytest.mark.version('>=4.0.1')
def test_1(act_db_main: Action, act_db_repl: Action, tmp_data: Path, capsys):

    #assert '' == capsys.readouterr().out

    ###################
    somewhat_failed = 1
    ###################

    try:
        # Generate random binary data and write it to a file which will be loaded as a stream blob into the DB:
        tmp_data.write_bytes( bytearray(os.urandom(DATA_LEN)) )

        with act_db_main.db.connect() as con:
            con.execute_immediate('recreate table test(id int primary key, b blob)')
            con.commit()
            with con.cursor() as cur:
                with open(tmp_data, 'rb') as blob_file:
                    # [doc]: crypt_hash() returns a VARCHAR string in the OCTETS charset, with length depending on the algorithm.
                    # ### ACHTUNG ### ISQL converts these octets to HEX form, e.g.:
                    # select cast('AF' as varchar(2) charset octets) from rdb$database --> '4146' // i.e. byte order = BIG-endian.
                    # firebird-driver does NOT do such a transformation, and the output for this example will be: b'AF'.
                    # We have to:
                    # 1. Convert this string to an integer using 'big' for byteorder (despite the default most likely being 'little'!).
                    # 2. Convert this (decimal!) integer to hex and remove the "0x" prefix; this can be done using the format() function.
                    # 3. Apply upper() to this string and pad it with zeroes to len=128 (because such padding is always done by ISQL).
                    # The resulting value <inserted_blob_hash> will further be queried from the REPLICA database, using ISQL.
                    # It must be equal to the <inserted_blob_hash> that we evaluate here:
                    cur.execute("insert into test(id, b) values(1, ?) returning crypt_hash(b using sha512)", (blob_file,) )
                    inserted_blob_hash = format(int.from_bytes(cur.fetchone()[0], 'big'), 'x').upper().rjust(128, '0')
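                    # Illustrative arithmetic for the conversion above (hypothetical one-byte value, not produced by this test):
                    #   int.from_bytes(b'\xaf', 'big') == 175
                    #   format(175, 'x').upper() == 'AF'
                    #   'AF'.rjust(128, '0') == the 128-char zero-padded form that ISQL would print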
            con.commit()

        assert '' == capsys.readouterr().out

        act_db_main.expected_stdout = 'POINT-1 FOUND message about replicated segment N 1.'
        ##############################################################################
        ###   W A I T   U N T I L    R E P L I C A    B E C O M E S   A C T U A L  ###
        ##############################################################################
        wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-1' )

        act_db_main.stdout = capsys.readouterr().out
        assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
        act_db_main.reset()

        #---------------------------------------------------

        sql_chk = '''
            set list on;
            set blob all;
            set count on;
            select
                rdb$get_context('SYSTEM','REPLICA_MODE') replica_mode
                ,crypt_hash(b using sha512) as blob_hash -- this will be zero-padded by ISQL to len=128
            from test;
        '''
        for a in (act_db_main,act_db_repl):
            db_repl_mode = '<null>' if a == act_db_main else 'READ-ONLY'
            a.expected_stdout = f"""
                REPLICA_MODE {db_repl_mode}
                BLOB_HASH {inserted_blob_hash}
                Records affected: 1
            """
            a.isql(switches=['-q'], input = sql_chk)
            assert a.clean_stdout == a.clean_expected_stdout
            a.reset()

        #-------------------------------------------------------

        # Return master DB to its initial state:
        # remove all DB objects (tables, views, ...):
        #
        db_main_meta, db_repl_meta = '', ''
        for a in (act_db_main,act_db_repl):
            if a == act_db_main:
                sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
                a.expected_stdout = """
                    Start removing objects
                    Finish. Total objects removed
                """
                a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
                assert a.clean_stdout == a.clean_expected_stdout
                a.reset()

                a.expected_stdout = 'POINT-2 FOUND message about replicated segment'
                ##############################################################################
                ###   W A I T   U N T I L    R E P L I C A    B E C O M E S   A C T U A L  ###
                ##############################################################################
                wait_for_data_in_replica( a, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-2' )

                a.stdout = capsys.readouterr().out
                assert a.clean_stdout == a.clean_expected_stdout
                a.reset()

                db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
            else:
                db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')

            ######################
            ### A C H T U N G ###
            ######################
            # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
            # WITH 'ERROR: Record format with length NN is not found for table TEST':
            a.gfix(switches=['-sweep', a.db.dsn])

        # Final point: metadata must become equal:
        #
        diff_meta = ''.join(unified_diff(
            [x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
            [x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
        )
        assert diff_meta == ''

        ###################
        somewhat_failed = 0
        ###################

    except Exception as e:
        # Keep somewhat_failed == 1: the recovery branch below re-creates both databases.
        pass

    if somewhat_failed:
        # If any previous assert failed, we have to RECREATE both master and slave databases.
        # Otherwise further execution of this test or other replication-related tests will most likely fail.
        for a in (act_db_main,act_db_repl):
            d = a.db.db_path
            a.db.drop()
            dbx = create_database(str(d), user = a.db.user)
            dbx.close()
            with a.connect_server() as srv:
                srv.database.set_write_mode(database = d, mode=DbWriteMode.ASYNC)
                srv.database.set_sweep_interval(database = d, interval = 0)
                if a == act_db_repl:
                    srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
                else:
                    with a.db.connect() as con:
                        # !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
                        # DO NOT use 'a.db.db_path' for an ALIASED database!
                        # It will return '.' rather than the full path+filename.
                        # Use only con.info.name for that:
                        repl_root_path = Path(con.info.name).parent
                        repl_jrn_sub_dir = repl_settings['journal_sub_dir']
                        repl_arc_sub_dir = repl_settings['archive_sub_dir']

                        # Clean folders repl_journal and repl_archive (remove all files from there):
                        for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
                            assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty."

                        con.execute_immediate('alter database enable publication')
                        con.execute_immediate('alter database include all to publication')
                        con.commit()

    assert somewhat_failed == 0