6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00

Added/Updated tests\functional\transactions\test_read_consist_sttm_restart_on_delete_02.py: one more re-implementing in order to prevent raises and get execution order predictable. See notes.

This commit is contained in:
pavel-zotov 2023-09-26 11:28:59 +03:00
parent 3ba2df6b63
commit d36db9712a

View File

@ -115,34 +115,58 @@ DESCRIPTION:
NOTE: concrete values of fields TRN, GLOBAL_CN and SNAP_NO in the TLOG_DONE can differ from one to another run! NOTE: concrete values of fields TRN, GLOBAL_CN and SNAP_NO in the TLOG_DONE can differ from one to another run!
This is because of concurrent nature of connections that work against database. We must not assume that these values will be constant. This is because of concurrent nature of connections that work against database. We must not assume that these values will be constant.
################
Checked on 4.0.0.2144 SS/CS
FBTEST: functional.transactions.read_consist_sttm_restart_on_delete_02 FBTEST: functional.transactions.read_consist_sttm_restart_on_delete_02
NOTES: NOTES:
[28.07.2022] pzotov [28.07.2022] pzotov
Checked on 4.0.1.2692, 5.0.0.591 Checked on 4.0.1.2692, 5.0.0.591
[23.09.2023] pzotov [25.09.2023] pzotov
Replaced verification method of worker attachment presence (which tries DML and waits for resource). 1. Added trace launch and its parsing in order to get number of times when WORKER statement did restart.
This attachment is created by *asynchronously* launched ISQL thus using of time.sleep(1) is COMPLETELY wrong. See commits:
Loop with query to mon$statements is used instead (we search for record which SQL_TEXT contains 'special tag', see variable SQL_TAG_THAT_WE_WAITING_FOR). 1) FB 4.x (23-JUN-2022, 4.0.2.2782): https://github.com/FirebirdSQL/firebird/commit/95b8623adbf129d0730a50a18b4f1cf9976ac35c
Maximal duration of this loop is limited by variable 'MAX_WAIT_FOR_WORKER_START_MS'. 2) FB 5.x (27-JUN-2022, 5.0.0.555): https://github.com/FirebirdSQL/firebird/commit/f121cd4a6b40b1639f560c6c38a057c4e68bb3df
Many thanks to Vlad for suggestions.
Trace must contain several groups, each with similar lines:
<timestamp> (<trace_memory_address>) EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted <N> time(s)
2. To prevent raises between concurrent transactions, it is necessary to ensure that code:
* does not allow LOCKER-2 to start its work until WORKER session will establish connection and - moreover - actually locks first record
from the scope that is seen by the query that we want to be executed by worker.
* does not allow LOCKER-1 to do something after LOCKER-2 issued commit (and thus released record): we first have to ensure that this record
now is locked by WORKER. The same when record was occupied by LOCKER-2 and then is released: LOCKER-1 must not do anything until WORKER will
encounter this record and 'catch' it.
This is done by calls to function 'wait_for_record_become_locked()' which are performed by separate 'monitoring' connection with starting Tx
with NO_WAIT mode and catching exception with further parsing. In case when record has been already occupied (by WORKER) this exception will
have form "deadlock / -update conflicts ... / -concurrent transaction number is <N>". We can then obtain number of this transaction and query
mon$statements to get MON$SQL_TEXT that is running by this Tx. If it contains 'special tag' (see variable SQL_TAG_THAT_WE_WAITING_FOR)
then we can be sure that WORKER really did establish connection and successfully locked row with required ID.
Table 'TLOG_WANT' (which is fulfilled by trigger TEST_BIUD using in autonomous tx) can NOT be used for detection of moments when WORKER
actually locks records which it was waiting for: this trigger fires BEFORE actual updating occurs, i.e. when record becomes seen by WORKER
but is still occupied by some LOCKER ("[DOC]: c) engine continue to evaluate remaining records ... and put write locks on it too")
NB! Worker transaction must run in WAIT mode - in contrast to the Tx that we start in our monitoring loop.
Checked on WI-T6.0.0.48, WI-T5.0.0.1211, WI-V4.0.4.2988. Checked on WI-T6.0.0.48, WI-T5.0.0.1211, WI-V4.0.4.2988.
""" """
import subprocess import subprocess
import pytest import re
from firebird.qa import *
from pathlib import Path from pathlib import Path
import time import time
import datetime as py_dt import datetime as py_dt
import locale
import pytest
from firebird.qa import *
from firebird.driver import tpb, Isolation, TraAccessMode, DatabaseError
db = db_factory() db = db_factory()
act = python_act('db', substitutions=[('=', ''), ('[ \t]+', ' ')]) act = python_act( 'db', substitutions = [ ('=', ''), ('[ \t]+', ' '), ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] )
#act = python_act( 'db', substitutions = [ ('.* EXECUTE_STATEMENT_RESTART', 'EXECUTE_STATEMENT_RESTART') ] )
MAX_WAIT_FOR_WORKER_START_MS = 10000; MAX_WAIT_FOR_WORKER_START_MS = 10000;
SQL_TAG_THAT_WE_WAITING_FOR = 'SQL_TAG_THAT_WE_WAITING_FOR' SQL_TAG_THAT_WE_WAITING_FOR = 'SQL_TAG_THAT_WE_WAITING_FOR'
@ -152,72 +176,55 @@ fn_worker_sql = temp_file('tmp_worker.sql')
fn_worker_log = temp_file('tmp_worker.log') fn_worker_log = temp_file('tmp_worker.log')
fn_worker_err = temp_file('tmp_worker.err') fn_worker_err = temp_file('tmp_worker.err')
expected_stdout = """ #-----------------------------------------------------------------------------------------------------------------------------------------------------
checked_mode: table, STDLOG: Records affected: 6
checked_mode: table, STDLOG: ID def wait_for_record_become_locked(tx_monitoring, cur_monitoring, sql_to_lock_record, SQL_TAG_THAT_WE_WAITING_FOR):
checked_mode: table, STDLOG: =======
checked_mode: table, STDLOG: 1
checked_mode: table, STDLOG: 2
checked_mode: table, STDLOG: Records affected: 2
checked_mode: table, STDLOG: OLD_ID OP SNAP_NO_RANK # ::: NB :::
checked_mode: table, STDLOG: ======= ====== ===================== # tx_monitoring must work in NOWAIT mode!
checked_mode: table, STDLOG: 3 DEL 1
checked_mode: table, STDLOG: 4 DEL 1
checked_mode: table, STDLOG: -3 DEL 2
checked_mode: table, STDLOG: -2 DEL 2
checked_mode: table, STDLOG: -1 DEL 2
checked_mode: table, STDLOG: 3 DEL 2
checked_mode: table, STDLOG: 4 DEL 2
checked_mode: table, STDLOG: 5 DEL 2
checked_mode: table, STDLOG: Records affected: 8
checked_mode: view, STDLOG: Records affected: 6
checked_mode: view, STDLOG: ID
checked_mode: view, STDLOG: =======
checked_mode: view, STDLOG: 1
checked_mode: view, STDLOG: 2
checked_mode: view, STDLOG: Records affected: 2
checked_mode: view, STDLOG: OLD_ID OP SNAP_NO_RANK
checked_mode: view, STDLOG: ======= ====== =====================
checked_mode: view, STDLOG: 3 DEL 1
checked_mode: view, STDLOG: 4 DEL 1
checked_mode: view, STDLOG: -3 DEL 2
checked_mode: view, STDLOG: -2 DEL 2
checked_mode: view, STDLOG: -1 DEL 2
checked_mode: view, STDLOG: 3 DEL 2
checked_mode: view, STDLOG: 4 DEL 2
checked_mode: view, STDLOG: 5 DEL 2
checked_mode: view, STDLOG: Records affected: 8
"""
def wait_for_attach_showup_in_monitoring(con_monitoring, sql_text_tag):
chk_sql = f"select 1 from mon$statements s where s.mon$attachment_id != current_connection and s.mon$sql_text containing '{sql_text_tag}'"
attach_with_sql_tag = None
t1=py_dt.datetime.now() t1=py_dt.datetime.now()
cur_monitoring = con_monitoring.cursor() required_concurrent_found = None
concurrent_tx_pattern = re.compile('concurrent transaction number is \\d+', re.IGNORECASE)
while True: while True:
cur_monitoring.execute(chk_sql) concurrent_tx_number = None
for r in cur_monitoring: concurrent_runsql = ''
attach_with_sql_tag = r[0] tx_monitoring.begin()
if not attach_with_sql_tag: try:
cur_monitoring.execute(sql_to_lock_record)
except DatabaseError as exc:
# Failed: SQL execution failed with: deadlock
# -update conflicts with concurrent update
# -concurrent transaction number is 40
m = concurrent_tx_pattern.search( str(exc) )
if m:
concurrent_tx_number = m.group().split()[-1] # 'concurrent transaction number is 40' ==> '40'
cur_monitoring.execute( 'select mon$sql_text from mon$statements where mon$transaction_id = ?', (int(concurrent_tx_number),) )
for r in cur_monitoring:
concurrent_runsql = r[0]
if SQL_TAG_THAT_WE_WAITING_FOR in concurrent_runsql:
required_concurrent_found = 1
# pytest.fail(f"Can not upd, concurrent TX = {concurrent_tx_number}, sql: {concurrent_runsql}")
finally:
tx_monitoring.rollback()
if not required_concurrent_found:
t2=py_dt.datetime.now() t2=py_dt.datetime.now()
d1=t2-t1 d1=t2-t1
if d1.seconds*1000 + d1.microseconds//1000 >= MAX_WAIT_FOR_WORKER_START_MS: if d1.seconds*1000 + d1.microseconds//1000 >= MAX_WAIT_FOR_WORKER_START_MS:
break break
else: else:
con_monitoring.commit()
time.sleep(0.2) time.sleep(0.2)
else: else:
break break
assert attach_with_sql_tag, f"Could not find attach statement containing '{sql_text_tag}' for {MAX_WAIT_FOR_WORKER_START_MS} ms. ABEND." assert required_concurrent_found, f"Could not find attach that running SQL with tag '{SQL_TAG_THAT_WE_WAITING_FOR}' and locks record for {MAX_WAIT_FOR_WORKER_START_MS} ms. Check query: {sql_to_lock_record}. ABEND."
return return
@pytest.mark.version('>=4.0') #-----------------------------------------------------------------------------------------------------------------------------------------------------
@pytest.mark.version('>=4.0.2')
def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: Path, capsys): def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err: Path, capsys):
sql_init = (act.files_dir / 'read-consist-sttm-restart-DDL.sql').read_text() sql_init = (act.files_dir / 'read-consist-sttm-restart-DDL.sql').read_text()
@ -246,123 +253,229 @@ def test_1(act: Action, fn_worker_sql: Path, fn_worker_log: Path, fn_worker_err:
assert act.stderr == '' assert act.stderr == ''
act.reset() act.reset()
with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring: trace_cfg_items = [
for i,c in enumerate((con_lock_1,con_lock_2)): 'time_threshold = 0',
sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end" 'log_errors = true',
c.execute_immediate(sttm) 'log_statement_start = true',
'log_statement_finish = true',
]
######################### with act.trace(db_events = trace_cfg_items, encoding=locale.getpreferredencoding()):
### L O C K E R - 1 ###
#########################
con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = 5' % locals() ) with act.db.connect() as con_lock_1, act.db.connect() as con_lock_2, act.db.connect() as con_monitoring:
worker_sql = f''' tpb_monitoring = tpb(isolation=Isolation.READ_COMMITTED_RECORD_VERSION, lock_timeout=0)
set list on; tx_monitoring = con_monitoring.transaction_manager(tpb_monitoring)
set autoddl off; cur_monitoring = tx_monitoring.cursor()
set term ^;
execute block returns (whoami varchar(30)) as
begin
whoami = 'WORKER'; -- , ATT#' || current_connection;
rdb$set_context('USER_SESSION','WHO', whoami);
-- suspend;
end
^
set term ;^
commit;
--set echo on;
SET KEEP_TRAN_PARAMS ON;
set transaction read committed read consistency;
--select current_connection, current_transaction from rdb$database;
set list off;
set wng off;
--set plan on;
set count on;
-- delete from {target_obj} where id < 0 or id >= 3 order by id; -- THIS MUST BE LOCKED for i,c in enumerate((con_lock_1,con_lock_2)):
{SQL_TO_BE_RESTARTED}; sttm = f"execute block as begin rdb$set_context('USER_SESSION', 'WHO', 'LOCKER #{i+1}'); end"
c.execute_immediate(sttm)
-- check results:
-- ###############
select id from {target_obj} order by id; -- this will produce output only after all lockers do their commit/rollback
select v.old_id, v.op, v.snap_no_rank
from v_worker_log v
where v.op = 'del';
set width who 10;
-- DO NOT check this! Values can differ here from one run to another!
-- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id;
rollback;
'''
fn_worker_sql.write_text(worker_sql)
with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err:
############################################################################
### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ###
############################################################################
p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql),
'-user', act.db.user,
'-password', act.db.password,
act.db.dsn
],
stdout = hang_out,
stderr = hang_err
)
wait_for_attach_showup_in_monitoring(con_monitoring, SQL_TAG_THAT_WE_WAITING_FOR)
# >>> !!!THIS WAS WRONG!!! >>> time.sleep(1)
#########################
### L O C K E R - 2 ###
#########################
# Insert ID value that is less than previous min(id).
# Session-worker is executing its statement using PLAN ORDER,
# and it should see this new value and restart its statement:
con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-1)' % locals() )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -1' % locals() )
######################### #########################
### L O C K E R - 1 ### ### L O C K E R - 1 ###
######################### #########################
con_lock_1.commit() con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = 5' )
con_lock_1.execute_immediate( f'insert into {target_obj}(id) values(-2)' % locals() )
con_lock_1.commit() worker_sql = f'''
con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = -2' % locals() ) set list on;
set autoddl off;
set term ^;
execute block returns (whoami varchar(30)) as
begin
whoami = 'WORKER'; -- , ATT#' || current_connection;
rdb$set_context('USER_SESSION','WHO', whoami);
-- suspend;
end
^
set term ;^
commit;
--set echo on;
SET KEEP_TRAN_PARAMS ON;
set transaction read committed read consistency;
--select current_connection, current_transaction from rdb$database;
set list off;
set wng off;
--set plan on;
set count on;
-- delete from {target_obj} where id < 0 or id >= 3 order by id; -- THIS MUST BE LOCKED
{SQL_TO_BE_RESTARTED};
-- check results:
-- ###############
select id from {target_obj} order by id; -- this will produce output only after all lockers do their commit/rollback
select v.old_id, v.op, v.snap_no_rank
from v_worker_log v
where v.op = 'del';
set width who 10;
-- DO NOT check this! Values can differ here from one run to another!
-- select id, trn, who, old_id, new_id, op, rec_vers, global_cn, snap_no from tlog_done order by id;
rollback;
'''
fn_worker_sql.write_text(worker_sql)
with fn_worker_log.open(mode='w') as hang_out, fn_worker_err.open(mode='w') as hang_err:
############################################################################
### L A U N C H W O R K E R U S I N G I S Q L, A S Y N C. ###
############################################################################
p_worker = subprocess.Popen([act.vars['isql'], '-i', str(fn_worker_sql),
'-user', act.db.user,
'-password', act.db.password,
act.db.dsn
],
stdout = hang_out,
stderr = hang_err
)
# NB: when ISQL will establish attach, first record that it must lock is ID = 3 -- see above SQL_TO_BE_RESTARTED
# We must to ensure that this (worker) attachment has been really created and LOCKS this record:
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=3', SQL_TAG_THAT_WE_WAITING_FOR)
######################### #########################
### L O C K E R - 2 ### ### L O C K E R - 2 ###
######################### #########################
# Insert ID value that is less than previous min(id). # Insert ID value that is less than previous min(id).
# Session-worker is executing its statement using PLAN ORDER, # Session-worker is executing its statement using PLAN ORDER,
# and it should see this new value and restart its statement: # and it should see this new value and restart its statement:
con_lock_2.commit() con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-1)' )
con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-3)' % locals() ) con_lock_2.commit()
con_lock_2.commit() con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -1' )
con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -3' % locals() )
con_lock_1.commit() #########################
con_lock_2.commit() ### L O C K E R - 1 ###
#########################
con_lock_1.commit() # releases record with ID = 5 ==> now it can be locked by worker.
# Here we wait for ISQL complete its mission: # We have to WAIT HERE until worker will actually 'catch' just released record with ID = 5.
p_worker.wait() #
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=5', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = 5 for sure is locked by WORKER.
con_lock_1.execute_immediate( f'insert into {target_obj}(id) values(-2)' )
con_lock_1.commit()
con_lock_1.execute_immediate( f'update {target_obj} set id=id where id = -2' )
for g in (fn_worker_log, fn_worker_err): #########################
with g.open() as f: ### L O C K E R - 2 ###
for line in f: #########################
if line.split(): # Insert ID value that is less than previous min(id).
if g == fn_worker_log: # Session-worker is executing its statement using PLAN ORDER,
print(f'checked_mode: {checked_mode}, STDLOG: {line}') # and it should see this new value and restart its statement:
else: con_lock_2.commit()
print(f'UNEXPECTED STDERR {line}')
act.expected_stdout = expected_stdout # We have to WAIT HERE until worker will actually 'catch' just released record with ID = -1.
act.stdout = capsys.readouterr().out #
assert act.clean_stdout == act.clean_expected_stdout wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=-1', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = -1 for sure is locked by WORKER.
con_lock_2.execute_immediate( f'insert into {target_obj}(id) values(-3)' )
con_lock_2.commit()
con_lock_2.execute_immediate( f'update {target_obj} set id=id where id = -3' )
#########################
### L O C K E R - 1 ###
#########################
con_lock_1.commit()
# We have to WAIT HERE until worker will actually 'catch' just released record with ID = -2.
#
wait_for_record_become_locked(tx_monitoring, cur_monitoring, f'update {target_obj} set id=id where id=-2', SQL_TAG_THAT_WE_WAITING_FOR)
# If we come here then it means that record with ID = -2 for sure is locked by WORKER.
con_lock_2.commit() # WORKER will complete his job after this
# Here we wait for ISQL complete its mission:
p_worker.wait()
#< with act.db.connect()
for g in (fn_worker_log, fn_worker_err):
with g.open() as f:
for line in f:
if line.split():
if g == fn_worker_log:
print(f'checked_mode: {checked_mode}, STDLOG: {line}')
else:
print(f'UNEXPECTED STDERR {line}')
expected_stdout_worker = f"""
checked_mode: {checked_mode}, STDLOG: Records affected: 6
checked_mode: {checked_mode}, STDLOG: ID
checked_mode: {checked_mode}, STDLOG: =======
checked_mode: {checked_mode}, STDLOG: 1
checked_mode: {checked_mode}, STDLOG: 2
checked_mode: {checked_mode}, STDLOG: Records affected: 2
checked_mode: {checked_mode}, STDLOG: OLD_ID OP SNAP_NO_RANK
checked_mode: {checked_mode}, STDLOG: ======= ====== =====================
checked_mode: {checked_mode}, STDLOG: 3 DEL 1
checked_mode: {checked_mode}, STDLOG: 4 DEL 1
checked_mode: {checked_mode}, STDLOG: -3 DEL 2
checked_mode: {checked_mode}, STDLOG: -2 DEL 2
checked_mode: {checked_mode}, STDLOG: -1 DEL 2
checked_mode: {checked_mode}, STDLOG: 3 DEL 2
checked_mode: {checked_mode}, STDLOG: 4 DEL 2
checked_mode: {checked_mode}, STDLOG: 5 DEL 2
checked_mode: {checked_mode}, STDLOG: Records affected: 8
"""
act.expected_stdout = expected_stdout_worker
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
# < with act.trace
allowed_patterns = \
[
'\\)\\s+EXECUTE_STATEMENT_RESTART$'
,re.escape(SQL_TO_BE_RESTARTED)
,'^Restarted \\d+ time\\(s\\)'
]
allowed_patterns = [re.compile(x) for x in allowed_patterns]
for line in act.trace_log:
if line.strip():
if act.match_any(line.strip(), allowed_patterns):
print(line.strip())
expected_stdout_trace = f"""
{SQL_TO_BE_RESTARTED}
EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 1 time(s)
EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 2 time(s)
EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 3 time(s)
EXECUTE_STATEMENT_RESTART
{SQL_TO_BE_RESTARTED}
Restarted 4 time(s)
{SQL_TO_BE_RESTARTED}
"""
act.expected_stdout = expected_stdout_trace
act.stdout = capsys.readouterr().out
assert act.clean_stdout == act.clean_expected_stdout
act.reset()
# < for checked_mode in('table', 'view')