6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 13:33:07 +01:00

Added/Updated functional\gtcs\test_transactions_autocommit_2.py: checked on 4.0.1.2692, 3.0.8.33535

This commit is contained in:
zotov 2022-04-29 20:24:14 +03:00
parent c62b6a98e5
commit 64fd89305f

View File

@ -23,133 +23,59 @@ FBTEST: functional.gtcs.transactions_autocommit_2
import pytest
from firebird.qa import *
from firebird.driver import *
db = db_factory()
init_script = """
set bail on;
recreate table test_1 (x integer);
recreate table test_2 (x integer);
recreate table test_3 (x integer);
create unique index test_3_x_uniq on test_3 (x);
commit;
set term ^;
create or alter trigger trg_test1_ai for test_1 active after insert position 0 as
begin
insert into test_2 values (new.x * 10);
insert into test_3 values (new.x * 100);
end ^
set term ;^
insert into test_3 values (1000);
commit;
"""
db = db_factory(init=init_script)
act = python_act('db', substitutions=[('[ \t]+', ' ')])
expected_stdout = """
mon$auto_commit: 1
exception occured, gdscode: 335544349
exception occured, gdscodes: (335544349, 335545072, 335544842)
test_3 1000
"""
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
@pytest.mark.version('>=3')
def test_1(act: Action):
pytest.fail("Not IMPLEMENTED")
custom_tpb = TPB(lock_timeout = 0, auto_commit=True).get_buffer()
# test_script_1
#---
#
# import os
# import sys
# import subprocess
# import inspect
# import time
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for f in f_names_list:
# if type(f) == file:
# del_name = f.name
# elif type(f) == str:
# del_name = f
# else:
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# sql_init='''
# set bail on;
# recreate table test_1 (x integer);
# recreate table test_2 (x integer);
# recreate table test_3 (x integer);
# create unique index test_3_x_uniq on test_3 (x);
# commit;
# set term ^;
# create or alter trigger trg_test1_ai for test_1 active after insert position 0 as
# begin
# insert into test_2 values (new.x * 10);
# insert into test_3 values (new.x * 100);
# end ^
# set term ;^
#
# insert into test_3 values (1000);
# commit;
# '''
#
# f_init_sql = open( os.path.join(context['temp_directory'],'tmp_gtcs_tx_ac2.sql'), 'w', buffering = 0)
# f_init_sql.write( sql_init )
# flush_and_close( f_init_sql )
#
# f_init_log = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'log') ), 'w', buffering = 0)
# f_init_err = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'err') ), 'w', buffering = 0)
#
# # This can take about 25-30 seconds:
# ####################################
# subprocess.call( [ context['isql_path'], dsn, '-q', '-i', f_init_sql.name ], stdout = f_init_log, stderr = f_init_err)
#
# flush_and_close( f_init_log )
# flush_and_close( f_init_err )
#
# #CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_read_committed, fdb.isc_tpb_no_rec_version, fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] )
# CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] )
#
# con = fdb.connect( dsn = dsn )
# tx = con.trans( default_tpb = CUSTOM_TX_PARAMS )
#
# tx.begin()
# cx=tx.cursor()
#
# cx.execute('select mon$auto_commit from mon$transactions where mon$transaction_id = current_transaction')
# for r in cx:
# print( 'mon$auto_commit:', r[0] )
#
# try:
# cx.execute( 'insert into test_1 values(?)', (10,) ) # this leads to PK/UK violation in the table 'test_3'
# except Exception as e:
# #print('exception in ', inspect.stack()[0][3], ': ', sys.exc_info()[0])
# print('exception occured, gdscode:', e[2])
#
# tx.commit()
#
# cx.execute("select 'test_1' tab_name, x from test_1 union all select 'test_2', x from test_2 union all select 'test_3', x from test_3")
# for r in cx:
# print( r[0], r[1] )
#
# cx.close()
# tx.close()
# con.close()
#
# # cleanup
# #########
# time.sleep(1)
# cleanup( ( f_init_sql, f_init_log, f_init_err) )
#
#---
@pytest.mark.version('>=3')
def test_1(act: Action, capsys):
with act.db.connect() as con:
con.begin(custom_tpb)
with con.cursor() as cur:
for r in cur.execute('select mon$auto_commit from mon$transactions where mon$transaction_id = current_transaction').fetchall():
print('mon$auto_commit:', r[0])
try:
cur.execute( 'insert into test_1 values(?)', (10,) ) # this leads to PK/UK violation in the table 'test_3'
except DatabaseError as e:
print('exception occured, gdscodes:', e.gds_codes)
con.commit()
with con.cursor() as cur:
for r in cur.execute("select 'test_1' tab_name, x from test_1 union all select 'test_2', x from test_2 union all select 'test_3', x from test_3").fetchall():
print(r[0], r[1])
assert act.clean_string(capsys.readouterr().out, act.substitutions) == act.clean_string(expected_stdout, act.substitutions)