6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 13:33:07 +01:00

Added/Updated functional\replication\test_shutdown_during_applying_segments_leads_to_crash.py. Checked on 5.0.0.623, 4.0.1.2692 - both on Windows and Linux. On linux test atually silently FAILS, without showing any error. Will be investigated later.

This commit is contained in:
zotov 2022-08-27 20:19:40 +03:00
parent 13531d503f
commit fee7925d47

View File

@ -8,486 +8,453 @@ DESCRIPTION:
Bug initially was found during heavy test of replication performed by OLTP-EMUL, for FB 4.x Bug initially was found during heavy test of replication performed by OLTP-EMUL, for FB 4.x
(see letters to dimitr 13.09.2021; reply from dimitr, 18.09.2021 08:42 - all in mailbox: pz at ibase.ru). (see letters to dimitr 13.09.2021; reply from dimitr, 18.09.2021 08:42 - all in mailbox: pz at ibase.ru).
It *can* be reproduced without heavy/concurrent workload, but we have to operate with data that are written It *can* be reproduced without heavy/concurrent workload, but one need to make data be written into replica
into database 'slowly'. Such data can be wide INDEXED column which has GUID-based values. database 'slowly'. First of all, we have to set/change (temporary) Forced Writes = _ON_ for replica DB.
Further, on master DB we can create table with wide INDEXED column and insert GUID-based values there.
On master we have FW = OFF thus data will be inserted quickly, but on applying of segments to replica DB
will be extremely slow (more than 1..2 minutes).
Test creates a table with 'wide' indexed field and adds data to it. At this point we save current timestamp and start to check replicationb.log for appearance of phrase
Then we save current timestamp (with accuracy up to SECONDS, i.e. cut off milli- or microseconds) to variable. 'Added <N> segment(s) to the processing queue' - with requirement that this phrase has timestamp newer
After this we start check replicationb.log for appearance of phrase 'Added <N> segment(s) to the processing queue'. than just saved one (essentually, we are looking for LAST such phrase).
After founding each such phrase we skip two lines above and parse timestamp when this occurred.
If timestamp in log less than saved timestamp of our DML action then we go on to the next such phrase.
Otherwise we can assume that replication BEGINS to apply just generated segment.
See function wait_for_add_queue_in_replica() which does this parsing of replication.log. See function wait_for_add_queue_in_replica() which does this parsing of replication.log.
Because we operate with table which have very 'wide' index and, moreover, data in this index are GUID-generated Since that point control is returned from func wait_for_add_queue_in_replica() to main code.
text strings, we can safely assume that applying of segment will take at least 5...10 seconds (actually this We can be sure that replicating of segments will be performed very slow. In order to check ticket issue,
can take done for 30...35 seconds). we can now change replica DB to shutdown - and FB must not chash at this point.
During this time we change replica mode to full shutdown and (immediately after that) return to online. We also have to return FW state to 'OFF', so before shutting down this DB we change it FW attribute
NO message like 'error reading / writing from/to connection' must appear at this step. (see 'srv.database.set_write_mode(...)').
NOTE: changing FW attribute causes error on 5.0.0.215 with messages:
335544344 : I/O error during "WriteFile" operation for file "<db_repl_file.fdb>"
335544737 : Error while trying to write to file
- so we have to enclose it into try/except block (otherwise we will not see crash because of test terminating).
After this, we have to wait for replica finish applying segment and when this occur we drop the table. When we further change DB replica state to full shutdown, FB 5.0.0.215 crashes (checked both of SS and CS).
After returning DB to online ('srv.database.bring_online'), we will wait for all segments will be applied.
If this occurs then we can conclude that test passes.
Finally, we extract metadata for master and replica and compare them (see 'f_meta_diff'). Finally, we extract metadata for master and replica and compare them (see 'f_meta_diff').
The only difference in metadata must be 'CREATE DATABASE' statement with different DB names - we suppress it, The only difference in metadata must be 'CREATE DATABASE' statement with different DB names - we suppress it,
thus metadata difference must not be issued. thus metadata difference must not be issued.
################
### N O T E ###
################
Test assumes that master and replica DB have been created beforehand.
Also, it assumes that %FB_HOME%/replication.conf has been prepared with apropriate parameters for replication.
Particularly, name of directories and databases must have info about checked FB major version and ServerMode.
* verbose = true // in order to find out line with message that required segment was replicated
* section for master database with specified parameters:
journal_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.journal"
journal_archive_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
journal_archive_command = "copy $(pathname) $(archivepathname)"
journal_archive_timeout = 10
* section for replica database with specified parameter:
journal_source_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
Master and replica databases must be created in "!fbt_repo! mp" directory and have names like these: Confirmed bug on 5.0.0.215: server crashed when segment was applied to replica and at the same time we issued
'fbt-main.fb40.SS.fdb'; 'fbt-repl.fb40.SS.fdb'; - for FB 4.x ('SS' = Super; 'CS' = Classic) 'gfix -shut full -force 0 ...'. Regardless of that command, replica DB remained in NORMAL mode, not in shutdown.
'fbt-main.fb50.SS.fdb'; 'fbt-repl.fb50.SS.fdb'; - for FB 5.x ('SS' = Super; 'CS' = Classic) If this command was issued after this again - FB process hanged (gfix could not return control to OS).
NB: fixed numeric value ('40' or '50') must be used for any minor FB version (4.0; 4.0.1; 4.1; 5.0; 5.1 etc) This is the same bug as described in the ticked (discussed with dimitr, letters 22.09.2021).
These two databases must NOT be dropped in any of tests related to replication!
They are created and dropped in the batch scenario which prepares FB instance to be checked for each ServerMode
and make cleanup after it, i.e. when all tests will be completed.
NB. Currently this task was implemented only in Windows batch, thus test has attribute platform = 'Windows'.
Temporary comment. For debug purpoces:
1) find out SUFFIX of the name of FB service which is to be tested (e.g. 'DefaultInstance', '40SS' etc);
2) copy file %fbt-repo%/tests/functional/tabloid/batches/setup-fb-for-replication.bat.txt
to some place and rename it "*.bat";
3) open this .bat in editor and asjust value of 'fbt_repo' variable;
4) run: setup-fb-for-replication.bat [SUFFIX_OF_FB_SERVICE]
where SUFFIX_OF_FB_SERVICE is ending part of FB service which you want to check:
DefaultInstance ; 40ss ; 40cs ; 50ss ; 50cs etc
5) batch 'setup-fb-for-replication.bat' will:
* stop selected FB instance
* create test databases (in !fbt_repo!/tmp);
* prepare journal/archive sub-folders for replication (also in !fbt_repo!/tmp);
* replace %fb_home%/replication.conf with apropriate
* start selected FB instance
6) run this test (FB instance will be already launched by setup-fb-for-replication.bat):
%fpt_repo%/fbt-run2.bat dblevel-triggers-must-not-fire-on-replica.fbt 50ss, etc
Confirmed bug on 5.0.0.215: server crashed when segment was applied to replica and at the same time we issued
'gfix -shut full -force 0 ...'. Regardless of that command, replica DB remained in NORMAL mode, not in shutdown.
If this command was issued after this again - FB process hanged (gfix could not return control to OS).
This is the same bug as described in the ticked (discussed with dimitr, letters 22.09.2021).
Checked on: 4.0.1.2613 (SS/CS); 5.0.0.219 (SS/CS)
FBTEST: tests.functional.replication.shutdown_during_applying_segments_leads_to_crash FBTEST: tests.functional.replication.shutdown_during_applying_segments_leads_to_crash
NOTES:
[27.08.2022] pzotov
1. In case of any errors (somewhat_failed <> 0) test will re-create db_main and db_repl, and then perform all needed
actions to resume replication (set 'replica' flag on db_repl, enabling publishing in db_main, remove all files
from subdirectories <repl_journal> and <repl_archive> which must present in the same folder as <db_main>).
2. Warning raises on Windows and Linux:
../../../usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126
/usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126:
PytestAssertRewriteWarning: Module already imported so cannot be rewritten: __editable___firebird_qa_0_17_0_finder
self._mark_plugins_for_rewrite(hook)
The reason currently is unknown.
3. Following message appears in the firebird.log during this test:
3.1.[WINDOWS]
I/O error during "WriteFile" operation for file "<db_replica_filename>"
Error while trying to write to file
3.2 [LINUX]
I/O error during "write" operation for file "<db_replica_filename>"
Error while trying to write to file
Success
4. ### ACHTUNG ###
On linux test atually "silently" FAILS, without showing any error (but FB process is terminated!).
Will be investigated later.
Checked on 5.0.0.691, 4.0.1.2692 - both CS and SS. Both on Windows and Linux.
""" """
import os
import shutil
import re
from difflib import unified_diff
from pathlib import Path
import time
from datetime import datetime
from datetime import timedelta
import pytest import pytest
from firebird.qa import * from firebird.qa import *
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode, ShutdownMode,ShutdownMethod, OnlineMode
from firebird.driver.types import DatabaseError
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
# from act.files_dir/'test_config.ini':
repl_settings = QA_GLOBALS['replication']
MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = int(repl_settings['max_time_for_wait_segment_in_log'])
MAX_TIME_FOR_WAIT_ADDED_TO_QUEUE = int(repl_settings['max_time_for_wait_added_to_queue'])
MAIN_DB_ALIAS = repl_settings['main_db_alias']
REPL_DB_ALIAS = repl_settings['repl_db_alias']
db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True)
db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True)
substitutions = [('Start removing objects in:.*', 'Start removing objects'), substitutions = [('Start removing objects in:.*', 'Start removing objects'),
('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'), ('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'),
('.* CREATE DATABASE .*', ''), ('FMT_DESCR .*', 'FMT_DESCR'), ('[ \t]+', ' ')] ('.* CREATE DATABASE .*', ''),
('[\t ]+', ' '),
('FOUND message about replicated segment N .*', 'FOUND message about replicated segment'),
('FOUND message about segments added to queue after timestamp .*', 'FOUND message about segments added to queue after timestamp')
]
db = db_factory() act_db_main = python_act('db_main', substitutions=substitutions)
act_db_repl = python_act('db_repl', substitutions=substitutions)
act = python_act('db', substitutions=substitutions) #--------------------------------------------
expected_stdout = """ def cleanup_folder(p):
POINT-A FOUND message about segments added to queue after given timestamp. # Removed all files and subdirs in the folder <p>
POINT-B Attributes force write, full shutdown, read-only replica # Used for cleanup <repl_journal> and <repl_archive> when replication must be reset
POINT-1 FOUND message about replicated segment. # in case when any error occurred during test execution.
Start removing objects assert os.path.dirname(p) != p, f"@@@ ABEND @@@ CAN NOT operate in the file system root directory. Check your code!"
Finish. Total objects removed for root, dirs, files in os.walk(p):
POINT-2 FOUND message about replicated segment. for f in files:
""" os.unlink(os.path.join(root, f))
for d in dirs:
shutil.rmtree(os.path.join(root, d))
return len(os.listdir(p))
#--------------------------------------------
def wait_for_add_queue_in_replica( act_db_main: Action, max_allowed_time_for_wait, min_timestamp, prefix_msg = '' ):
# <hostname> (replica) Tue Sep 21 20:24:57 2021
# Database: ...
# Added 3 segment(s) to the processing queue
# -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
def check_pattern_in_log( log_lines, pattern, min_timestamp, prefix_msg = '' ):
found_required_message = False
for i,r in enumerate(log_lines):
if pattern.search(r):
if i>=2 and log_lines[i-2]:
# a = r.replace('(',' ').split()
a = log_lines[i-2].split()
if len(a)>=4:
# s='replica_host_name (slave) Sun May 30 17:46:43 2021'
# s.split()[-5:] ==> ['Sun', 'May', '30', '17:46:43', '2021']
# ' '.join( ...) ==> 'Sun May 30 17:46:43 2021'
dts = ' '.join( log_lines[i-2].split()[-5:] )
segment_timestamp = datetime.strptime( dts, '%a %b %d %H:%M:%S %Y')
if segment_timestamp >= min_timestamp:
print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about segments added to queue after timestamp {min_timestamp}: {segment_timestamp}')
found_required_message = True
break
return found_required_message
# -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
replication_log = act_db_main.home_dir / 'replication.log'
replold_lines = []
with open(replication_log, 'r') as f:
replold_lines = f.readlines()
# VERBOSE: Added 5 segment(s) to the processing queue
segments_to_queue_pattern=re.compile( 'VERBOSE:\\s+added\\s+\\d+\\s+segment.*to.*queue', re.IGNORECASE)
# 08.09.2021: replication content can already contain phrase which we are looking for.
# Because of this, it is crucial to check OLD content of replication log before loop.
# Also, segments_to_queue_pattern must NOT start from '\\+' because it can occur only for diff_data (within loop):
#
found_required_message = check_pattern_in_log( replold_lines, segments_to_queue_pattern, min_timestamp, prefix_msg )
if not found_required_message:
for i in range(0,max_allowed_time_for_wait):
time.sleep(1)
# Get content of fb_home replication.log _after_ isql finish:
with open(replication_log, 'r') as f:
diff_data = unified_diff(
replold_lines,
f.readlines()
)
found_required_message = check_pattern_in_log( list(diff_data), segments_to_queue_pattern, min_timestamp, prefix_msg )
if found_required_message:
break
if not found_required_message:
print(f'UNEXPECTED RESULT: no message about segments added to queue after {min_timestamp}.')
#--------------------------------------------
def wait_for_data_in_replica( act_db_main: Action, max_allowed_time_for_wait, prefix_msg = '' ):
replication_log = act_db_main.home_dir / 'replication.log'
replold_lines = []
with open(replication_log, 'r') as f:
replold_lines = f.readlines()
with act_db_main.db.connect(no_db_triggers = True) as con:
with con.cursor() as cur:
cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
last_generated_repl_segment = cur.fetchone()[0]
# VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
# VERBOSE: Segment 2 (200 bytes) is replicated in 82 ms, deleting the file
p_successfully_replicated = re.compile( f'\\+\\s+verbose:\\s+segment\\s+{last_generated_repl_segment}\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting', re.IGNORECASE)
# VERBOSE: Segment 16 replication failure at offset 33628
p_replication_failure = re.compile('segment\\s+\\d+\\s+replication\\s+failure', re.IGNORECASE)
found_required_message = False
found_replfail_message = False
found_common_error_msg = False
for i in range(0,max_allowed_time_for_wait):
time.sleep(1)
# Get content of fb_home replication.log _after_ isql finish:
with open(replication_log, 'r') as f:
diff_data = unified_diff(
replold_lines,
f.readlines()
)
for k,d in enumerate(diff_data):
if p_successfully_replicated.search(d):
# We FOUND phrase "VERBOSE: Segment <last_generated_repl_segment> ... is replicated ..." in the replication log.
# This is expected success, break!
print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about replicated segment N {last_generated_repl_segment}.' )
found_required_message = True
break
if p_replication_failure.search(d):
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ SEGMENT REPLICATION FAILURE @@@ ' + d )
found_replfail_message = True
break
if 'ERROR:' in d:
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ REPLICATION ERROR @@@ ' + d )
found_common_error_msg = True
break
if found_required_message or found_replfail_message or found_common_error_msg:
break
if not found_required_message:
print(f'UNEXPECTED RESULT: no message about replicating segment N {last_generated_repl_segment} for {max_allowed_time_for_wait} seconds.')
#--------------------------------------------
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
@pytest.mark.version('>=4.0.1') @pytest.mark.version('>=4.0.1')
@pytest.mark.platform('Windows') def test_1(act_db_main: Action, act_db_repl: Action, capsys):
def test_1(act: Action):
pytest.fail("Not IMPLEMENTED")
# test_script_1 #assert '' == capsys.readouterr().out
#---
# ###################
# import os somewhat_failed = 1
# import subprocess ###################
# import re try:
# import difflib
# import shutil N_ROWS = 30000
# import time FLD_WIDTH = 700
# from datetime import datetime
# from datetime import timedelta # N_ROWS = 30'000:
# # FW = ON ==>
# # Added 2 segment(s) to the processing queue
# os.environ["ISC_USER"] = user_name # Segment 1 (16783004 bytes) is replicated in 1 minute(s), preserving the file due to 1 active transaction(s) (oldest: 10 in segment 1)
# os.environ["ISC_PASSWORD"] = user_password # Segment 2 (4667696 bytes) is replicated in 55 second(s), deleting the file
# # FW = OFF ==>
# # NB: with default values of 'apply_idle_timeout' and 'apply_error_timeout' (10 and 60 s) # Segment 1 (16783004 bytes) is replicated in 1 second(s), preserving the file due to 1 active transaction(s) (oldest: 10 in segment 1)
# # total time of this test is about 130...132s. # Segment 2 (4667696 bytes) is replicated in 374 ms, deleting the file
# #####################################
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 135 act_db_main.db.set_async_write()
# MAX_TIME_FOR_WAIT_ADDED_TO_QUEUE = 135 act_db_repl.db.set_sync_write()
# ##################################### assert '' == capsys.readouterr().out
#
# svc = fdb.services.connect(host='localhost', user=user_name, password=user_password) current_date_with_hhmmss = datetime.today().replace(microsecond=0) # datetime.datetime(2022, 8, 26, 22, 54, 33) etc
# FB_HOME = svc.get_home_directory()
# svc.close() sql_init = f'''
# set bail on;
# engine = db_conn.engine_version # 4.0; 4.1; 5.0 etc -- type float recreate table test(s varchar({FLD_WIDTH}), constraint test_s_unq unique(s));
# fb_major = 'fb' + str(engine)[:1] + '0' # 'fb40'; 'fb50' commit;
#
# cur = db_conn.cursor() set term ^;
# cur.execute("select rdb$config_value from rdb$config where upper(rdb$config_name) = upper('ServerMode')") execute block as
# server_mode = 'XX' declare fld_len int;
# for r in cur: declare n int;
# if r[0] == 'Super': begin
# server_mode = 'SS' select ff.rdb$field_length
# elif r[0] == 'SuperClassic': from rdb$relation_fields rf
# server_mode = 'SC' join rdb$fields ff on rf.rdb$field_source = ff.rdb$field_name
# elif r[0] == 'Classic': where upper(rf.rdb$relation_name) = upper('test') and upper(rf.rdb$field_name) = upper('s')
# server_mode = 'CS' into fld_len;
# cur.close()
#
# # 'fbt-main.fb50.ss.fdb' etc: n = {N_ROWS};
# db_main = os.path.join( context['temp_directory'], 'fbt-main.' + fb_major + '.' + server_mode + '.fdb' ) while (n > 0) do
# db_repl = db_main.replace( 'fbt-main.', 'fbt-repl.') begin
# insert into test(s) values( lpad('', :fld_len, uuid_to_char(gen_uuid())) );
# # Folders for journalling and archieving segments. n = n - 1;
# repl_journal_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.journal' ) end
# repl_archive_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.archive' )
# end
# runProgram('gfix', ['-w', 'async', 'localhost:' + db_main]) ^
# set term ;^
# db_conn.close() commit;
# '''
# #--------------------------------------------
# act_db_main.expected_stderr = ''
# def flush_and_close( file_handle ): act_db_main.isql(switches=['-q'], input = sql_init)
# # https://docs.python.org/2/library/os.html#os.fsync assert act_db_main.clean_stderr == act_db_main.clean_expected_stderr
# # If you're starting with a Python file object f, act_db_main.reset()
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk. act_db_main.expected_stdout = f'POINT-1 FOUND message about segments added to queue after timestamp {current_date_with_hhmmss}'
# global os ##############################################################################
# ### W A I T F O R S E G M E N T S A D D E D T O Q U E U E ###
# file_handle.flush() ##############################################################################
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull: wait_for_add_queue_in_replica( act_db_main, MAX_TIME_FOR_WAIT_ADDED_TO_QUEUE, current_date_with_hhmmss, 'POINT-1' )
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno()) act_db_main.stdout = capsys.readouterr().out
# file_handle.close() assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
# act_db_main.reset()
# #--------------------------------------------
# time.sleep(1)
# def cleanup( f_names_list ):
# global os with act_db_repl.connect_server() as srv:
# for i in range(len( f_names_list )):
# if type(f_names_list[i]) == file: try:
# del_name = f_names_list[i].name # 5.0.0.215:
# elif type(f_names_list[i]) == str: # 335544344 : I/O error during "WriteFile" operation for file "<db_repl_file.fdb>"
# del_name = f_names_list[i] # 335544737 : Error while trying to write to file
# else: srv.database.set_write_mode(database=act_db_repl.db.db_path
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.') , mode=DbWriteMode.ASYNC
# del_name = None )
# except:
# if del_name and os.path.isfile( del_name ): pass
# os.remove( del_name )
# #try:
# #-------------------------------------------- # 5.0.0.215:
# # FB crashes here, replication archive folder can not be cleaned:
# def wait_for_data_in_replica( fb_home, max_allowed_time_for_wait, db_main, prefix_msg = '' ): # PermissionError: [WinError 32] ...: '<repl_arc_sub_dir>/<dbmain>.journal-NNN'
# global re srv.database.shutdown(
# global difflib database=act_db_repl.db.db_path
# global time ,mode=ShutdownMode.FULL
# ,method=ShutdownMethod.FORCED
# # -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:- ,timeout=0
# def check_pattern_in_log( log_lines, pattern, prefix_msg = '' ): )
# found_required_message = False
# for d in log_lines: # Without crash replication here will be resumed, but DB_REPL now has FW = OFF, and segments
# if pattern.search(d): # will be replicated very fast after this:
# print( (prefix_msg + ' ' if prefix_msg else '') + 'FOUND message about replicated segment.' ) srv.database.bring_online(
# found_required_message = True database=act_db_repl.db.db_path
# break ,mode=OnlineMode.NORMAL
# return found_required_message )
# # -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
# assert '' == capsys.readouterr().out
# replold_lines = []
# with open( os.path.join(fb_home,'replication.log'), 'r') as f: act_db_main.expected_stdout = 'POINT-2 FOUND message about replicated segment'
# replold_lines = f.readlines() ##############################################################################
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
# con = fdb.connect( dsn = 'localhost:' + db_main, no_db_triggers = 1) ##############################################################################
# cur = con.cursor() wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-2' )
# cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
# for r in cur: act_db_main.stdout = capsys.readouterr().out
# last_generated_repl_segment = r[0] assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
# cur.close() act_db_main.reset()
# con.close()
#
# # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file #---------------------------------------------------
# segment_replicated_pattern=re.compile( 'verbose:\\s+segment\\s+%(last_generated_repl_segment)s\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting' % locals(), re.IGNORECASE)
# # return initial state of master DB:
# # 08.09.2021: replication content can remain unchanged if there was no user-defined object in DB that must be dropped! # remove all DB objects (tables, views, ...):
# # Because of this, it is crucial to check OLD content of replication log before loop. #
# # Also, segment_replicated_pattern must NOT start from '\\+' because it can occur only for diff_data (within loop): db_main_meta, db_repl_meta = '', ''
# # for a in (act_db_main,act_db_repl):
# found_required_message = check_pattern_in_log( replold_lines, segment_replicated_pattern, prefix_msg ) if a == act_db_main:
# sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
# if not found_required_message: a.expected_stdout = """
# Start removing objects
# for i in range(0,max_allowed_time_for_wait): Finish. Total objects removed
# time.sleep(1) """
# a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
# # Get content of fb_home replication.log _after_ isql finish: assert a.clean_stdout == a.clean_expected_stdout
# f_repllog_new = open( os.path.join(fb_home,'replication.log'), 'r') a.reset()
# diff_data = difflib.unified_diff(
# replold_lines,
# f_repllog_new.readlines() a.expected_stdout = 'POINT-3 FOUND message about replicated segment'
# ) ##############################################################################
# f_repllog_new.close() ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
# ##############################################################################
# found_required_message = check_pattern_in_log( diff_data, segment_replicated_pattern, prefix_msg ) wait_for_data_in_replica( a, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-3' )
# if found_required_message:
# break a.stdout = capsys.readouterr().out
# assert a.clean_stdout == a.clean_expected_stdout
# if not found_required_message: a.reset()
# print('UNEXPECTED RESULT: no message about replicated segment No. %d for %d seconds.' % (int(last_generated_repl_segment), max_allowed_time_for_wait) )
# db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
# else:
# #-------------------------------------------- db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
#
# def wait_for_add_queue_in_replica( fb_home, max_allowed_time_for_wait, min_timestamp, prefix_msg = '' ): ######################
# ### A C H T U N G ###
# global re ######################
# global difflib # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
# global time # WITH 'ERROR: Record format with length NN is not found for table TEST':
# global datetime a.gfix(switches=['-sweep', a.db.dsn])
#
# # <hostname> (replica) Tue Sep 21 20:24:57 2021
# # Database: ... # Final point: metadata must become equal:
# # Added 3 segment(s) to the processing queue #
# diff_meta = ''.join(unified_diff( \
# # -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:- [x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
# def check_pattern_in_log( log_lines, pattern, min_timestamp, prefix_msg = '' ): [x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
# found_required_message = False )
# for i,r in enumerate(log_lines): assert diff_meta == ''
# if pattern.search(r):
# if i>=2 and log_lines[i-2]:
# # a = r.replace('(',' ').split() ###################
# a = log_lines[i-2].split() somewhat_failed = 0
# if len(a)>=4: ###################
# # s='replica_host_name (slave) Sun May 30 17:46:43 2021'
# # s.split()[-5:] ==> ['Sun', 'May', '30', '17:46:43', '2021'] except Exception as e:
# # ' '.join( ...) ==> 'Sun May 30 17:46:43 2021' print(e.__str__())
# dts = ' '.join( log_lines[i-2].split()[-5:] )
# segment_timestamp = datetime.strptime( dts, '%a %b %d %H:%M:%S %Y') finally:
# if segment_timestamp >= min_timestamp: if somewhat_failed:
# print( (prefix_msg + ' ' if prefix_msg else '') + 'FOUND message about segments added to queue after given timestamp.') #, 'segment_timestamp=%s' % segment_timestamp, '; min_timestamp=%s' % min_timestamp ) # If any previous assert failed, we have to RECREATE both master and slave databases.
# found_required_message = True # Otherwise further execution of this test or other replication-related tests most likely will fail.
# break for a in (act_db_main,act_db_repl):
# return found_required_message d = a.db.db_path
# a.db.drop()
# # -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:- dbx = create_database(str(d), user = a.db.user)
# dbx.close()
# replold_lines = [] with a.connect_server() as srv:
# with open( os.path.join(fb_home,'replication.log'), 'r') as f: srv.database.set_write_mode(database = d, mode=DbWriteMode.ASYNC)
# replold_lines = f.readlines() srv.database.set_sweep_interval(database = d, interval = 0)
# if a == act_db_repl:
# srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
# segments_to_queue_pattern=re.compile( 'verbose:\\s+added\\s+\\d+\\s+segment.*to.*queue', re.IGNORECASE) else:
# with a.db.connect() as con:
# # 08.09.2021: replication content can remain unchanged if there was no user-defined object in DB that must be dropped! # !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
# # Because of this, it is crucial to check OLD content of replication log before loop. # DO NOT use 'a.db.db_path' for ALIASED database!
# # Also, segments_to_queue_pattern must NOT start from '\\+' because it can occur only for diff_data (within loop): # Its '.parent' property will be '.' rather than full path.
# # # Use only con.info.name for that:
# found_required_message = check_pattern_in_log( replold_lines, segments_to_queue_pattern, min_timestamp, prefix_msg ) repl_root_path = Path(con.info.name).parent
# repl_jrn_sub_dir = repl_settings['journal_sub_dir']
# if not found_required_message: repl_arc_sub_dir = repl_settings['archive_sub_dir']
#
# for i in range(0,max_allowed_time_for_wait): # Clean folders repl_journal and repl_archive (remove all files from there):
# time.sleep(1) for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
# # PermissionError: [WinError 32] ...: '<repl_arc_sub_dir>/<dbmain>.journal-000000001'
# # Get content of fb_home replication.log _after_ isql finish: assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty."
# f_repllog_new = open( os.path.join(fb_home,'replication.log'), 'r')
# diff_data = difflib.unified_diff( con.execute_immediate('alter database enable publication')
# replold_lines, con.execute_immediate('alter database include all to publication')
# f_repllog_new.readlines() con.commit()
# ) assert '' == capsys.readouterr().out
# f_repllog_new.close()
#
# found_required_message = check_pattern_in_log( list(diff_data), segments_to_queue_pattern, min_timestamp, prefix_msg )
# if found_required_message:
# break
#
# if not found_required_message:
# print('UNEXPECTED RESULT: no message about segments added to queue after %s.' % min_timestamp)
#
# #--------------------------------------------
#
# sql_ddl = ''' set bail on;
# recreate table test(s varchar(700), constraint test_s_unq unique(s));
# commit;
#
# set term ^;
# execute block as
# declare fld_len int;
# declare n int;
# begin
# select ff.rdb$field_length
# from rdb$relation_fields rf
# join rdb$fields ff on rf.rdb$field_source = ff.rdb$field_name
# where upper(rf.rdb$relation_name) = upper('test') and upper(rf.rdb$field_name) = upper('s')
# into fld_len;
#
#
# n = 10000;
# while (n > 0) do
# begin
# insert into test(s) values( lpad('', :fld_len, uuid_to_char(gen_uuid())) );
# n = n - 1;
# end
#
# end
# ^
# set term ;^
# commit;
# ''' % locals()
#
#
# f_sql_chk = open( os.path.join(context['temp_directory'],'tmp_gh_6975_init.sql'), 'w')
# f_sql_chk.write(sql_ddl)
# flush_and_close( f_sql_chk )
#
# f_sql_log = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.log' ) ), 'w')
# f_sql_err = open( ''.join( (os.path.splitext(f_sql_chk.name)[0], '.err' ) ), 'w')
# subprocess.call( [ context['isql_path'], 'localhost:' + db_main, '-i', f_sql_chk.name ], stdout = f_sql_log, stderr = f_sql_err)
# flush_and_close( f_sql_log )
# flush_and_close( f_sql_err )
#
# last_generated_repl_segment = 0
#
# with open(f_sql_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED STDERR in initial SQL: ' + line)
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0
#
# if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> initial SQL script finished w/o errors
#
#
# ##################################
# ### A.C.H.T.U.N.G ###
# ### do NOT use datetime.now() ###
# ### because of missed accuracy ###
# ### of timestamps in repl.log ###
# ### (it is HH:MM:SS only) ###
# ##################################
# current_date_with_hhmmss = datetime.today().replace(microsecond=0)
#
#
# ##############################################################################
# ### W A I T F O R S E G M E N T S A D D E D T O Q U E U E ###
# ##############################################################################
# wait_for_add_queue_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_ADDED_TO_QUEUE, current_date_with_hhmmss, 'POINT-A' )
#
# # This led to crash and appearance of message:
# # "Fatal lock manager error: invalid lock id (0), errno: 0" in firebird.log:
# #
# runProgram('gfix', ['-shut', 'full', '-force', '0', 'localhost:' + db_repl])
#
# f_repl_hdr_log=open( os.path.join(context['temp_directory'],'db_repl_hdr.log'), 'w')
# subprocess.call( [context['gstat_path'], db_repl, '-h'], stdout=f_repl_hdr_log, stderr=subprocess.STDOUT )
# flush_and_close( f_repl_hdr_log )
#
# with open(f_repl_hdr_log.name,'r') as f:
# for line in f:
# if 'Attributes' in line:
# print('POINT-B ' + line.strip())
#
#
# # This (issuing 'gfix -shu ...' second time) led FB process to hang!
# # runProgram('gfix', ['-shut', 'full', '-force', '0', 'localhost:' + db_repl])
#
# runProgram('gfix', ['-online', 'localhost:' + db_repl])
#
#
# ##############################################################################
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
# ##############################################################################
# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-1' )
#
# # return initial state of master DB:
# # remove all DB objects (tables, views, ...):
# # --------------------------------------------
# sql_clean_ddl = os.path.join(context['files_location'],'drop-all-db-objects.sql')
#
# f_clean_log=open( os.path.join(context['temp_directory'],'drop-all-db-objects-gh_6975.log'), 'w')
# f_clean_err=open( ''.join( ( os.path.splitext(f_clean_log.name)[0], '.err') ), 'w')
# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-i', sql_clean_ddl], stdout=f_clean_log, stderr=f_clean_err )
# flush_and_close(f_clean_log)
# flush_and_close(f_clean_err)
#
# with open(f_clean_err.name,'r') as f:
# for line in f:
# print('UNEXPECTED STDERR in cleanup SQL: ' + line)
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0
#
# with open(f_clean_log.name,'r') as f:
# for line in f:
# # show number of dropped objects
# print(line)
#
# if MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG: # ==> previous SQL script finished w/o errors
#
# ##############################################################################
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
# ##############################################################################
# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-2' )
#
# f_main_meta_sql=open( os.path.join(context['temp_directory'],'db_main_meta_gh_6975.sql'), 'w')
# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_main_meta_sql, stderr=subprocess.STDOUT )
# flush_and_close( f_main_meta_sql )
#
# f_repl_meta_sql=open( os.path.join(context['temp_directory'],'db_repl_meta_gh_6975.sql'), 'w')
# subprocess.call( [context['isql_path'], 'localhost:' + db_repl, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_repl_meta_sql, stderr=subprocess.STDOUT )
# flush_and_close( f_repl_meta_sql )
#
# db_main_meta=open(f_main_meta_sql.name, 'r')
# db_repl_meta=open(f_repl_meta_sql.name, 'r')
#
# diffmeta = ''.join(difflib.unified_diff(
# db_main_meta.readlines(),
# db_repl_meta.readlines()
# ))
# db_main_meta.close()
# db_repl_meta.close()
#
# f_meta_diff=open( os.path.join(context['temp_directory'],'db_meta_diff_gh_6975.txt'), 'w', buffering = 0)
# f_meta_diff.write(diffmeta)
# flush_and_close( f_meta_diff )
#
# # Following must issue only TWO rows:
# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_main]' ... */
# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_repl]' ... */
# # Only thes lines will be suppressed further (see subst. section):
# with open(f_meta_diff.name, 'r') as f:
# for line in f:
# if line[:1] in ('-', '+') and line[:3] not in ('---','+++'):
# print('UNEXPECTED METADATA DIFF.: ' + line)
#
# runProgram('gfix', ['-w', 'sync', 'localhost:' + db_main])
#
# ######################
# ### A C H T U N G ###
# ######################
# # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
# # WITH 'ERROR: Record format with length 68 is not found for table TEST':
# runProgram('gfix', ['-sweep', 'localhost:' + db_repl])
# runProgram('gfix', ['-sweep', 'localhost:' + db_main])
# #######################
#
# # cleanup:
# ##########
# #cleanup( (f_sql_chk, f_sql_log, f_sql_err,f_clean_log,f_clean_err,f_main_meta_sql,f_repl_meta_sql,f_meta_diff) )
#
#---