6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00

Added/Updated tests\functional\transactions\test_read_consist_sttm_restart_on_update_04.py: one more re-implementing in order to prevent raises and get execution order predictable. See notes.

This commit is contained in:
pavel-zotov 2023-09-25 02:18:42 +03:00
parent f7714da147
commit da9edd067d

View File

@ -121,34 +121,50 @@ DESCRIPTION:
NOTE: concrete values of fields TRN, GLOBAL_CN and SNAP_NO in the TLOG_DONE can differ from one to another run!
This is because of concurrent nature of connections that work against database. We must not assume that these values will be constant.
################
Checked on 4.0.0.2195 SS/CS
26.09.2020: added for-loop in order to check different target objects: TABLE ('test') and VIEW ('v_test'), see 'checked_mode'.
FBTEST: functional.transactions.read_consist_sttm_restart_on_update_04
NOTES:
[29.07.2022] pzotov
Checked on 4.0.1.2692, 5.0.0.591
[26.09.2020] pzotov
Added for-loop in order to check different target objects: TABLE ('test') and VIEW ('v_test'), see 'checked_mode'.
[23.09.2023] pzotov
Replaced verification method of worker attachment presense (which tries DML and waits for resource).
This attachment is created by *asynchronously* launched ISQL thus using of time.sleep(1) is COMPLETELY wrong.
Loop with query to mon$statements is used instead (we search for record which SQL_TEXT contains 'special tag', see variable SQL_TAG_THAT_WE_WAITING_FOR).
Maximal duration of this loop is limited by variable 'MAX_WAIT_FOR_WORKER_START_MS'.
Many thanks to Vlad for suggestions.
[25.09.2023] pzotov
1. Added trace launch and its parsing in order to get number of times when WORKER statement did restart.
2. To prevent raises between concurrent transactions, it is necessary to ensure that code:
* does not allow LOCKER-2 to start its work until WORKER session will establish connection and - moreover - will actually locks first record
from the scope that is seen by the query that we want to be executed by worker.
* does not allow LOCKER-1 to do something after LOCKER-2 issued commit (and thus released record): we first have to ensure that this record
now is locked by WORKER. The same when record was occupied by LOCKER-2 and then is released: LOCKER-1 must not do smth until WORKER will
encounter this record and 'catch' it.
This is done by calls to function 'wait_for_record_become_locked()' which are performed by separate 'monitoring' connection with starting Tx
with NO_WAIT mode and catching exception with further parsing. In case when record has been already occupied (by WORKER) this exception will
have form "deadlock / -update conflicts ... / -concurrent transaction number is <N>". We can then obtain number of this transaction and query
mon$statements for get MON$SQL_TEXT that is runnig by this Tx. If it contains contains 'special tag' (see variable SQL_TAG_THAT_WE_WAITING_FOR)
then we can be sure that WORKER really did establish connection and successfully locked row with required ID.
Table 'TLOG_WANT' (which is fulfilled by trigger TEST_BIUD using in autonomous tx) can NOT be used for detection of moments when WORKER
actually locks records which he was waiting for: this trigger fires BEFORE actual updating occurs, i.e. when record become seeon by WORKER
but is still occupied by some LOCKER ("[DOC]: c) engine continue to evaluate remaining records ... and put write locks on it too")
NB! Worker transaction must running in WAIT mode - in contrary to Tx that we start in our monitoring loop.
Checked on WI-T6.0.0.48, WI-T5.0.0.1211, WI-V4.0.4.2988.
"""
import subprocess
import pytest
from firebird.qa import *
import re
from pathlib import Path
import time
import datetime as py_dt
import locale
import pytest
from firebird.qa import *
from firebird.driver import tpb, Isolation, TraAccessMode, DatabaseError
db = db_factory()
act = python_act('db', substitutions=[('=', ''), ('[ \t]+', ' ')])
act = python_act( 'db', substitutions = [ ('=', ''), ('[ \t]+', ' '), ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] )
MAX_WAIT_FOR_WORKER_START_MS = 10000;
SQL_TAG_THAT_WE_WAITING_FOR = 'SQL_TAG_THAT_WE_WAITING_FOR'
@ -158,83 +174,55 @@ fn_worker_sql = temp_file('tmp_worker.sql')
fn_worker_log = temp_file('tmp_worker.log')
fn_worker_err = temp_file('tmp_worker.err')
expected_stdout = """
checked_mode: table, STDLOG: Records affected: 4
#-----------------------------------------------------------------------------------------------------------------------------------------------------
def wait_for_record_become_locked(tx_monitoring, cur_monitoring, sql_to_lock_record, SQL_TAG_THAT_WE_WAITING_FOR):
checked_mode: table, STDLOG: ID
checked_mode: table, STDLOG: =======
checked_mode: table, STDLOG: -5
checked_mode: table, STDLOG: -2
checked_mode: table, STDLOG: -1
checked_mode: table, STDLOG: 3
checked_mode: table, STDLOG: 4
checked_mode: table, STDLOG: Records affected: 5
# sql_to_lock_record: f'update {target_obj} set id=id where id=1'
# ::: NB :::
# tx_monitoring must work in NOWAIT mode!
checked_mode: table, STDLOG: OLD_ID OP SNAP_NO_RANK
checked_mode: table, STDLOG: ======= ====== =====================
checked_mode: table, STDLOG: 2 UPD 1
checked_mode: table, STDLOG: 2 UPD 2
checked_mode: table, STDLOG: 1 UPD 2
checked_mode: table, STDLOG: 2 UPD 3
checked_mode: table, STDLOG: 1 UPD 3
checked_mode: table, STDLOG: 2 UPD 4
checked_mode: table, STDLOG: 1 UPD 4
checked_mode: table, STDLOG: -3 UPD 4
checked_mode: table, STDLOG: -4 UPD 4
checked_mode: table, STDLOG: Records affected: 9
checked_mode: view, STDLOG: Records affected: 4
checked_mode: view, STDLOG: ID
checked_mode: view, STDLOG: =======
checked_mode: view, STDLOG: -5
checked_mode: view, STDLOG: -2
checked_mode: view, STDLOG: -1
checked_mode: view, STDLOG: 3
checked_mode: view, STDLOG: 4
checked_mode: view, STDLOG: Records affected: 5
checked_mode: view, STDLOG: OLD_ID OP SNAP_NO_RANK
checked_mode: view, STDLOG: ======= ====== =====================
checked_mode: view, STDLOG: 2 UPD 1
checked_mode: view, STDLOG: 2 UPD 2
checked_mode: view, STDLOG: 1 UPD 2
checked_mode: view, STDLOG: 2 UPD 3
checked_mode: view, STDLOG: 1 UPD 3
checked_mode: view, STDLOG: 2 UPD 4
checked_mode: view, STDLOG: 1 UPD 4
checked_mode: view, STDLOG: -3 UPD 4
checked_mode: view, STDLOG: -4 UPD 4
checked_mode: view, STDLOG: Records affected: 9
"""
def wait_for_attach_showup_in_monitoring(con_monitoring, sql_text_tag):
chk_sql = f"select 1 from mon$statements s where s.mon$attachment_id != current_connection and s.mon$sql_text containing '{sql_text_tag}'"
attach_with_sql_tag = None
t1=py_dt.datetime.now()
cur_monitoring = con_monitoring.cursor()
required_concurrent_found = None
concurrent_tx_pattern = re.compile('concurrent transaction number is \\d+', re.IGNORECASE)
while True:
cur_monitoring.execute(chk_sql)
for r in cur_monitoring:
attach_with_sql_tag = r[0]
if not attach_with_sql_tag:
concurrent_tx_number = None
concurrent_runsql = ''
tx_monitoring.begin()
try:
cur_monitoring.execute(sql_to_lock_record)
except DatabaseError as exc:
# Failed: SQL execution failed with: deadlock
# -update conflicts with concurrent update
# -concurrent transaction number is 40
m = concurrent_tx_pattern.search( str(exc) )
if m:
concurrent_tx_number = m.group().split()[-1] # 'concurrent transaction number is 40' ==> '40'
cur_monitoring.execute( 'select mon$sql_text from mon$statements where mon$transaction_id = ?', (int(concurrent_tx_number),) )
for r in cur_monitoring:
concurrent_runsql = r[0]
if SQL_TAG_THAT_WE_WAITING_FOR in concurrent_runsql:
required_concurrent_found = 1
# pytest.fail(f"Can not upd, concurrent TX = {concurrent_tx_number}, sql: {concurrent_runsql}")
finally:
tx_monitoring.rollback()
if not required_concurrent_found:
t2=py_dt.datetime.now()
d1=t2-t1
if d1.seconds*1000 + d1.microseconds//1000 >= MAX_WAIT_FOR_WORKER_START_MS:
break
else:
con_monitoring.commit()
time.sleep(0.2)
else:
break
assert attach_with_sql_tag, f"Could not find attach statement containing '{sql_text_tag}' for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND."
assert required_concurrent_found, f"Could not find attach that running SQL with tag '{SQL_TAG_THAT_WE_WAITING_FOR}' and locks record for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND."
return
#-----------------------------------------------------------------------------------------------------------------------------------------------------
@pytest.mark.version('>=4.0')
def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: Path, capsys):
sql_init = (act.files_dir / 'read-consist-sttm-restart-DDL.sql').read_text()
@ -264,117 +252,218 @@ def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err:
assert act.stderr == ''
act.reset()
with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring:
for i,c in enumerate((con_lock_1,con_lock_2)):
sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end"
c.execute_immediate(sttm)
trace_cfg_items = [
'time_threshold = 0',
'log_errors = true',
'log_statement_start = true',
'log_statement_finish = true',
]
with act.trace(db_events = trace_cfg_items, encoding=locale.getpreferredencoding()):
#########################
### L O C K E R - 1 ###
#########################
with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring:
tpb_monitoring = tpb(isolation=Isolation.READ_COMMITTED_RECORD_VERSION, lock_timeout=0)
tx_monitoring = con_monitoring.transaction_manager(tpb_monitoring)
cur_monitoring = tx_monitoring.cursor()
con_lock_1.execute_immediate( f'update {target_obj} set id=id where id=1' )
for i,c in enumerate((con_lock_1,con_lock_2)):
sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end"
c.execute_immediate(sttm)
worker_sql = f'''
set list on;
set autoddl off;
set term ^;
execute block returns (whoami varchar(30)) as
begin
whoami = 'WORKER'; -- , ATT#' || current_connection;
rdb$set_context('USER_SESSION','WHO', whoami);
-- suspend;
end
^
set term ;^
commit;
SET KEEP_TRAN_PARAMS ON;
set transaction read committed read consistency;
--select current_connection, current_transaction from rdb$database;
set list off;
set wng off;
--set plan on;
set count on;
-- this must hang because of locker-1:
{SQL_TO_BE_RESTARTED};
-- check results:
-- ###############
select id from {target_obj} order by id; -- one record must remain, with ID = -5
select v.old_id, v.op, v.snap_no_rank -- snap_no_rank must have four unique values: 1,2,3 and 4.
from v_worker_log v
where v.op = 'upd';
--set width who 10;
-- DO NOT check this! Values can differ here from one run to another!
-- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id;
rollback;
'''
fn_worker_sql.write_text(worker_sql)
with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err:
############################################################################
### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ###
############################################################################
p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql),
'-user', act.db.user,
'-password', act.db.password,
'-pag', '999999',
act.db.dsn
],
stdout = hang_out,
stderr = hang_err
)
wait_for_attach_showup_in_monitoring(con_monitoring, SQL_TAG_THAT_WE_WAITING_FOR)
#########################
### L O C K E R - 2 ###
### L O C K E R - 1 ###
#########################
# Change ID so that it **will* be included in the set of rows that must be affected by session-worker:
con_lock_2.execute_immediate( f'update {target_obj} set id = -5 where abs(id) = 5;' )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 5;' )
con_lock_1.execute_immediate( f'update {target_obj} set id=id where id=1' )
worker_sql = f'''
set list on;
set autoddl off;
set term ^;
execute block returns (whoami varchar(30)) as
begin
whoami = 'WORKER'; -- , ATT#' || current_connection;
rdb$set_context('USER_SESSION','WHO', whoami);
-- suspend;
end
^
set term ;^
commit;
SET KEEP_TRAN_PARAMS ON;
set transaction read committed read consistency;
--select current_connection, current_transaction from rdb$database;
set list off;
set wng off;
--set plan on;
set count on;
-- this must hang because of locker-1:
{SQL_TO_BE_RESTARTED};
-- check results:
-- ###############
select id from {target_obj} order by id; -- one record must remain, with ID = -5
select v.old_id, v.op, v.snap_no_rank -- snap_no_rank must have four unique values: 1,2,3 and 4.
from v_worker_log v
where v.op = 'upd';
--set width who 10;
-- DO NOT check this! Values can differ here from one run to another!
-- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id;
rollback;
'''
fn_worker_sql.write_text(worker_sql)
with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err:
############################################################################
### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ###
############################################################################
p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql),
'-user', act.db.user,
'-password', act.db.password,
'-pag', '999999',
act.db.dsn
],
stdout = hang_out,
stderr = hang_err
)
# NB: when ISQL will establish attach, first record that it must lock is ID = 2 -- see above SQL_TO_BE_RESTARTED
# We must to ensure that this (worker) attachment has been really created and LOCKS this record:
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=2', SQL_TAG_THAT_WE_WAITING_FOR)
#########################
### L O C K E R - 2 ###
#########################
# Change ID so that it **will* be included in the set of rows that must be affected by session-worker:
con_lock_2.execute_immediate( f'update {target_obj} set id = -5 where abs(id) = 5;' )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 5;' )
con_lock_1.commit() # releases record with ID=1 ==> now it can be locked by worker.
# We have to WAIT HERE until worker will actually 'catch' just released record with ID = 1.
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=1', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = 1 for sure is locked by WORKER.
# Change ID so that it **will* be included in the set of rows that must be affected by session-worker:
con_lock_1.execute_immediate( f'update {target_obj} set id = -4 where abs(id) = 4;' )
con_lock_1.commit()
con_lock_1.execute_immediate( f'update {target_obj} set id = id where abs(id) = 4;' )
con_lock_1.commit() # releases record with ID=1 (allow it to be deleted by session-worker)
con_lock_2.commit() # releases record with ID = -5, but session-worker is waiting for record with ID = -4 (that was changed by locker-1).
# Change ID so that it **will* be included in the set of rows that must be affected by session-worker:
con_lock_1.execute_immediate( f'update {target_obj} set id = -4 where abs(id) = 4;' )
con_lock_1.commit()
con_lock_1.execute_immediate( f'update {target_obj} set id = id where abs(id) = 4;' )
# We have to WAIT HERE until worker will actually 'catch' just released record with ID = -5:
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id = -5', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = -5 for sure is locked by WORKER.
con_lock_2.execute_immediate( f'update {target_obj} set id = -3 where abs(id) = 3;' )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 3;' )
con_lock_2.commit() # releases record with ID = -5, but session-worker is waiting for record with ID = -4 (that was changed by locker-1).
con_lock_2.execute_immediate( f'update {target_obj} set id = -3 where abs(id) = 3;' )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id = id where abs(id) = 3;' )
con_lock_1.commit() # This releases row with ID=-4 but session-worker is waiting for ID = - 3 (changed by locker-2).
con_lock_1.commit() # This releases row with ID=-4 but session-worker is waiting for ID = - 3 (changed by locker-2).
con_lock_2.commit() # This releases row with ID=-3. No more locked rows so session-worker can finish its mission.
# We have to WAIT HERE until worker will actually 'catch' just released record with ID = -4:
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id = -4', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = -4 for sure is locked by WORKER.
con_lock_2.commit() # This releases row with ID=-3. No more locked rows so session-worker can finish its mission.
# Here we wait for ISQL complete its mission:
p_worker.wait()
# Here we wait for ISQL complete its mission:
p_worker.wait()
#< with act.db.connect()
for g in (fn_worker_log, fn_worker_err):
with g.open() as f:
for line in f:
if line.split():
if g == fn_worker_log:
print(f'checked_mode: {checked_mode}, STDLOG: {line}')
else:
print(f'UNEXPECTED STDERR {line}')
for g in (fn_worker_log, fn_worker_err):
with g.open() as f:
for line in f:
if line.split():
if g == fn_worker_log:
print(f'checked_mode: {checked_mode}, STDLOG: {line}')
else:
print(f'UNEXPECTED STDERR {line}')
act.expected_stdout = expected_stdout
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
expected_stdout_worker = f"""
checked_mode: {checked_mode}, STDLOG: Records affected: 4
checked_mode: {checked_mode}, STDLOG: ID
checked_mode: {checked_mode}, STDLOG: =======
checked_mode: {checked_mode}, STDLOG: -5
checked_mode: {checked_mode}, STDLOG: -2
checked_mode: {checked_mode}, STDLOG: -1
checked_mode: {checked_mode}, STDLOG: 3
checked_mode: {checked_mode}, STDLOG: 4
checked_mode: {checked_mode}, STDLOG: Records affected: 5
checked_mode: {checked_mode}, STDLOG: OLD_ID OP SNAP_NO_RANK
checked_mode: {checked_mode}, STDLOG: ======= ====== =====================
checked_mode: {checked_mode}, STDLOG: 2 UPD 1
checked_mode: {checked_mode}, STDLOG: 2 UPD 2
checked_mode: {checked_mode}, STDLOG: 1 UPD 2
checked_mode: {checked_mode}, STDLOG: 2 UPD 3
checked_mode: {checked_mode}, STDLOG: 1 UPD 3
checked_mode: {checked_mode}, STDLOG: 2 UPD 4
checked_mode: {checked_mode}, STDLOG: 1 UPD 4
checked_mode: {checked_mode}, STDLOG: -3 UPD 4
checked_mode: {checked_mode}, STDLOG: -4 UPD 4
checked_mode: {checked_mode}, STDLOG: Records affected: 9
"""
act.expected_stdout = expected_stdout_worker
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
#< with act.trace
allowed_patterns = \
[
'\\)\\s+EXECUTE_STATEMENT_RESTART$'
,re.escape(SQL_TO_BE_RESTARTED)
,'^Restarted \\d+ time\\(s\\)'
]
allowed_patterns = [re.compile(x) for x in allowed_patterns]
for line in act.trace_log:
if line.strip():
if act.match_any(line.strip(), allowed_patterns):
print(line.strip())
expected_stdout_trace = f"""
{SQL_TO_BE_RESTARTED}
2023-09-25T02:09:26.1800 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 1 time(s)
2023-09-25T02:09:26.1860 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 2 time(s)
2023-09-25T02:09:26.1910 (23240:0000000005062FC0) EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 3 time(s)
{SQL_TO_BE_RESTARTED}
"""
act.expected_stdout = expected_stdout_trace
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
#< for checked_mode in('table', 'view')