diff --git a/tests/functional/transactions/test_read_consist_sttm_restart_on_delete_02.py b/tests/functional/transactions/test_read_consist_sttm_restart_on_delete_02.py index ae44d5fa..3cadea0d 100644 --- a/tests/functional/transactions/test_read_consist_sttm_restart_on_delete_02.py +++ b/tests/functional/transactions/test_read_consist_sttm_restart_on_delete_02.py @@ -115,34 +115,58 @@ DESCRIPTION: NOTE: concrete values of fields TRN, GLOBAL_CN and SNAP_NO in the TLOG_DONE can differ from one to another run! This is because of concurrent nature of connections that work against database. We must not assume that these values will be constant. - ################ - - Checked on 4.0.0.2144 SS/CS FBTEST: functional.transactions.read_consist_sttm_restart_on_delete_02 NOTES: [28.07.2022] pzotov Checked on 4.0.1.2692, 5.0.0.591 - [23.09.2023] pzotov - Replaced verification method of worker attachment presense (which tries DML and waits for resource). - This attachment is created by *asynchronously* launched ISQL thus using of time.sleep(1) is COMPLETELY wrong. - Loop with query to mon$statements is used instead (we search for record which SQL_TEXT contains 'special tag', see variable SQL_TAG_THAT_WE_WAITING_FOR). - Maximal duration of this loop is limited by variable 'MAX_WAIT_FOR_WORKER_START_MS'. - Many thanks to Vlad for suggestions. + [25.09.2023] pzotov + 1. Added trace launch and its parsing in order to get number of times when WORKER statement did restart. + See commits: + 1) FB 4.x (23-JUN-2022, 4.0.2.2782): https://github.com/FirebirdSQL/firebird/commit/95b8623adbf129d0730a50a18b4f1cf9976ac35c + 2) FB 5.x (27-JUN-2022, 5.0.0.555): https://github.com/FirebirdSQL/firebird/commit/f121cd4a6b40b1639f560c6c38a057c4e68bb3df + + Trace must contain several groups, each with similar lines: + () EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted time(s) + + 2. 
To prevent races between concurrent transactions, it is necessary to ensure that code: + * does not allow LOCKER-2 to start its work until WORKER session will establish connection and - moreover - will actually lock the first record + from the scope that is seen by the query that we want to be executed by worker. + * does not allow LOCKER-1 to do something after LOCKER-2 issued commit (and thus released record): we first have to ensure that this record + now is locked by WORKER. The same when record was occupied by LOCKER-2 and then is released: LOCKER-1 must not do smth until WORKER will + encounter this record and 'catch' it. + This is done by calls to function 'wait_for_record_become_locked()' which are performed by separate 'monitoring' connection with starting Tx + with NO_WAIT mode and catching exception with further parsing. In case when record has been already occupied (by WORKER) this exception will + have form "deadlock / -update conflicts ... / -concurrent transaction number is ". We can then obtain number of this transaction and query + mon$statements to get MON$SQL_TEXT that is running by this Tx. If it contains 'special tag' (see variable SQL_TAG_THAT_WE_WAITING_FOR) + then we can be sure that WORKER really did establish connection and successfully locked row with required ID. + + Table 'TLOG_WANT' (which is fulfilled by trigger TEST_BIUD using in autonomous tx) can NOT be used for detection of moments when WORKER + actually locks records which he was waiting for: this trigger fires BEFORE actual updating occurs, i.e. when record becomes seen by WORKER + but is still occupied by some LOCKER ("[DOC]: c) engine continue to evaluate remaining records ... and put write locks on it too") + + NB! Worker transaction must run in WAIT mode - in contrast to Tx that we start in our monitoring loop. Checked on WI-T6.0.0.48, WI-T5.0.0.1211, WI-V4.0.4.2988. 
""" import subprocess -import pytest -from firebird.qa import * +import re from pathlib import Path import time import datetime as py_dt +import locale + +import pytest +from firebird.qa import * +from firebird.driver import tpb, Isolation, TraAccessMode, DatabaseError db = db_factory() -act = python_act('db', substitutions=[('=', ''), ('[ \t]+', ' ')]) +act = python_act( 'db', substitutions = [ ('=', ''), ('[ \t]+', ' '), ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] ) +#act = python_act( 'db', substitutions = [ ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] ) MAX_WAIT_FOR_WORKER_START_MS = 10000; SQL_TAG_THAT_WE_WAITING_FOR = 'SQL_TAG_THAT_WE_WAITING_FOR' @@ -152,72 +176,55 @@ fn_worker_sql = temp_file('tmp_worker.sql') fn_worker_log = temp_file('tmp_worker.log') fn_worker_err = temp_file('tmp_worker.err') -expected_stdout = """ - checked_mode: table, STDLOG: Records affected: 6 +#----------------------------------------------------------------------------------------------------------------------------------------------------- - checked_mode: table, STDLOG: ID - checked_mode: table, STDLOG: ======= - checked_mode: table, STDLOG: 1 - checked_mode: table, STDLOG: 2 - checked_mode: table, STDLOG: Records affected: 2 +def wait_for_record_become_locked(tx_monitoring, cur_monitoring, sql_to_lock_record, SQL_TAG_THAT_WE_WAITING_FOR): - checked_mode: table, STDLOG: OLD_ID OP SNAP_NO_RANK - checked_mode: table, STDLOG: ======= ====== ===================== - checked_mode: table, STDLOG: 3 DEL 1 - checked_mode: table, STDLOG: 4 DEL 1 - checked_mode: table, STDLOG: -3 DEL 2 - checked_mode: table, STDLOG: -2 DEL 2 - checked_mode: table, STDLOG: -1 DEL 2 - checked_mode: table, STDLOG: 3 DEL 2 - checked_mode: table, STDLOG: 4 DEL 2 - checked_mode: table, STDLOG: 5 DEL 2 - checked_mode: table, STDLOG: Records affected: 8 + # ::: NB ::: + # tx_monitoring must work in NOWAIT mode! 
- checked_mode: view, STDLOG: Records affected: 6 - - checked_mode: view, STDLOG: ID - checked_mode: view, STDLOG: ======= - checked_mode: view, STDLOG: 1 - checked_mode: view, STDLOG: 2 - checked_mode: view, STDLOG: Records affected: 2 - - checked_mode: view, STDLOG: OLD_ID OP SNAP_NO_RANK - checked_mode: view, STDLOG: ======= ====== ===================== - checked_mode: view, STDLOG: 3 DEL 1 - checked_mode: view, STDLOG: 4 DEL 1 - checked_mode: view, STDLOG: -3 DEL 2 - checked_mode: view, STDLOG: -2 DEL 2 - checked_mode: view, STDLOG: -1 DEL 2 - checked_mode: view, STDLOG: 3 DEL 2 - checked_mode: view, STDLOG: 4 DEL 2 - checked_mode: view, STDLOG: 5 DEL 2 - checked_mode: view, STDLOG: Records affected: 8 -""" - -def wait_for_attach_showup_in_monitoring(con_monitoring, sql_text_tag): - chk_sql = f"select 1 from mon$statements s where s.mon$attachment_id != current_connection and s.mon$sql_text containing '{sql_text_tag}'" - attach_with_sql_tag = None t1=py_dt.datetime.now() - cur_monitoring = con_monitoring.cursor() + required_concurrent_found = None + concurrent_tx_pattern = re.compile('concurrent transaction number is \\d+', re.IGNORECASE) while True: - cur_monitoring.execute(chk_sql) - for r in cur_monitoring: - attach_with_sql_tag = r[0] - if not attach_with_sql_tag: + concurrent_tx_number = None + concurrent_runsql = '' + tx_monitoring.begin() + try: + cur_monitoring.execute(sql_to_lock_record) + except DatabaseError as exc: + # Failed: SQL execution failed with: deadlock + # -update conflicts with concurrent update + # -concurrent transaction number is 40 + m = concurrent_tx_pattern.search( str(exc) ) + if m: + concurrent_tx_number = m.group().split()[-1] # 'concurrent transaction number is 40' ==> '40' + cur_monitoring.execute( 'select mon$sql_text from mon$statements where mon$transaction_id = ?', (int(concurrent_tx_number),) ) + for r in cur_monitoring: + concurrent_runsql = r[0] + if SQL_TAG_THAT_WE_WAITING_FOR in concurrent_runsql: + 
required_concurrent_found = 1 + + # pytest.fail(f"Can not upd, concurrent TX = {concurrent_tx_number}, sql: {concurrent_runsql}") + finally: + tx_monitoring.rollback() + + if not required_concurrent_found: t2=py_dt.datetime.now() d1=t2-t1 if d1.seconds*1000 + d1.microseconds//1000 >= MAX_WAIT_FOR_WORKER_START_MS: break else: - con_monitoring.commit() time.sleep(0.2) else: break - - assert attach_with_sql_tag, f"Could not find attach statement containing '{sql_text_tag}' for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND." + + assert required_concurrent_found, f"Could not find attach that running SQL with tag '{SQL_TAG_THAT_WE_WAITING_FOR}' and locks record for {MAX_WAIT_FOR_WORKER_START_MS} ms. Check query: {sql_to_lock_record}. ABEND." return -@pytest.mark.version('>=4.0') +#----------------------------------------------------------------------------------------------------------------------------------------------------- + +@pytest.mark.version('>=4.0.2') def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: Path, capsys): sql_init = (act.files_dir / 'read-consist-sttm-restart-DDL.sql').read_text() @@ -246,123 +253,229 @@ def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: assert act.stderr == '' act.reset() - with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring: - for i,c in enumerate((con_lock_1,con_lock_2)): - sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end" - c.execute_immediate(sttm) + trace_cfg_items = [ + 'time_threshold = 0', + 'log_errors = true', + 'log_statement_start = true', + 'log_statement_finish = true', + ] - ######################### - ### L O C K E R - 1 ### - ######################### + with act.trace(db_events = trace_cfg_items, encoding=locale.getpreferredencoding()): - con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = 5' % locals() ) + with act.db.connect() as con_lock_1, 
act.db.connect() as con_lock_2, act.db.connect() as con_monitoring: - worker_sql = f''' - set list on; - set autoddl off; - set term ^; - execute block returns (whoami varchar(30)) as - begin - whoami = 'WORKER'; -- , ATT#' || current_connection; - rdb$set_context('USER_SESSION','WHO', whoami); - -- suspend; - end - ^ - set term ;^ - commit; - --set echo on; - SET KEEP_TRAN_PARAMS ON; - set transaction read committed read consistency; - --select current_connection, current_transaction from rdb$database; - set list off; - set wng off; - --set plan on; - set count on; + tpb_monitoring = tpb(isolation=Isolation.READ_COMMITTED_RECORD_VERSION, lock_timeout=0) + tx_monitoring = con_monitoring.transaction_manager(tpb_monitoring) + cur_monitoring = tx_monitoring.cursor() - -- delete from {target_obj} where id < 0 or id >= 3 order by id; -- THIS MUST BE LOCKED - {SQL_TO_BE_RESTARTED}; - - -- check results: - -- ############### - - select id from {target_obj} order by id; -- this will produce output only after all lockers do their commit/rollback - - select v.old_id, v.op, v.snap_no_rank - from v_worker_log v - where v.op = 'del'; - - set width who 10; - -- DO NOT check this! Values can differ here from one run to another! - -- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id; - - rollback; - - ''' - - fn_worker_sql.write_text(worker_sql) - - with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err: - - ############################################################################ - ### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. 
### - ############################################################################ - p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql), - '-user', act.db.user, - '-password', act.db.password, - act.db.dsn - ], - stdout = hang_out, - stderr = hang_err - ) - wait_for_attach_showup_in_monitoring(con_monitoring, SQL_TAG_THAT_WE_WAITING_FOR) - # >>> !!!THIS WAS WRONG!!! >>> time.sleep(1) - - ######################### - ### L O C K E R - 2 ### - ######################### - # Insert ID value that is less than previous min(id). - # Session-worker is executing its statement using PLAN ORDER, - # and it should see this new value and restart its statement: - con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-1)' % locals() ) - con_lock_2.commit() - con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -1' % locals() ) + for i,c in enumerate((con_lock_1,con_lock_2)): + sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end" + c.execute_immediate(sttm) ######################### ### L O C K E R - 1 ### ######################### - con_lock_1.commit() - con_lock_1.execute_immediate( f'insert into {target_obj}(id) values(-2)' % locals() ) - con_lock_1.commit() - con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = -2' % locals() ) + con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = 5' ) + + worker_sql = f''' + set list on; + set autoddl off; + set term ^; + execute block returns (whoami varchar(30)) as + begin + whoami = 'WORKER'; -- , ATT#' || current_connection; + rdb$set_context('USER_SESSION','WHO', whoami); + -- suspend; + end + ^ + set term ;^ + commit; + --set echo on; + SET KEEP_TRAN_PARAMS ON; + set transaction read committed read consistency; + --select current_connection, current_transaction from rdb$database; + set list off; + set wng off; + --set plan on; + set count on; + + -- delete from {target_obj} where id < 0 or id >= 3 order 
by id; -- THIS MUST BE LOCKED + {SQL_TO_BE_RESTARTED}; + + -- check results: + -- ############### + select id from {target_obj} order by id; -- this will produce output only after all lockers do their commit/rollback + + select v.old_id, v.op, v.snap_no_rank + from v_worker_log v + where v.op = 'del'; + + set width who 10; + -- DO NOT check this! Values can differ here from one run to another! + -- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id; + + rollback; + + ''' + + fn_worker_sql.write_text(worker_sql) + + with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err: + + ############################################################################ + ### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ### + ############################################################################ + p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql), + '-user', act.db.user, + '-password', act.db.password, + act.db.dsn + ], + stdout = hang_out, + stderr = hang_err + ) + # NB: when ISQL will establish attach, first record that it must lock is ID = 3 -- see above SQL_TO_BE_RESTARTED + # We must to ensure that this (worker) attachment has been really created and LOCKS this record: + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=3', SQL_TAG_THAT_WE_WAITING_FOR) - ######################### - ### L O C K E R - 2 ### - ######################### - # Insert ID value that is less than previous min(id). 
- # Session-worker is executing its statement using PLAN ORDER, - # and it should see this new value and restart its statement: - con_lock_2.commit() - con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-3)' % locals() ) - con_lock_2.commit() - con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -3' % locals() ) + ######################### + ### L O C K E R - 2 ### + ######################### + # Insert ID value that is less than previous min(id). + # Session-worker is executing its statement using PLAN ORDER, + # and it should see this new value and restart its statement: + con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-1)' ) + con_lock_2.commit() + con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -1' ) - con_lock_1.commit() - con_lock_2.commit() + ######################### + ### L O C K E R - 1 ### + ######################### + con_lock_1.commit() # releases record with ID = 5 ==> now it can be locked by worker. - # Here we wait for ISQL complete its mission: - p_worker.wait() + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = 5. + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=5', SQL_TAG_THAT_WE_WAITING_FOR) + # If we come here then it means that record with ID = 5 for sure is locked by WORKER. + + con_lock_1.execute_immediate( f'insert into {target_obj}(id) values(-2)' ) + con_lock_1.commit() + con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = -2' ) - for g in (fn_worker_log, fn_worker_err): - with g.open() as f: - for line in f: - if line.split(): - if g == fn_worker_log: - print(f'checked_mode: {checked_mode}, STDLOG: {line}') - else: - print(f'UNEXPECTED STDERR {line}') + ######################### + ### L O C K E R - 2 ### + ######################### + # Insert ID value that is less than previous min(id). 
+ # Session-worker is executing its statement using PLAN ORDER, + # and it should see this new value and restart its statement: + con_lock_2.commit() - act.expected_stdout = expected_stdout - act.stdout = capsys.readouterr().out - assert act.clean_stdout == act.clean_expected_stdout + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = -1. + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=-1', SQL_TAG_THAT_WE_WAITING_FOR) + # If we come here then it means that record with ID = -1 for sure is locked by WORKER. + + con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-3)' ) + con_lock_2.commit() + con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -3' ) + + ######################### + ### L O C K E R - 1 ### + ######################### + con_lock_1.commit() + + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = -2. + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=-2', SQL_TAG_THAT_WE_WAITING_FOR) + # If we come here then it means that record with ID = -2 for sure is locked by WORKER. 
+ + + con_lock_2.commit() # WORKER will complete his job after this + + # Here we wait for ISQL complete its mission: + p_worker.wait() + + #< with act.db.connect() + + + for g in (fn_worker_log, fn_worker_err): + with g.open() as f: + for line in f: + if line.split(): + if g == fn_worker_log: + print(f'checked_mode: {checked_mode}, STDLOG: {line}') + else: + print(f'UNEXPECTED STDERR {line}') + + expected_stdout_worker = f""" + checked_mode: {checked_mode}, STDLOG: Records affected: 6 + + checked_mode: {checked_mode}, STDLOG: ID + checked_mode: {checked_mode}, STDLOG: ======= + checked_mode: {checked_mode}, STDLOG: 1 + checked_mode: {checked_mode}, STDLOG: 2 + checked_mode: {checked_mode}, STDLOG: Records affected: 2 + + checked_mode: {checked_mode}, STDLOG: OLD_ID OP SNAP_NO_RANK + checked_mode: {checked_mode}, STDLOG: ======= ====== ===================== + checked_mode: {checked_mode}, STDLOG: 3 DEL 1 + checked_mode: {checked_mode}, STDLOG: 4 DEL 1 + checked_mode: {checked_mode}, STDLOG: -3 DEL 2 + checked_mode: {checked_mode}, STDLOG: -2 DEL 2 + checked_mode: {checked_mode}, STDLOG: -1 DEL 2 + checked_mode: {checked_mode}, STDLOG: 3 DEL 2 + checked_mode: {checked_mode}, STDLOG: 4 DEL 2 + checked_mode: {checked_mode}, STDLOG: 5 DEL 2 + checked_mode: {checked_mode}, STDLOG: Records affected: 8 + """ + + act.expected_stdout = expected_stdout_worker + act.stdout = capsys.readouterr().out + assert act.clean_stdout == act.clean_expected_stdout + act.reset() + + # < with act.trace + + allowed_patterns = \ + [ + '\\)\\s+EXECUTE_STATEMENT_RESTART$' + ,re.escape(SQL_TO_BE_RESTARTED) + ,'^Restarted \\d+ time\\(s\\)' + ] + allowed_patterns = [re.compile(x) for x in allowed_patterns] + + for line in act.trace_log: + if line.strip(): + if act.match_any(line.strip(), allowed_patterns): + print(line.strip()) + + expected_stdout_trace = f""" + {SQL_TO_BE_RESTARTED} + + EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 1 time(s) + + EXECUTE_STATEMENT_RESTART + 
{SQL_TO_BE_RESTARTED} + Restarted 2 time(s) + + EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 3 time(s) + + EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 4 time(s) + + {SQL_TO_BE_RESTARTED} + """ + + act.expected_stdout = expected_stdout_trace + act.stdout = capsys.readouterr().out + assert act.clean_stdout == act.clean_expected_stdout + act.reset() + + # < for checked_mode in('table', 'view')