diff --git a/tests/functional/transactions/test_read_consist_sttm_restart_on_update_04.py b/tests/functional/transactions/test_read_consist_sttm_restart_on_update_04.py index e9038864..7aec1312 100644 --- a/tests/functional/transactions/test_read_consist_sttm_restart_on_update_04.py +++ b/tests/functional/transactions/test_read_consist_sttm_restart_on_update_04.py @@ -121,34 +121,50 @@ DESCRIPTION: NOTE: concrete values of fields TRN, GLOBAL_CN and SNAP_NO in the TLOG_DONE can differ from one to another run! This is because of concurrent nature of connections that work against database. We must not assume that these values will be constant. - ################ - - Checked on 4.0.0.2195 SS/CS - 26.09.2020: added for-loop in order to check different target objects: TABLE ('test') and VIEW ('v_test'), see 'checked_mode'. FBTEST: functional.transactions.read_consist_sttm_restart_on_update_04 NOTES: - [29.07.2022] pzotov - Checked on 4.0.1.2692, 5.0.0.591 + [26.09.2020] pzotov + Added for-loop in order to check different target objects: TABLE ('test') and VIEW ('v_test'), see 'checked_mode'. [23.09.2023] pzotov Replaced verification method of worker attachment presense (which tries DML and waits for resource). - This attachment is created by *asynchronously* launched ISQL thus using of time.sleep(1) is COMPLETELY wrong. - Loop with query to mon$statements is used instead (we search for record which SQL_TEXT contains 'special tag', see variable SQL_TAG_THAT_WE_WAITING_FOR). - Maximal duration of this loop is limited by variable 'MAX_WAIT_FOR_WORKER_START_MS'. Many thanks to Vlad for suggestions. + [25.09.2023] pzotov + 1. Added trace launch and its parsing in order to get number of times when WORKER statement did restart. + 2. To prevent raises between concurrent transactions, it is necessary to ensure that code: + * does not allow LOCKER-2 to start its work until WORKER session will establish connection and - moreover - will actually locks first record + from the scope that is seen by the query that we want to be executed by worker. + * does not allow LOCKER-1 to do something after LOCKER-2 issued commit (and thus released record): we first have to ensure that this record + now is locked by WORKER. The same when record was occupied by LOCKER-2 and then is released: LOCKER-1 must not do smth until WORKER will + encounter this record and 'catch' it. + This is done by calls to function 'wait_for_record_become_locked()' which are performed by separate 'monitoring' connection with starting Tx + with NO_WAIT mode and catching exception with further parsing. In case when record has been already occupied (by WORKER) this exception will + have form "deadlock / -update conflicts ... / -concurrent transaction number is ". We can then obtain number of this transaction and query + mon$statements for get MON$SQL_TEXT that is runnig by this Tx. If it contains contains 'special tag' (see variable SQL_TAG_THAT_WE_WAITING_FOR) + then we can be sure that WORKER really did establish connection and successfully locked row with required ID. + + Table 'TLOG_WANT' (which is fulfilled by trigger TEST_BIUD using in autonomous tx) can NOT be used for detection of moments when WORKER + actually locks records which he was waiting for: this trigger fires BEFORE actual updating occurs, i.e. when record become seeon by WORKER + but is still occupied by some LOCKER ("[DOC]: c) engine continue to evaluate remaining records ... and put write locks on it too") + + NB! Worker transaction must running in WAIT mode - in contrary to Tx that we start in our monitoring loop. Checked on WI-T6.0.0.48, WI-T5.0.0.1211, WI-V4.0.4.2988. """ import subprocess -import pytest -from firebird.qa import * +import re from pathlib import Path import time import datetime as py_dt +import locale + +import pytest +from firebird.qa import * +from firebird.driver import tpb, Isolation, TraAccessMode, DatabaseError db = db_factory() -act = python_act('db', substitutions=[('=', ''), ('[ \t]+', ' ')]) +act = python_act( 'db', substitutions = [ ('=', ''), ('[ \t]+', ' '), ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] ) MAX_WAIT_FOR_WORKER_START_MS = 10000; SQL_TAG_THAT_WE_WAITING_FOR = 'SQL_TAG_THAT_WE_WAITING_FOR' @@ -158,83 +174,55 @@ fn_worker_sql = temp_file('tmp_worker.sql') fn_worker_log = temp_file('tmp_worker.log') fn_worker_err = temp_file('tmp_worker.err') -expected_stdout = """ - checked_mode: table, STDLOG: Records affected: 4 +#----------------------------------------------------------------------------------------------------------------------------------------------------- +def wait_for_record_become_locked(tx_monitoring, cur_monitoring, sql_to_lock_record, SQL_TAG_THAT_WE_WAITING_FOR): - checked_mode: table, STDLOG: ID - checked_mode: table, STDLOG: ======= - checked_mode: table, STDLOG: -5 - checked_mode: table, STDLOG: -2 - checked_mode: table, STDLOG: -1 - checked_mode: table, STDLOG: 3 - checked_mode: table, STDLOG: 4 - checked_mode: table, STDLOG: Records affected: 5 + # sql_to_lock_record: f'update {target_obj} set id=id where id=1' + # ::: NB ::: + # tx_monitoring must work in NOWAIT mode! - checked_mode: table, STDLOG: OLD_ID OP SNAP_NO_RANK - checked_mode: table, STDLOG: ======= ====== ===================== - checked_mode: table, STDLOG: 2 UPD 1 - checked_mode: table, STDLOG: 2 UPD 2 - checked_mode: table, STDLOG: 1 UPD 2 - checked_mode: table, STDLOG: 2 UPD 3 - checked_mode: table, STDLOG: 1 UPD 3 - checked_mode: table, STDLOG: 2 UPD 4 - checked_mode: table, STDLOG: 1 UPD 4 - checked_mode: table, STDLOG: -3 UPD 4 - checked_mode: table, STDLOG: -4 UPD 4 - checked_mode: table, STDLOG: Records affected: 9 - - - - checked_mode: view, STDLOG: Records affected: 4 - - - checked_mode: view, STDLOG: ID - checked_mode: view, STDLOG: ======= - checked_mode: view, STDLOG: -5 - checked_mode: view, STDLOG: -2 - checked_mode: view, STDLOG: -1 - checked_mode: view, STDLOG: 3 - checked_mode: view, STDLOG: 4 - checked_mode: view, STDLOG: Records affected: 5 - - checked_mode: view, STDLOG: OLD_ID OP SNAP_NO_RANK - checked_mode: view, STDLOG: ======= ====== ===================== - checked_mode: view, STDLOG: 2 UPD 1 - checked_mode: view, STDLOG: 2 UPD 2 - checked_mode: view, STDLOG: 1 UPD 2 - checked_mode: view, STDLOG: 2 UPD 3 - checked_mode: view, STDLOG: 1 UPD 3 - checked_mode: view, STDLOG: 2 UPD 4 - checked_mode: view, STDLOG: 1 UPD 4 - checked_mode: view, STDLOG: -3 UPD 4 - checked_mode: view, STDLOG: -4 UPD 4 - checked_mode: view, STDLOG: Records affected: 9 -""" - -def wait_for_attach_showup_in_monitoring(con_monitoring, sql_text_tag): - chk_sql = f"select 1 from mon$statements s where s.mon$attachment_id != current_connection and s.mon$sql_text containing '{sql_text_tag}'" - attach_with_sql_tag = None t1=py_dt.datetime.now() - cur_monitoring = con_monitoring.cursor() + required_concurrent_found = None + concurrent_tx_pattern = re.compile('concurrent transaction number is \\d+', re.IGNORECASE) while True: - cur_monitoring.execute(chk_sql) - for r in cur_monitoring: - attach_with_sql_tag = r[0] - if not attach_with_sql_tag: + concurrent_tx_number = None + concurrent_runsql = '' + tx_monitoring.begin() + try: + cur_monitoring.execute(sql_to_lock_record) + except DatabaseError as exc: + # Failed: SQL execution failed with: deadlock + # -update conflicts with concurrent update + # -concurrent transaction number is 40 + m = concurrent_tx_pattern.search( str(exc) ) + if m: + concurrent_tx_number = m.group().split()[-1] # 'concurrent transaction number is 40' ==> '40' + cur_monitoring.execute( 'select mon$sql_text from mon$statements where mon$transaction_id = ?', (int(concurrent_tx_number),) ) + for r in cur_monitoring: + concurrent_runsql = r[0] + if SQL_TAG_THAT_WE_WAITING_FOR in concurrent_runsql: + required_concurrent_found = 1 + + # pytest.fail(f"Can not upd, concurrent TX = {concurrent_tx_number}, sql: {concurrent_runsql}") + finally: + tx_monitoring.rollback() + + if not required_concurrent_found: t2=py_dt.datetime.now() d1=t2-t1 if d1.seconds*1000 + d1.microseconds//1000 >= MAX_WAIT_FOR_WORKER_START_MS: break else: - con_monitoring.commit() time.sleep(0.2) else: break - - assert attach_with_sql_tag, f"Could not find attach statement containing '{sql_text_tag}' for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND." + + assert required_concurrent_found, f"Could not find attach that running SQL with tag '{SQL_TAG_THAT_WE_WAITING_FOR}' and locks record for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND." return +#----------------------------------------------------------------------------------------------------------------------------------------------------- + @pytest.mark.version('>=4.0') def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: Path, capsys): sql_init = (act.files_dir / 'read-consist-sttm-restart-DDL.sql').read_text() @@ -264,117 +252,218 @@ def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: assert act.stderr == '' act.reset() - with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring: - for i,c in enumerate((con_lock_1,con_lock_2)): - sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end" - c.execute_immediate(sttm) + trace_cfg_items = [ + 'time_threshold = 0', + 'log_errors = true', + 'log_statement_start = true', + 'log_statement_finish = true', + ] + with act.trace(db_events = trace_cfg_items, encoding=locale.getpreferredencoding()): - ######################### - ### L O C K E R - 1 ### - ######################### + with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring: + + tpb_monitoring = tpb(isolation=Isolation.READ_COMMITTED_RECORD_VERSION, lock_timeout=0) + tx_monitoring = con_monitoring.transaction_manager(tpb_monitoring) + cur_monitoring = tx_monitoring.cursor() - con_lock_1.execute_immediate( f'update {target_obj} set id=id where id=1' ) + for i,c in enumerate((con_lock_1,con_lock_2)): + sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end" + c.execute_immediate(sttm) - worker_sql = f''' - set list on; - set autoddl off; - set term ^; - execute block returns (whoami varchar(30)) as - begin - whoami = 'WORKER'; -- , ATT#' || current_connection; - rdb$set_context('USER_SESSION','WHO', whoami); - -- suspend; - end - ^ - set term ;^ - commit; - SET KEEP_TRAN_PARAMS ON; - set transaction read committed read consistency; - --select current_connection, current_transaction from rdb$database; - set list off; - set wng off; - - --set plan on; - set count on; - - -- this must hang because of locker-1: - {SQL_TO_BE_RESTARTED}; - - -- check results: - -- ############### - - select id from {target_obj} order by id; -- one record must remain, with ID = -5 - - select v.old_id, v.op, v.snap_no_rank -- snap_no_rank must have four unique values: 1,2,3 and 4. - from v_worker_log v - where v.op = 'upd'; - - --set width who 10; - -- DO NOT check this! Values can differ here from one run to another! - -- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id; - rollback; - - ''' - - fn_worker_sql.write_text(worker_sql) - - with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err: - - ############################################################################ - ### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ### - ############################################################################ - p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql), - '-user', act.db.user, - '-password', act.db.password, - '-pag', '999999', - act.db.dsn - ], - stdout = hang_out, - stderr = hang_err - ) - - wait_for_attach_showup_in_monitoring(con_monitoring, SQL_TAG_THAT_WE_WAITING_FOR) ######################### - ### L O C K E R - 2 ### + ### L O C K E R - 1 ### ######################### - # Change ID so that it **will* be included in the set of rows that must be affected by session-worker: - con_lock_2.execute_immediate( f'update {target_obj} set id = -5 where abs(id) = 5;' ) - con_lock_2.commit() - con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 5;' ) + con_lock_1.execute_immediate( f'update {target_obj} set id=id where id=1' ) + + worker_sql = f''' + set list on; + set autoddl off; + set term ^; + execute block returns (whoami varchar(30)) as + begin + whoami = 'WORKER'; -- , ATT#' || current_connection; + rdb$set_context('USER_SESSION','WHO', whoami); + -- suspend; + end + ^ + set term ;^ + commit; + SET KEEP_TRAN_PARAMS ON; + set transaction read committed read consistency; + --select current_connection, current_transaction from rdb$database; + set list off; + set wng off; + + --set plan on; + set count on; + + -- this must hang because of locker-1: + {SQL_TO_BE_RESTARTED}; + + -- check results: + -- ############### + select id from {target_obj} order by id; -- one record must remain, with ID = -5 + + select v.old_id, v.op, v.snap_no_rank -- snap_no_rank must have four unique values: 1,2,3 and 4. + from v_worker_log v + where v.op = 'upd'; + + --set width who 10; + -- DO NOT check this! Values can differ here from one run to another! + -- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id; + rollback; + + ''' + + fn_worker_sql.write_text(worker_sql) + + with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err: + + ############################################################################ + ### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ### + ############################################################################ + p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql), + '-user', act.db.user, + '-password', act.db.password, + '-pag', '999999', + act.db.dsn + ], + stdout = hang_out, + stderr = hang_err + ) + + # NB: when ISQL will establish attach, first record that it must lock is ID = 2 -- see above SQL_TO_BE_RESTARTED + # We must to ensure that this (worker) attachment has been really created and LOCKS this record: + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=2', SQL_TAG_THAT_WE_WAITING_FOR) + + ######################### + ### L O C K E R - 2 ### + ######################### + + # Change ID so that it **will* be included in the set of rows that must be affected by session-worker: + con_lock_2.execute_immediate( f'update {target_obj} set id = -5 where abs(id) = 5;' ) + con_lock_2.commit() + con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 5;' ) + con_lock_1.commit() # releases record with ID=1 ==> now it can be locked by worker. + + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = 1. + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=1', SQL_TAG_THAT_WE_WAITING_FOR) + + # If we come here then it means that record with ID = 1 for sure is locked by WORKER. + + # Change ID so that it **will* be included in the set of rows that must be affected by session-worker: + con_lock_1.execute_immediate( f'update {target_obj} set id = -4 where abs(id) = 4;' ) + con_lock_1.commit() + con_lock_1.execute_immediate( f'update {target_obj} set id = id where abs(id) = 4;' ) - con_lock_1.commit() # releases record with ID=1 (allow it to be deleted by session-worker) + con_lock_2.commit() # releases record with ID = -5, but session-worker is waiting for record with ID = -4 (that was changed by locker-1). - # Change ID so that it **will* be included in the set of rows that must be affected by session-worker: - con_lock_1.execute_immediate( f'update {target_obj} set id = -4 where abs(id) = 4;' ) - con_lock_1.commit() - con_lock_1.execute_immediate( f'update {target_obj} set id = id where abs(id) = 4;' ) + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = -5: + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id = -5', SQL_TAG_THAT_WE_WAITING_FOR) + # If we come here then it means that record with ID = -5 for sure is locked by WORKER. + + con_lock_2.execute_immediate( f'update {target_obj} set id = -3 where abs(id) = 3;' ) + con_lock_2.commit() + con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 3;' ) - con_lock_2.commit() # releases record with ID = -5, but session-worker is waiting for record with ID = -4 (that was changed by locker-1). - con_lock_2.execute_immediate( f'update {target_obj} set id = -3 where abs(id) = 3;' ) - con_lock_2.commit() - con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 3;' ) + con_lock_1.commit() # This releases row with ID=-4 but session-worker is waiting for ID = - 3 (changed by locker-2). - con_lock_1.commit() # This releases row with ID=-4 but session-worker is waiting for ID = - 3 (changed by locker-2). - con_lock_2.commit() # This releases row with ID=-3. No more locked rows so session-worker can finish its mission. + # We have to WAIT HERE until worker will actually 'catch' just released record with ID = -4: + # + wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id = -4', SQL_TAG_THAT_WE_WAITING_FOR) + + # If we come here then it means that record with ID = -4 for sure is locked by WORKER. + + con_lock_2.commit() # This releases row with ID=-3. No more locked rows so session-worker can finish its mission. - # Here we wait for ISQL complete its mission: - p_worker.wait() + # Here we wait for ISQL complete its mission: + p_worker.wait() + #< with act.db.connect() - for g in (fn_worker_log, fn_worker_err): - with g.open() as f: - for line in f: - if line.split(): - if g == fn_worker_log: - print(f'checked_mode: {checked_mode}, STDLOG: {line}') - else: - print(f'UNEXPECTED STDERR {line}') + for g in (fn_worker_log, fn_worker_err): + with g.open() as f: + for line in f: + if line.split(): + if g == fn_worker_log: + print(f'checked_mode: {checked_mode}, STDLOG: {line}') + else: + print(f'UNEXPECTED STDERR {line}') - act.expected_stdout = expected_stdout - act.stdout = capsys.readouterr().out - assert act.clean_stdout == act.clean_expected_stdout + expected_stdout_worker = f""" + checked_mode: {checked_mode}, STDLOG: Records affected: 4 + + checked_mode: {checked_mode}, STDLOG: ID + checked_mode: {checked_mode}, STDLOG: ======= + checked_mode: {checked_mode}, STDLOG: -5 + checked_mode: {checked_mode}, STDLOG: -2 + checked_mode: {checked_mode}, STDLOG: -1 + checked_mode: {checked_mode}, STDLOG: 3 + checked_mode: {checked_mode}, STDLOG: 4 + checked_mode: {checked_mode}, STDLOG: Records affected: 5 + + checked_mode: {checked_mode}, STDLOG: OLD_ID OP SNAP_NO_RANK + checked_mode: {checked_mode}, STDLOG: ======= ====== ===================== + checked_mode: {checked_mode}, STDLOG: 2 UPD 1 + checked_mode: {checked_mode}, STDLOG: 2 UPD 2 + checked_mode: {checked_mode}, STDLOG: 1 UPD 2 + checked_mode: {checked_mode}, STDLOG: 2 UPD 3 + checked_mode: {checked_mode}, STDLOG: 1 UPD 3 + checked_mode: {checked_mode}, STDLOG: 2 UPD 4 + checked_mode: {checked_mode}, STDLOG: 1 UPD 4 + checked_mode: {checked_mode}, STDLOG: -3 UPD 4 + checked_mode: {checked_mode}, STDLOG: -4 UPD 4 + checked_mode: {checked_mode}, STDLOG: Records affected: 9 + """ + act.expected_stdout = expected_stdout_worker + act.stdout = capsys.readouterr().out + assert act.clean_stdout == act.clean_expected_stdout + act.reset() + + #< with act.trace + + allowed_patterns = \ + [ + '\\)\\s+EXECUTE_STATEMENT_RESTART$' + ,re.escape(SQL_TO_BE_RESTARTED) + ,'^Restarted \\d+ time\\(s\\)' + ] + allowed_patterns = [re.compile(x) for x in allowed_patterns] + + for line in act.trace_log: + if line.strip(): + if act.match_any(line.strip(), allowed_patterns): + print(line.strip()) + + expected_stdout_trace = f""" + {SQL_TO_BE_RESTARTED} + + 2023-09-25T02:09:26.1800 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 1 time(s) + + 2023-09-25T02:09:26.1860 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 2 time(s) + + 2023-09-25T02:09:26.1910 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART + {SQL_TO_BE_RESTARTED} + Restarted 3 time(s) + + {SQL_TO_BE_RESTARTED} + """ + + act.expected_stdout = expected_stdout_trace + act.stdout = capsys.readouterr().out + assert act.clean_stdout == act.clean_expected_stdout + act.reset() + + #< for checked_mode in('table', 'view')