diff --git a/tests/functional/replication/test_blob_segments_longer_32kb_are_not_replicated.py b/tests/functional/replication/test_blob_segments_longer_32kb_are_not_replicated.py index 935cc713..f4e7e631 100644 --- a/tests/functional/replication/test_blob_segments_longer_32kb_are_not_replicated.py +++ b/tests/functional/replication/test_blob_segments_longer_32kb_are_not_replicated.py @@ -5,336 +5,304 @@ ID: replication.blob_segments_longer_32kb_are_not_replicated ISSUE: 6893 TITLE: Problem with replication of BLOB segments longer than 32KB DESCRIPTION: - Test creates table with blob column and performs trivial scenario: - insert into test(id, b) values( 1, ); + Test creates table with blob column and loads binary data into this table (using stream API). + We store crypt_hash() of this blob in the variable that will be further used for check correctness + of replication results (i.e. when blob data will be completely transferred to db_repl). - After this we do connect and query ID of last generated segment by querying REPLICATION_SEQUENCE variable - from SYSTEM context namespace. - - Then we wait until replica becomes actual to master, and this delay will last no more then threshold + After this we wait until replica becomes actual to master, and this delay will last no more then threshold that is defined by MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG variable (measured in seconds). During this delay, we check every second for replication log and search there line with number of last generated segment (which was replicated and deleting finally). We can assume that replication finished OK only when such line is found see ('POINT-1'). + Then we check that both databases have same data in the user table 'test'. - Then we invoke ISQL with executing auxiliary script for drop all DB objects on master (with '-nod' command switch). + Further, we invoke ISQL with executing auxiliary script for drop all DB objects on master (with '-nod' command switch). After all objects will be dropped, we have to wait again until replica becomes actual with master (see 'POINT-2'). Finally, we extract metadata for master and replica and compare them (see 'f_meta_diff'). The only difference in metadata must be 'CREATE DATABASE' statement with different DB names - we suppress it, thus metadata difference must not be issued. - ################ - ### N O T E ### - ################ - Test assumes that master and replica DB have been created beforehand. - Also, it assumes that %FB_HOME%/replication.conf has been prepared with apropriate parameters for replication. - Particularly, name of directories and databases must have info about checked FB major version and ServerMode. - * verbose = true // in order to find out line with message that required segment was replicated - * section for master database with specified parameters: - journal_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.journal" - journal_archive_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive" - journal_archive_command = "copy $(pathname) $(archivepathname)" - journal_archive_timeout = 10 - * section for replica database with specified parameter: - journal_source_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive" - - Master and replica databases must be created in "%FBT_REPO% mp" directory and have names like these: - 'fbt-main.fb40.SS.fdb'; 'fbt-repl.fb40.SS.fdb'; - for FB 4.x ('SS' = Super; 'CS' = Classic) - 'fbt-main.fb50.SS.fdb'; 'fbt-repl.fb50.SS.fdb'; - for FB 5.x ('SS' = Super; 'CS' = Classic) - NB: fixed numeric value ('40' or '50') must be used for any minor FB version (4.0; 4.0.1; 4.1; 5.0; 5.1 etc) - - These two databases must NOT be dropped in any of tests related to replication! - They are created and dropped in the batch scenario which prepares FB instance to be checked for each ServerMode - and make cleanup after it, i.e. when all tests will be completed. - - NB. Currently this task presents only in Windows batch, thus test has attribute platform = 'Windows'. - - Temporary comment. For debug purpoces: - 1) find out SUFFIX of the name of FB service which is to be tested (e.g. 'DefaultInstance', '40SS' etc); - 2) copy file %fbt-repo%/tests/functional/tabloid/batches/setup-fb-for-replication.bat.txt - to some place and rename it "*.bat"; - 3) open this .bat in editor and asjust value of 'fbt_repo' variable; - 4) run: setup-fb-for-replication.bat [SUFFIX_OF_FB_SERVICE] - where SUFFIX_OF_FB_SERVICE is ending part of FB service which you want to check: - DefaultInstance ; 40ss ; 40cs ; 50ss ; 50cs etc - 5) batch 'setup-fb-for-replication.bat' will: - * stop selected FB instance - * create test databases (in !fbt_repo!/tmp); - * prepare journal/archive sub-folders for replication (also in !fbt_repo! mp\\); - * replace %fb_home%/replication.conf with apropriate - * start selected FB instance - 6) run this test (FB instance will be already launched by setup-fb-for-replication.bat): - %fpt_repo%/fbt-run2.bat dblevel-triggers-must-not-fire-on-replica.fbt 50ss, etc - Confirmed bug on 5.0.0.88, 4.0.1.2523: record appears on replica but blob will be NULL. Checked on: 5.0.0.120, 4.0.1.2547 -- all OK. FBTEST: functional.replication.blob_segments_longer_32kb_are_not_replicated +NOTES: + [23.08.2022] pzotov + 1. In case of any errors (somewhat_failed <> 0) test will re-create db_main and db_repl, and then perform all needed + actions to resume replication (set 'replica' flag on db_repl, enabling publishing in db_main, remove all files + from subdirectories and which must present in the same folder as ). + 2. Warning raises on Windows and Linux: + ../../../usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126 + /usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126: + PytestAssertRewriteWarning: Module already imported so cannot be rewritten: __editable___firebird_qa_0_17_0_finder + self._mark_plugins_for_rewrite(hook) + The reason currently is unknown. + + Checked on 5.0.0.623, 4.0.1.2692 - both CS and SS. Both on Windows and Linux. """ +import os +import shutil +import re +from difflib import unified_diff +from pathlib import Path +import time + import pytest from firebird.qa import * +from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode + +# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings +# from act.files_dir/'test_config.ini': +repl_settings = QA_GLOBALS['replication'] + +MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = int(repl_settings['max_time_for_wait_segment_in_log']) +MAIN_DB_ALIAS = repl_settings['main_db_alias'] +REPL_DB_ALIAS = repl_settings['repl_db_alias'] + +db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True) +db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True) substitutions = [('Start removing objects in:.*', 'Start removing objects'), ('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'), - ('.* CREATE DATABASE .*', '')] + ('.* CREATE DATABASE .*', ''), + ('FOUND message about replicated segment N .*', 'FOUND message about replicated segment')] +act_db_main = python_act('db_main', substitutions=substitutions) +act_db_repl = python_act('db_repl', substitutions=substitutions) -db = db_factory() +# Length of generated blob: +DATA_LEN = 65 * 1024 * 1024 +tmp_data = temp_file(filename = 'tmp_blob_for_replication.dat') -act = python_act('db', substitutions=substitutions) +#-------------------------------------------- -expected_stdout = """ - POINT-1 FOUND message about replicated segment. - REPLICATED_BLOB_OCTET_LEN 65533 - Start removing objects - Finish. Total objects removed - POINT-2 FOUND message about replicated segment. -""" +def cleanup_folder(p): + # Removed all files and subdirs in the folder

+ # Used for cleanup and when replication must be reset + # in case when any error occurred during test execution. + assert os.path.dirname(p) != p, f"@@@ ABEND @@@ CAN NOT operate in the file system root directory. Check your code!" + for root, dirs, files in os.walk(p): + for f in files: + os.unlink(os.path.join(root, f)) + for d in dirs: + shutil.rmtree(os.path.join(root, d)) + return len(os.listdir(p)) -@pytest.mark.skip('FIXME: Not IMPLEMENTED') -@pytest.mark.version('>=4.0') -@pytest.mark.platform('Windows') -def test_1(act: Action): - pytest.fail("Not IMPLEMENTED") +#-------------------------------------------- -# test_script_1 -#--- -# -# import os -# import subprocess -# import re -# import difflib -# import shutil -# import time -# -# os.environ["ISC_USER"] = user_name -# os.environ["ISC_PASSWORD"] = user_password -# -# ##################################### -# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 65 -# ##################################### -# -# svc = fdb.services.connect(host='localhost', user=user_name, password=user_password) -# FB_HOME = svc.get_home_directory() -# svc.close() -# -# engine = db_conn.engine_version # 4.0; 4.1; 5.0 etc -- type float -# fb_major = 'fb' + str(engine)[:1] + '0' # 'fb40'; 'fb50' -# -# cur = db_conn.cursor() -# cur.execute("select rdb$config_value from rdb$config where upper(rdb$config_name) = upper('ServerMode')") -# server_mode = 'XX' -# for r in cur: -# if r[0] == 'Super': -# server_mode = 'SS' -# elif r[0] == 'SuperClassic': -# server_mode = 'SC' -# elif r[0] == 'Classic': -# server_mode = 'CS' -# cur.close() -# -# # 'fbt-main.fb50.ss.fdb' etc: -# db_main = os.path.join( context['temp_directory'], 'fbt-main.' + fb_major + '.' + server_mode + '.fdb' ) -# db_repl = db_main.replace( 'fbt-main.', 'fbt-repl.') -# -# # Folders for journalling and archieving segments. -# repl_journal_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.journal' ) -# repl_archive_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.archive' ) -# -# db_conn.close() -# -# #-------------------------------------------- -# -# def flush_and_close( file_handle ): -# # https://docs.python.org/2/library/os.html#os.fsync -# # If you're starting with a Python file object f, -# # first do f.flush(), and -# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk. -# global os -# -# file_handle.flush() -# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull: -# # otherwise: "OSError: [Errno 9] Bad file descriptor"! -# os.fsync(file_handle.fileno()) -# file_handle.close() -# -# #-------------------------------------------- -# -# def cleanup( f_names_list ): -# global os -# for i in range(len( f_names_list )): -# if type(f_names_list[i]) == file: -# del_name = f_names_list[i].name -# elif type(f_names_list[i]) == str: -# del_name = f_names_list[i] -# else: -# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.') -# del_name = None -# -# if del_name and os.path.isfile( del_name ): -# os.remove( del_name ) -# -# #-------------------------------------------- -# -# def wait_for_data_in_replica( fb_home, max_allowed_time_for_wait, db_main, prefix_msg = '' ): -# -# global re -# global difflib -# global time -# -# replold_lines = [] -# with open( os.path.join(fb_home,'replication.log'), 'r') as f: -# replold_lines = f.readlines() -# -# con = fdb.connect( dsn = 'localhost:' + db_main, no_db_triggers = 1) -# cur = con.cursor() -# cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database") -# for r in cur: -# last_generated_repl_segment = r[0] -# cur.close() -# con.close() -# -# #print('last_generated_repl_segment:', last_generated_repl_segment) -# -# # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file -# p=re.compile( '\\+\\s+verbose:\\s+segment\\s+%(last_generated_repl_segment)s\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting' % locals(), re.IGNORECASE) -# -# found_required_message = False -# for i in range(0,max_allowed_time_for_wait): -# time.sleep(1) -# -# # Get content of fb_home replication.log _after_ isql finish: -# f_repllog_new = open( os.path.join(fb_home,'replication.log'), 'r') -# diff_data = difflib.unified_diff( -# replold_lines, -# f_repllog_new.readlines() -# ) -# f_repllog_new.close() -# -# for k,d in enumerate(diff_data): -# if p.search(d): -# print( (prefix_msg + ' ' if prefix_msg else '') + 'FOUND message about replicated segment.' ) -# found_required_message = True -# break -# -# if found_required_message: -# break -# -# if not found_required_message: -# print('UNEXPECTED RESULT: no message about replicated segment for %d seconds.' % max_allowed_time_for_wait) -# -# #-------------------------------------------- -# -# sql_ddl = ''' set bail on; -# set list on; -# -# recreate table test(id int primary key, b blob); -# insert into test(id, b) values(1, lpad('',65533,gen_uuid())); -# commit; -# -# select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') as last_generated_repl_segment from rdb$database; -# quit; -# ''' % locals() -# -# -# f_sql_chk = open( os.path.join(context['temp_directory'],'tmp_gh_6893_test.sql'), 'w') -# f_sql_chk.write(sql_ddl) -# flush_and_close( f_sql_chk ) -# -# f_sql_log = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.log' ) ), 'w') -# f_sql_err = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.err' ) ), 'w') -# subprocess.call( [ context['isql_path'], 'localhost:' + db_main, '-i', f_sql_chk.name ], stdout = f_sql_log, stderr = f_sql_err) -# flush_and_close( f_sql_log ) -# flush_and_close( f_sql_err ) -# -# last_generated_repl_segment = 0 -# -# with open(f_sql_err.name,'r') as f: -# for line in f: -# print('UNEXPECTED STDERR in initial SQL: ' + line) -# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0 -# -# if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> initial SQL script finished w/o errors -# -# ############################################################################## -# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ### -# ############################################################################## -# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-1' ) -# -# # runProgram('isql', ['localhost:' + db_repl, '-nod'], "set list on; select count(*) as long_blob_on_replica from test where octet_length(b) > 32768 and id = 1;") -# runProgram('isql', ['localhost:' + db_repl, '-nod'], "set list on; select octet_length(t.b) as replicated_blob_octet_len from rdb$database r left join test t on t.id = 1;") -# -# # return initial state of master DB: -# # remove all DB objects (tables, views, ...): -# # -------------------------------------------- -# sql_clean_ddl = os.path.join(context['files_location'],'drop-all-db-objects.sql') -# -# f_clean_log=open( os.path.join(context['temp_directory'],'tmp_gh_6893_drop-all-db-objects.log'), 'w') -# f_clean_err=open( ''.join( ( os.path.splitext(f_clean_log.name)[0], '.err') ), 'w') -# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-i', sql_clean_ddl], stdout=f_clean_log, stderr=f_clean_err ) -# flush_and_close(f_clean_log) -# flush_and_close(f_clean_err) -# -# with open(f_clean_err.name,'r') as f: -# for line in f: -# print('UNEXPECTED STDERR in cleanup SQL: ' + line) -# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0 -# -# with open(f_clean_log.name,'r') as f: -# for line in f: -# # show number of dropped objects -# print(line) -# -# if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> initial SQL script finished w/o errors -# -# ############################################################################## -# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ### -# ############################################################################## -# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-2' ) -# -# f_main_meta_sql=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_main_meta.sql'), 'w') -# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_main_meta_sql, stderr=subprocess.STDOUT ) -# flush_and_close( f_main_meta_sql ) -# -# f_repl_meta_sql=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_repl_meta.sql'), 'w') -# subprocess.call( [context['isql_path'], 'localhost:' + db_repl, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_repl_meta_sql, stderr=subprocess.STDOUT ) -# flush_and_close( f_repl_meta_sql ) -# -# db_main_meta=open(f_main_meta_sql.name, 'r') -# db_repl_meta=open(f_repl_meta_sql.name, 'r') -# -# diffmeta = ''.join(difflib.unified_diff( -# db_main_meta.readlines(), -# db_repl_meta.readlines() -# )) -# db_main_meta.close() -# db_repl_meta.close() -# -# f_meta_diff=open( os.path.join(context['temp_directory'],'tmp_gh_6893_db_meta_diff.txt'), 'w', buffering = 0) -# f_meta_diff.write(diffmeta) -# flush_and_close( f_meta_diff ) -# -# # Following must issue only TWO rows: -# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_main]' ... */ -# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_repl]' ... */ -# # Only thes lines will be suppressed further (see subst. section): -# with open(f_meta_diff.name, 'r') as f: -# for line in f: -# if line[:1] in ('-', '+') and line[:3] not in ('---','+++'): -# print('UNEXPECTED METADATA DIFF.: ' + line) -# -# -# ###################### -# ### A C H T U N G ### -# ###################### -# # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST -# # WITH 'ERROR: Record format with length 68 is not found for table TEST': -# runProgram('gfix', ['-sweep', 'localhost:' + db_repl]) -# runProgram('gfix', ['-sweep', 'localhost:' + db_main]) -# ####################### -# -# # cleanup: -# ########## -# cleanup( (f_sql_chk, f_sql_log, f_sql_err,f_clean_log,f_clean_err,f_main_meta_sql,f_repl_meta_sql,f_meta_diff) ) -# -#--- +def wait_for_data_in_replica( act_db_main: Action, max_allowed_time_for_wait, prefix_msg = '' ): + + replication_log = act_db_main.home_dir / 'replication.log' + + replold_lines = [] + with open(replication_log, 'r') as f: + replold_lines = f.readlines() + + with act_db_main.db.connect(no_db_triggers = True) as con: + with con.cursor() as cur: + cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database") + last_generated_repl_segment = cur.fetchone()[0] + + # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file + # VERBOSE: Segment 2 (200 bytes) is replicated in 82 ms, deleting the file + p_successfully_replicated = re.compile( f'\\+\\s+verbose:\\s+segment\\s+{last_generated_repl_segment}\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting', re.IGNORECASE) + + # VERBOSE: Segment 16 replication failure at offset 33628 + p_replication_failure = re.compile('segment\\s+\\d+\\s+replication\\s+failure', re.IGNORECASE) + + found_required_message = False + found_replfail_message = False + found_common_error_msg = False + + for i in range(0,max_allowed_time_for_wait): + + time.sleep(1) + + # Get content of fb_home replication.log _after_ isql finish: + with open(replication_log, 'r') as f: + diff_data = unified_diff( + replold_lines, + f.readlines() + ) + + for k,d in enumerate(diff_data): + if p_successfully_replicated.search(d): + # We FOUND phrase "VERBOSE: Segment ... is replicated ..." in the replication log. + # This is expected success, break! + print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about replicated segment N {last_generated_repl_segment}.' ) + found_required_message = True + break + + if p_replication_failure.search(d): + print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ SEGMENT REPLICATION FAILURE @@@ ' + d ) + found_replfail_message = True + break + + if 'ERROR:' in d: + print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ REPLICATION ERROR @@@ ' + d ) + found_common_error_msg = True + break + + if found_required_message or found_replfail_message or found_common_error_msg: + break + + if not found_required_message: + print('UNEXPECTED RESULT: no message about replicating segment N {last_generated_repl_segment} for {max_allowed_time_for_wait} seconds.') + +#-------------------------------------------- + +@pytest.mark.version('>=4.0.1') +def test_1(act_db_main: Action, act_db_repl: Action, tmp_data: Path, capsys): + + #assert '' == capsys.readouterr().out + + ################### + somewhat_failed = 1 + ################### + + try: + # Generate random binary data and writing to file which will be loaded as stream blob into DB: + tmp_data.write_bytes( bytearray(os.urandom(DATA_LEN)) ) + + with act_db_main.db.connect() as con: + con.execute_immediate('recreate table test(id int primary key, b blob)') + con.commit() + with con.cursor() as cur: + with open(tmp_data, 'rb') as blob_file: + # [doc]: crypt_hash() returns VARCHAR strings with OCTETS charset with length depended on algorithm. + # ### ACHTUNG ### ISQL will convert this octets to HEX-form, e.g.: + # select cast('AF' as varchar(2) charset octets) from rdb$database --> '4146' // i.e. bytes order = BIG-endian. + # firebird-driver does NOT do such transformation, and output for this example will be: b'AF'. + # We have to: + # 1. Convert this string to integer using 'big' for bytesOrder (despite that default value most likely = 'little'!) + # 2. Convert this (decimal!) integer to hex and remove "0x" prefix from it. This can be done using format() function. + # 3. Apply upper() to this string and pad it with zeroes to len=128 (because such padding is always done by ISQL). + # Resulting value - will be further queried from REPLICA database, using ISQL. + # It must be equal to that we evaluate here: + cur.execute("insert into test(id, b) values(1, ?) returning crypt_hash(b using sha512)", (blob_file,) ) + inserted_blob_hash = format(int.from_bytes(cur.fetchone()[0], 'big'), 'x').upper().rjust(128, '0') + con.commit() + + assert '' == capsys.readouterr().out + + act_db_main.expected_stdout = 'POINT-1 FOUND message about replicated segment N 1.' + ############################################################################## + ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ### + ############################################################################## + wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-1' ) + + act_db_main.stdout = capsys.readouterr().out + assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout + act_db_main.reset() + + #--------------------------------------------------- + + sql_chk = ''' + set list on; + set blob all; + set count on; + select + rdb$get_context('SYSTEM','REPLICA_MODE') replica_mode + ,crypt_hash(b using sha512) as blob_hash -- this will be zeroes-padded by ISQL to len=128 + from test; + ''' + for a in (act_db_main,act_db_repl): + db_repl_mode = '' if a == act_db_main else 'READ-ONLY' + a.expected_stdout = f""" + REPLICA_MODE {db_repl_mode} + BLOB_HASH {inserted_blob_hash} + Records affected: 1 + """ + a.isql(switches=['-q'], input = sql_chk) + assert a.clean_stdout == a.clean_expected_stdout + a.reset() + + #------------------------------------------------------- + + # return initial state of master DB: + # remove all DB objects (tables, views, ...): + # + db_main_meta, db_repl_meta = '', '' + for a in (act_db_main,act_db_repl): + if a == act_db_main: + sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text() + a.expected_stdout = """ + Start removing objects + Finish. Total objects removed + """ + a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True) + assert a.clean_stdout == a.clean_expected_stdout + a.reset() + + + a.expected_stdout = 'POINT-2 FOUND message about replicated segment' + ############################################################################## + ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ### + ############################################################################## + wait_for_data_in_replica( a, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-2' ) + + a.stdout = capsys.readouterr().out + assert a.clean_stdout == a.clean_expected_stdout + a.reset() + + db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8') + else: + db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8') + + ###################### + ### A C H T U N G ### + ###################### + # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST + # WITH 'ERROR: Record format with length NN is not found for table TEST': + a.gfix(switches=['-sweep', a.db.dsn]) + + + + # Final point: metadata must become equal: + # + diff_meta = ''.join(unified_diff( \ + [x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x], + [x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x]) + ) + assert diff_meta == '' + + ################### + somewhat_failed = 0 + ################### + + except Exception as e: + pass + + + if somewhat_failed: + # If any previous assert failed, we have to RECREATE both master and slave databases. + # Otherwise further execution of this test or other replication-related tests most likely will fail. + for a in (act_db_main,act_db_repl): + d = a.db.db_path + a.db.drop() + dbx = create_database(str(d), user = a.db.user) + dbx.close() + with a.connect_server() as srv: + srv.database.set_write_mode(database = d, mode=DbWriteMode.ASYNC) + srv.database.set_sweep_interval(database = d, interval = 0) + if a == act_db_repl: + srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY) + else: + with a.db.connect() as con: + # !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS !! + # DO NOT use 'a.db.db_path' for ALIASED database! + # It will return '.' rather than full path+filename. + # Use only con.info.name for that: + repl_root_path = Path(con.info.name).parent + repl_jrn_sub_dir = repl_settings['journal_sub_dir'] + repl_arc_sub_dir = repl_settings['archive_sub_dir'] + + # Clean folders repl_journal and repl_archive (remove all files from there): + for p in (repl_jrn_sub_dir,repl_arc_sub_dir): + assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty." + + con.execute_immediate('alter database enable publication') + con.execute_immediate('alter database include all to publication') + con.commit() + + assert somewhat_failed == 0