mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-22 21:43:06 +01:00
Added/Updated functional\replication\test_blob_not_found_in_rw_replica_if_target_row_exists.py. Checked on 5.0.0.623, 4.0.1.2692 - both on Windows and Linux. PytestAssertRewriteWarning raises, reason currently is unknown.
This commit is contained in:
parent
bb385aa110
commit
4c8a8a04a2
@ -15,8 +15,8 @@ DESCRIPTION:
|
||||
reproduced only when we make table on master and wait until it will be replicated on replica.
|
||||
|
||||
After this we:
|
||||
* add one record with ID = 1 and non-empty blob into this table on REPLICA database and do commit.
|
||||
* add record with the same ID = 1 on MASTER database and do commit.
|
||||
* add record with binary blob (using stream API) into REPLICA database; commit;
|
||||
* add record with another binary blob into MASTER database; commit;
|
||||
|
||||
Message "WARNING: Record being inserted into table TEST already exists, updating instead" must appear
|
||||
in the replication log at this point but after that message about successfully replicated segment must also be.
|
||||
@ -30,366 +30,293 @@ DESCRIPTION:
|
||||
The only difference in metadata must be 'CREATE DATABASE' statement with different DB names - we suppress it,
|
||||
thus metadata difference must not be issued.
|
||||
|
||||
####################
|
||||
### CRUCIAL NOTE ###
|
||||
####################
|
||||
Currently, 25.06.2021, there is bug in FB 4.x and 5.x which can be seen on SECOND run of this test: message with text
|
||||
"ERROR: Record format with length 68 is not found for table TEST" will appear in it after inserting 1st record in master.
|
||||
The reason of that is "dirty" pages that remain in RDB$RELATION_FIELDS both on master and replica after dropping table.
|
||||
Following query show different data that appear in replica DB on 1st and 2nd run (just after table was created on master):
|
||||
=======
|
||||
set blobdisplay 6;
|
||||
select rdb$descriptor as fmt_descr
|
||||
from rdb$formats natural join rdb$relations where rdb$relation_name = 'TEST';
|
||||
=======
|
||||
This bug was explained by dimitr, see letters 25.06.2021 11:49 and 25.06.2021 16:56.
|
||||
It will be fixed later.
|
||||
|
||||
The only workaround to solve this problem is to make SWEEP after all DB objects have been dropped.
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
!NB! BOTH master and replica must be cleaned up by sweep!
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
################
|
||||
### N O T E ###
|
||||
################
|
||||
Test assumes that master and replica DB have been created beforehand.
|
||||
Also, it assumes that %FB_HOME%/replication.conf has been prepared with apropriate parameters for replication.
|
||||
Particularly, name of directories and databases must have info about checked FB major version and ServerMode.
|
||||
* verbose = true // in order to find out line with message that required segment was replicated
|
||||
* section for master database with specified parameters:
|
||||
journal_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.journal"
|
||||
journal_archive_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
|
||||
journal_archive_command = "copy $(pathname) $(archivepathname)"
|
||||
journal_archive_timeout = 10
|
||||
* section for replica database with specified parameter:
|
||||
journal_source_directory = "!fbt_repo!/tmp/fb-replication.!fb_major!.!server_mode!.archive"
|
||||
|
||||
Master and replica databases must be created in "!fbt_repo! mp" directory and have names like these:
|
||||
'fbt-main.fb40.SS.fdb'; 'fbt-repl.fb40.SS.fdb'; - for FB 4.x ('SS' = Super; 'CS' = Classic)
|
||||
'fbt-main.fb50.SS.fdb'; 'fbt-repl.fb50.SS.fdb'; - for FB 5.x ('SS' = Super; 'CS' = Classic)
|
||||
NB: fixed numeric value ('40' or '50') must be used for any minor FB version (4.0; 4.0.1; 4.1; 5.0; 5.1 etc)
|
||||
|
||||
These two databases must NOT be dropped in any of tests related to replication!
|
||||
They are created and dropped in the batch scenario which prepares FB instance to be checked for each ServerMode
|
||||
and make cleanup after it, i.e. when all tests will be completed.
|
||||
|
||||
NB. Currently this task was implemented only in Windows batch, thus test has attribute platform = 'Windows'.
|
||||
|
||||
Temporary comment. For debug purpoces:
|
||||
1) find out SUFFIX of the name of FB service which is to be tested (e.g. 'DefaultInstance', '40SS' etc);
|
||||
2) copy file %fbt-repo%/tests/functional/tabloid/batches/setup-fb-for-replication.bat.txt
|
||||
to some place and rename it "*.bat";
|
||||
3) open this .bat in editor and asjust value of 'fbt_repo' variable;
|
||||
4) run: setup-fb-for-replication.bat [SUFFIX_OF_FB_SERVICE]
|
||||
where SUFFIX_OF_FB_SERVICE is ending part of FB service which you want to check:
|
||||
DefaultInstance ; 40ss ; 40cs ; 50ss ; 50cs etc
|
||||
5) batch 'setup-fb-for-replication.bat' will:
|
||||
* stop selected FB instance
|
||||
* create test databases (in !fbt_repo!/tmp);
|
||||
* prepare journal/archive sub-folders for replication (also in !fbt_repo!/tmp);
|
||||
* replace %fb_home%/replication.conf with apropriate
|
||||
* start selected FB instance
|
||||
6) run this test (FB instance will be already launched by setup-fb-for-replication.bat):
|
||||
%fpt_repo%/fbt-run2.bat dblevel-triggers-must-not-fire-on-replica.fbt 50ss, etc
|
||||
|
||||
Confirmed bug on 4.0.1.2682 and 5.0.0.338, got in the replication.log:
|
||||
ERROR: Blob 128.0 is not found for table TEST
|
||||
Checked on:
|
||||
4.0.1.2691 SS/CS (32.3s/33.9s)
|
||||
5.0.0.351 SS/CS (28.5s/35.3s)
|
||||
FBTEST: tests.functional.replication.blob_not_found_in_rw_replica_if_target_row_exists
|
||||
NOTES:
|
||||
[26.08.2022] pzotov
|
||||
1. In case of any errors (somewhat_failed <> 0) test will re-create db_main and db_repl, and then perform all needed
|
||||
actions to resume replication (set 'replica' flag on db_repl, enabling publishing in db_main, remove all files
|
||||
from subdirectories <repl_journal> and <repl_archive> which must present in the same folder as <db_main>).
|
||||
2. Warning raises on Windows and Linux:
|
||||
../../../usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126
|
||||
/usr/local/lib/python3.9/site-packages/_pytest/config/__init__.py:1126:
|
||||
PytestAssertRewriteWarning: Module already imported so cannot be rewritten: __editable___firebird_qa_0_17_0_finder
|
||||
self._mark_plugins_for_rewrite(hook)
|
||||
The reason currently is unknown.
|
||||
|
||||
Checked on 5.0.0.623, 4.0.1.2692 - both CS and SS. Both on Windows and Linux.
|
||||
"""
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
from difflib import unified_diff
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from firebird.qa import *
|
||||
from firebird.driver import connect, create_database, DbWriteMode, ReplicaMode
|
||||
|
||||
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
|
||||
# from act.files_dir/'test_config.ini':
|
||||
repl_settings = QA_GLOBALS['replication']
|
||||
|
||||
MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = int(repl_settings['max_time_for_wait_segment_in_log'])
|
||||
MAIN_DB_ALIAS = repl_settings['main_db_alias']
|
||||
REPL_DB_ALIAS = repl_settings['repl_db_alias']
|
||||
|
||||
db_main = db_factory( filename = '#' + MAIN_DB_ALIAS, do_not_create = True, do_not_drop = True)
|
||||
db_repl = db_factory( filename = '#' + REPL_DB_ALIAS, do_not_create = True, do_not_drop = True)
|
||||
|
||||
substitutions = [('Start removing objects in:.*', 'Start removing objects'),
|
||||
('Finish. Total objects removed: [1-9]\\d*', 'Finish. Total objects removed'),
|
||||
('.* CREATE DATABASE .*', ''), ('FMT_DESCR .*', 'FMT_DESCR')]
|
||||
('.* CREATE DATABASE .*', ''),
|
||||
('[\t ]+', ' '),
|
||||
('FOUND message about replicated segment N .*', 'FOUND message about replicated segment')]
|
||||
|
||||
db = db_factory()
|
||||
act_db_main = python_act('db_main', substitutions=substitutions)
|
||||
act_db_repl = python_act('db_repl', substitutions=substitutions)
|
||||
|
||||
act = python_act('db', substitutions=substitutions)
|
||||
# Length of generated blob:
|
||||
DATA_LEN = 65 * 1024 * 1024
|
||||
tmp_data = temp_file(filename = 'tmp_blob_for_replication.dat')
|
||||
|
||||
expected_stdout = """
|
||||
POINT-1A FOUND message about replicated segment.
|
||||
POINT-1B FOUND message about replicated segment.
|
||||
Start removing objects
|
||||
Finish. Total objects removed
|
||||
POINT-2 FOUND message about replicated segment.
|
||||
"""
|
||||
|
||||
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
|
||||
#--------------------------------------------
|
||||
|
||||
def cleanup_folder(p):
|
||||
# Removed all files and subdirs in the folder <p>
|
||||
# Used for cleanup <repl_journal> and <repl_archive> when replication must be reset
|
||||
# in case when any error occurred during test execution.
|
||||
assert os.path.dirname(p) != p, f"@@@ ABEND @@@ CAN NOT operate in the file system root directory. Check your code!"
|
||||
for root, dirs, files in os.walk(p):
|
||||
for f in files:
|
||||
os.unlink(os.path.join(root, f))
|
||||
for d in dirs:
|
||||
shutil.rmtree(os.path.join(root, d))
|
||||
return len(os.listdir(p))
|
||||
|
||||
#--------------------------------------------
|
||||
|
||||
def wait_for_data_in_replica( act_db_main: Action, max_allowed_time_for_wait, prefix_msg = '' ):
|
||||
|
||||
replication_log = act_db_main.home_dir / 'replication.log'
|
||||
|
||||
replold_lines = []
|
||||
with open(replication_log, 'r') as f:
|
||||
replold_lines = f.readlines()
|
||||
|
||||
with act_db_main.db.connect(no_db_triggers = True) as con:
|
||||
with con.cursor() as cur:
|
||||
cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
|
||||
last_generated_repl_segment = cur.fetchone()[0]
|
||||
|
||||
# VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
|
||||
# VERBOSE: Segment 2 (200 bytes) is replicated in 82 ms, deleting the file
|
||||
p_successfully_replicated = re.compile( f'\\+\\s+verbose:\\s+segment\\s+{last_generated_repl_segment}\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting', re.IGNORECASE)
|
||||
|
||||
# VERBOSE: Segment 16 replication failure at offset 33628
|
||||
p_replication_failure = re.compile('segment\\s+\\d+\\s+replication\\s+failure', re.IGNORECASE)
|
||||
|
||||
found_required_message = False
|
||||
found_replfail_message = False
|
||||
found_common_error_msg = False
|
||||
|
||||
for i in range(0,max_allowed_time_for_wait):
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# Get content of fb_home replication.log _after_ isql finish:
|
||||
with open(replication_log, 'r') as f:
|
||||
diff_data = unified_diff(
|
||||
replold_lines,
|
||||
f.readlines()
|
||||
)
|
||||
|
||||
for k,d in enumerate(diff_data):
|
||||
if p_successfully_replicated.search(d):
|
||||
# We FOUND phrase "VERBOSE: Segment <last_generated_repl_segment> ... is replicated ..." in the replication log.
|
||||
# This is expected success, break!
|
||||
print( (prefix_msg + ' ' if prefix_msg else '') + f'FOUND message about replicated segment N {last_generated_repl_segment}.' )
|
||||
found_required_message = True
|
||||
break
|
||||
|
||||
if p_replication_failure.search(d):
|
||||
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ SEGMENT REPLICATION FAILURE @@@ ' + d )
|
||||
found_replfail_message = True
|
||||
break
|
||||
|
||||
if 'ERROR:' in d:
|
||||
print( (prefix_msg + ' ' if prefix_msg else '') + '@@@ REPLICATION ERROR @@@ ' + d )
|
||||
found_common_error_msg = True
|
||||
break
|
||||
|
||||
if found_required_message or found_replfail_message or found_common_error_msg:
|
||||
break
|
||||
|
||||
if not found_required_message:
|
||||
print('UNEXPECTED RESULT: no message about replicating segment N {last_generated_repl_segment} for {max_allowed_time_for_wait} seconds.')
|
||||
|
||||
#--------------------------------------------
|
||||
|
||||
@pytest.mark.version('>=4.0.1')
|
||||
@pytest.mark.platform('Windows')
|
||||
def test_1(act: Action):
|
||||
pytest.fail("Not IMPLEMENTED")
|
||||
def test_1(act_db_main: Action, act_db_repl: Action, tmp_data: Path, capsys):
|
||||
|
||||
# test_script_1
|
||||
#---
|
||||
#
|
||||
# import os
|
||||
# import subprocess
|
||||
# import re
|
||||
# import difflib
|
||||
# import shutil
|
||||
# import time
|
||||
#
|
||||
# os.environ["ISC_USER"] = user_name
|
||||
# os.environ["ISC_PASSWORD"] = user_password
|
||||
#
|
||||
# #####################################
|
||||
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 65
|
||||
# #####################################
|
||||
#
|
||||
# svc = fdb.services.connect(host='localhost', user=user_name, password=user_password)
|
||||
# FB_HOME = svc.get_home_directory()
|
||||
# svc.close()
|
||||
#
|
||||
# engine = db_conn.engine_version # 4.0; 4.1; 5.0 etc -- type float
|
||||
# fb_major = 'fb' + str(engine)[:1] + '0' # 'fb40'; 'fb50'
|
||||
#
|
||||
# cur = db_conn.cursor()
|
||||
# cur.execute("select rdb$config_value from rdb$config where upper(rdb$config_name) = upper('ServerMode')")
|
||||
# server_mode = 'XX'
|
||||
# for r in cur:
|
||||
# if r[0] == 'Super':
|
||||
# server_mode = 'SS'
|
||||
# elif r[0] == 'SuperClassic':
|
||||
# server_mode = 'SC'
|
||||
# elif r[0] == 'Classic':
|
||||
# server_mode = 'CS'
|
||||
# cur.close()
|
||||
#
|
||||
# # 'fbt-main.fb50.ss.fdb' etc:
|
||||
# db_main = os.path.join( context['temp_directory'], 'fbt-main.' + fb_major + '.' + server_mode + '.fdb' )
|
||||
# db_repl = db_main.replace( 'fbt-main.', 'fbt-repl.')
|
||||
#
|
||||
# runProgram('gfix', ['-replica', 'read_write', 'localhost:' + db_repl])
|
||||
#
|
||||
# # Folders for journalling and archieving segments.
|
||||
# repl_journal_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.journal' )
|
||||
# repl_archive_dir = os.path.join( context['temp_directory'], 'fb-replication.' + fb_major + '.' + server_mode + '.archive' )
|
||||
#
|
||||
# db_conn.close()
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def flush_and_close( file_handle ):
|
||||
# # https://docs.python.org/2/library/os.html#os.fsync
|
||||
# # If you're starting with a Python file object f,
|
||||
# # first do f.flush(), and
|
||||
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
||||
# global os
|
||||
#
|
||||
# file_handle.flush()
|
||||
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
|
||||
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
|
||||
# os.fsync(file_handle.fileno())
|
||||
# file_handle.close()
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def cleanup( f_names_list ):
|
||||
# global os
|
||||
# for i in range(len( f_names_list )):
|
||||
# if type(f_names_list[i]) == file:
|
||||
# del_name = f_names_list[i].name
|
||||
# elif type(f_names_list[i]) == str:
|
||||
# del_name = f_names_list[i]
|
||||
# else:
|
||||
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
|
||||
# del_name = None
|
||||
#
|
||||
# if del_name and os.path.isfile( del_name ):
|
||||
# os.remove( del_name )
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def wait_for_data_in_replica( fb_home, max_allowed_time_for_wait, db_main, prefix_msg = '' ):
|
||||
#
|
||||
# global re
|
||||
# global difflib
|
||||
# global time
|
||||
#
|
||||
# replold_lines = []
|
||||
# with open( os.path.join(fb_home,'replication.log'), 'r') as f:
|
||||
# replold_lines = f.readlines()
|
||||
#
|
||||
# con = fdb.connect( dsn = 'localhost:' + db_main, no_db_triggers = 1)
|
||||
# cur = con.cursor()
|
||||
# cur.execute("select rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') from rdb$database")
|
||||
# for r in cur:
|
||||
# last_generated_repl_segment = r[0]
|
||||
# cur.close()
|
||||
# con.close()
|
||||
#
|
||||
# #print('last_generated_repl_segment:', last_generated_repl_segment)
|
||||
#
|
||||
# # VERBOSE: Segment 1 (2582 bytes) is replicated in 1 second(s), deleting the file
|
||||
# p_expected=re.compile( '\\+\\s+verbose:\\s+segment\\s+%(last_generated_repl_segment)s\\s+\\(\\d+\\s+bytes\\)\\s+is\\s+replicated.*deleting' % locals(), re.IGNORECASE)
|
||||
# p_unexpected = re.compile( '\\+\\s+ERROR: Blob .* not found', re.IGNORECASE)
|
||||
#
|
||||
# found_required_message, found_unexpected_message = False, False
|
||||
# for i in range(0,max_allowed_time_for_wait):
|
||||
# time.sleep(1)
|
||||
#
|
||||
# # Get content of fb_home replication.log _after_ isql finish:
|
||||
# f_repllog_new = open( os.path.join(fb_home,'replication.log'), 'r')
|
||||
# diff_data = difflib.unified_diff(
|
||||
# replold_lines,
|
||||
# f_repllog_new.readlines()
|
||||
# )
|
||||
# f_repllog_new.close()
|
||||
#
|
||||
# for k,d in enumerate(diff_data):
|
||||
# if p_unexpected.search(d):
|
||||
# print( (prefix_msg + ' ' if prefix_msg else '') + 'UNEXPECTED message encountered:' )
|
||||
# print(d)
|
||||
# found_unexpected_message = True
|
||||
# break
|
||||
#
|
||||
# if p_expected.search(d):
|
||||
# print( (prefix_msg + ' ' if prefix_msg else '') + 'FOUND message about replicated segment.' )
|
||||
# found_required_message = True
|
||||
# break
|
||||
#
|
||||
# if found_required_message or found_unexpected_message:
|
||||
# break
|
||||
#
|
||||
# if not found_required_message:
|
||||
# print('UNEXPECTED RESULT: no message about replicated segment for %d seconds.' % max_allowed_time_for_wait)
|
||||
#
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# sql_ddl = ''' set bail on;
|
||||
# set list on;
|
||||
# --set blob all;
|
||||
# set blobdisplay 6;
|
||||
#
|
||||
# recreate table test(id int primary key using index test_pk, bindata blob);
|
||||
# commit;
|
||||
#
|
||||
# -- for debug only:
|
||||
# select rdb$get_context('SYSTEM', 'DB_NAME'), rdb$get_context('SYSTEM','REPLICATION_SEQUENCE') as last_generated_repl_segment from rdb$database;
|
||||
# select
|
||||
# RDB$DESCRIPTOR as fmt_descr
|
||||
# from RDB$FORMATS natural join RDB$RELATIONS
|
||||
# where RDB$RELATION_NAME = 'TEST';
|
||||
# quit;
|
||||
# ''' % locals()
|
||||
#
|
||||
#
|
||||
# f_sql_ddl = open( os.path.join(context['temp_directory'],'tmp_gh_7070_init.sql'), 'w')
|
||||
# f_sql_ddl.write(sql_ddl)
|
||||
# flush_and_close( f_sql_ddl )
|
||||
#
|
||||
# f_ddl_log = open( ''.join( (os.path.splitext(f_sql_ddl.name)[0], '.log' ) ), 'w')
|
||||
# f_ddl_err = open( ''.join( (os.path.splitext(f_sql_ddl.name)[0], '.err' ) ), 'w')
|
||||
# subprocess.call( [ context['isql_path'], 'localhost:' + db_main, '-i', f_sql_ddl.name ], stdout = f_ddl_log, stderr = f_ddl_err)
|
||||
# flush_and_close( f_ddl_log )
|
||||
# flush_and_close( f_ddl_err )
|
||||
#
|
||||
# ##############################################################################
|
||||
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
# ##############################################################################
|
||||
# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-1A' )
|
||||
#
|
||||
# con_repl = fdb.connect( dsn = 'localhost:' + db_repl)
|
||||
# cur_repl = con_repl.cursor()
|
||||
# cur_repl.execute('insert into test(id,bindata) values(?, ?)', (2, 'qwerty-on-replica'))
|
||||
# con_repl.commit()
|
||||
# cur_repl.close()
|
||||
# con_repl.close()
|
||||
#
|
||||
# con_main = fdb.connect( dsn = 'localhost:' + db_main)
|
||||
# cur_main = con_main.cursor()
|
||||
# cur_main.execute('insert into test(id,bindata) values(?, ?)', (2, 'qwerty-on-master'))
|
||||
# con_main.commit()
|
||||
# cur_main.close()
|
||||
# con_main.close()
|
||||
#
|
||||
# ##############################################################################
|
||||
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
# ##############################################################################
|
||||
# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-1B' )
|
||||
#
|
||||
#
|
||||
# runProgram('gfix', ['-replica', 'read_only', 'localhost:' + db_repl])
|
||||
#
|
||||
# # return initial state of master DB:
|
||||
# # remove all DB objects (tables, views, ...):
|
||||
# # --------------------------------------------
|
||||
# sql_clean_ddl = os.path.join(context['files_location'],'drop-all-db-objects.sql')
|
||||
#
|
||||
# f_clean_log=open( os.path.join(context['temp_directory'],'drop-all-db-objects-gh_7070.log'), 'w')
|
||||
# f_clean_err=open( ''.join( ( os.path.splitext(f_clean_log.name)[0], '.err') ), 'w')
|
||||
# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-i', sql_clean_ddl], stdout=f_clean_log, stderr=f_clean_err )
|
||||
# flush_and_close(f_clean_log)
|
||||
# flush_and_close(f_clean_err)
|
||||
#
|
||||
# with open(f_clean_err.name,'r') as f:
|
||||
# for line in f:
|
||||
# print('UNEXPECTED STDERR in cleanup SQL: ' + line)
|
||||
# MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG = 0
|
||||
#
|
||||
# with open(f_clean_log.name,'r') as f:
|
||||
# for line in f:
|
||||
# # show number of dropped objects
|
||||
# print(line)
|
||||
#
|
||||
#
|
||||
# ##############################################################################
|
||||
# ### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
# ##############################################################################
|
||||
# wait_for_data_in_replica( FB_HOME, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, db_main, 'POINT-2' )
|
||||
#
|
||||
# f_main_meta_sql=open( os.path.join(context['temp_directory'],'db_main_meta_gh_7070.sql'), 'w')
|
||||
# subprocess.call( [context['isql_path'], 'localhost:' + db_main, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_main_meta_sql, stderr=subprocess.STDOUT )
|
||||
# flush_and_close( f_main_meta_sql )
|
||||
#
|
||||
# f_repl_meta_sql=open( os.path.join(context['temp_directory'],'db_repl_meta_gh_7070.sql'), 'w')
|
||||
# subprocess.call( [context['isql_path'], 'localhost:' + db_repl, '-q', '-nod', '-ch', 'utf8', '-x'], stdout=f_repl_meta_sql, stderr=subprocess.STDOUT )
|
||||
# flush_and_close( f_repl_meta_sql )
|
||||
#
|
||||
# db_main_meta=open(f_main_meta_sql.name, 'r')
|
||||
# db_repl_meta=open(f_repl_meta_sql.name, 'r')
|
||||
#
|
||||
# diffmeta = ''.join(difflib.unified_diff(
|
||||
# db_main_meta.readlines(),
|
||||
# db_repl_meta.readlines()
|
||||
# ))
|
||||
# db_main_meta.close()
|
||||
# db_repl_meta.close()
|
||||
#
|
||||
# f_meta_diff=open( os.path.join(context['temp_directory'],'db_meta_diff_gh_7070.txt'), 'w', buffering = 0)
|
||||
# f_meta_diff.write(diffmeta)
|
||||
# flush_and_close( f_meta_diff )
|
||||
#
|
||||
# # Following must issue only TWO rows:
|
||||
# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_main]' ... */
|
||||
# # UNEXPECTED METADATA DIFF.: -/* CREATE DATABASE 'localhost:[db_repl]' ... */
|
||||
# # Only thes lines will be suppressed further (see subst. section):
|
||||
# with open(f_meta_diff.name, 'r') as f:
|
||||
# for line in f:
|
||||
# if line[:1] in ('-', '+') and line[:3] not in ('---','+++'):
|
||||
# print('UNEXPECTED METADATA DIFF.: ' + line)
|
||||
#
|
||||
#
|
||||
# ######################
|
||||
# ### A C H T U N G ###
|
||||
# ######################
|
||||
# # MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
|
||||
# # WITH 'ERROR: Record format with length 68 is not found for table TEST':
|
||||
# runProgram('gfix', ['-sweep', 'localhost:' + db_repl])
|
||||
# runProgram('gfix', ['-sweep', 'localhost:' + db_main])
|
||||
# #######################
|
||||
#
|
||||
# # cleanup:
|
||||
# ##########
|
||||
# cleanup( (f_sql_ddl, f_ddl_log, f_ddl_err,f_clean_log,f_clean_err,f_main_meta_sql,f_repl_meta_sql,f_meta_diff) )
|
||||
#
|
||||
#
|
||||
#---
|
||||
#assert '' == capsys.readouterr().out
|
||||
|
||||
try:
|
||||
###################
|
||||
somewhat_failed = 1
|
||||
###################
|
||||
|
||||
with act_db_repl.connect_server() as srv:
|
||||
srv.database.set_replica_mode(database = act_db_repl.db.db_path, mode = ReplicaMode.READ_WRITE)
|
||||
|
||||
sql_init = '''
|
||||
set bail on;
|
||||
recreate table test(id int primary key using index test_pk, b blob);
|
||||
commit;
|
||||
'''
|
||||
|
||||
act_db_main.expected_stderr = ''
|
||||
act_db_main.isql(switches=['-q'], input = sql_init)
|
||||
assert act_db_main.clean_stderr == act_db_main.clean_expected_stderr
|
||||
act_db_main.reset()
|
||||
|
||||
act_db_main.expected_stdout = 'POINT-1 FOUND message about replicated segment'
|
||||
##############################################################################
|
||||
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
##############################################################################
|
||||
wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-1' )
|
||||
|
||||
act_db_main.stdout = capsys.readouterr().out
|
||||
assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
|
||||
act_db_main.reset()
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
blob_inserted_hashes = {}
|
||||
for a in (act_db_repl, act_db_main):
|
||||
with a.db.connect() as con:
|
||||
with con.cursor() as cur:
|
||||
# Generate random binary data and writing to file which will be loaded as stream blob into DB:
|
||||
tmp_data.write_bytes( bytearray(os.urandom(DATA_LEN)) )
|
||||
with open(tmp_data, 'rb') as blob_file:
|
||||
cur.execute("insert into test(id, b) values(1, ?) returning crypt_hash(b using sha512)", (blob_file,) )
|
||||
blob_inserted_hashes[ a.db.db_path ] = cur.fetchone()[0]
|
||||
con.commit()
|
||||
assert '' == capsys.readouterr().out
|
||||
|
||||
# No errors must be now. We have to wait now until blob from MASTER be delivered
|
||||
# to REPLICA and replace there "old" blob (in the record with ID = 1).
|
||||
|
||||
|
||||
act_db_main.expected_stdout = 'POINT-2 FOUND message about replicated segment'
|
||||
##############################################################################
|
||||
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
##############################################################################
|
||||
wait_for_data_in_replica( act_db_main, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-2' )
|
||||
|
||||
act_db_main.stdout = capsys.readouterr().out
|
||||
assert act_db_main.clean_stdout == act_db_main.clean_expected_stdout
|
||||
act_db_main.reset()
|
||||
|
||||
with act_db_repl.connect_server() as srv:
|
||||
srv.database.set_replica_mode(database = act_db_repl.db.db_path, mode = ReplicaMode.READ_ONLY)
|
||||
|
||||
# Check: blob in _replica_ DB must now be equal to blob that came from master:
|
||||
for a in (act_db_repl,):
|
||||
with a.db.connect() as con:
|
||||
with con.cursor() as cur:
|
||||
with open(tmp_data, 'rb') as blob_file:
|
||||
cur.execute("select crypt_hash(b using sha512) from test")
|
||||
assert cur.fetchone()[0] == blob_inserted_hashes[ act_db_main.db.db_path ]
|
||||
|
||||
#---------------------------------------------------
|
||||
|
||||
# return initial state of master DB:
|
||||
# remove all DB objects (tables, views, ...):
|
||||
#
|
||||
db_main_meta, db_repl_meta = '', ''
|
||||
for a in (act_db_main,act_db_repl):
|
||||
if a == act_db_main:
|
||||
sql_clean = (a.files_dir / 'drop-all-db-objects.sql').read_text()
|
||||
a.expected_stdout = """
|
||||
Start removing objects
|
||||
Finish. Total objects removed
|
||||
"""
|
||||
a.isql(switches=['-q', '-nod'], input = sql_clean, combine_output = True)
|
||||
assert a.clean_stdout == a.clean_expected_stdout
|
||||
a.reset()
|
||||
|
||||
|
||||
a.expected_stdout = 'POINT-3 FOUND message about replicated segment'
|
||||
##############################################################################
|
||||
### W A I T U N T I L R E P L I C A B E C O M E S A C T U A L ###
|
||||
##############################################################################
|
||||
wait_for_data_in_replica( a, MAX_TIME_FOR_WAIT_SEGMENT_IN_LOG, 'POINT-3' )
|
||||
|
||||
a.stdout = capsys.readouterr().out
|
||||
assert a.clean_stdout == a.clean_expected_stdout
|
||||
a.reset()
|
||||
|
||||
db_main_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
|
||||
else:
|
||||
db_repl_meta = a.extract_meta(charset = 'utf8', io_enc = 'utf8')
|
||||
|
||||
######################
|
||||
### A C H T U N G ###
|
||||
######################
|
||||
# MANDATORY, OTHERWISE REPLICATION GETS STUCK ON SECOND RUN OF THIS TEST
|
||||
# WITH 'ERROR: Record format with length NN is not found for table TEST':
|
||||
a.gfix(switches=['-sweep', a.db.dsn])
|
||||
|
||||
|
||||
# Final point: metadata must become equal:
|
||||
#
|
||||
diff_meta = ''.join(unified_diff( \
|
||||
[x for x in db_main_meta.splitlines() if 'CREATE DATABASE' not in x],
|
||||
[x for x in db_repl_meta.splitlines() if 'CREATE DATABASE' not in x])
|
||||
)
|
||||
assert diff_meta == ''
|
||||
|
||||
###################
|
||||
somewhat_failed = 0
|
||||
###################
|
||||
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
|
||||
if somewhat_failed:
|
||||
# If any previous assert failed, we have to RECREATE both master and slave databases.
|
||||
# Otherwise further execution of this test or other replication-related tests most likely will fail.
|
||||
for a in (act_db_main,act_db_repl):
|
||||
d = a.db.db_path
|
||||
a.db.drop()
|
||||
dbx = create_database(str(d), user = a.db.user)
|
||||
dbx.close()
|
||||
with a.connect_server() as srv:
|
||||
srv.database.set_write_mode(database = d, mode=DbWriteMode.ASYNC)
|
||||
srv.database.set_sweep_interval(database = d, interval = 0)
|
||||
if a == act_db_repl:
|
||||
srv.database.set_replica_mode(database = d, mode = ReplicaMode.READ_ONLY)
|
||||
else:
|
||||
with a.db.connect() as con:
|
||||
# !! IT IS ASSUMED THAT REPLICATION FOLDERS ARE IN THE SAME DIR AS <DB_MAIN> !!
|
||||
# DO NOT use 'a.db.db_path' for ALIASED database!
|
||||
# Its '.parent' property will be '.' rather than full path.
|
||||
# Use only con.info.name for that:
|
||||
repl_root_path = Path(con.info.name).parent
|
||||
repl_jrn_sub_dir = repl_settings['journal_sub_dir']
|
||||
repl_arc_sub_dir = repl_settings['archive_sub_dir']
|
||||
|
||||
# Clean folders repl_journal and repl_archive (remove all files from there):
|
||||
for p in (repl_jrn_sub_dir,repl_arc_sub_dir):
|
||||
assert cleanup_folder(repl_root_path / p) == 0, f"Directory {str(p)} remains non-empty."
|
||||
|
||||
con.execute_immediate('alter database enable publication')
|
||||
con.execute_immediate('alter database include all to publication')
|
||||
con.commit()
|
||||
|
||||
assert somewhat_failed == 0
|
||||
|
Loading…
Reference in New Issue
Block a user