#coding:utf-8 # # id: functional.gtcs.transactions_autocommit_2 # title: GTCS/tests/AUTO_COMMIT.2.ESQL. Changes within AUTO COMMIT must be cancelled when exception raises in some TRIGGER. # decription: # Original test see in: # https://github.com/FirebirdSQL/fbtcs/blob/master/GTCS/tests/AUTO_COMMIT.2.ESQL.script # # Test creates three tables (test_1, test_2 and test_3) and AI-trigger for one of them (test_1). # This trigger does INSERTs into test_2 and test_3. # For test_3 we create UNIQUE index that will prevent from insertion of duplicates. # Then we add one record into test_3 with value = 1000. # Finally, we try to add record into test_1 and after this INSERT its trigger attempts to add records, # into test_2 and test_3. The latter will fail because of UK violation (we try to insert apropriate value # into test-1 in order this exception be raised). # Expected result: NONE of just performed INSERTS must be saved in DB. The only existing record must be # in the table test_3 that we added there on initial phase. # # NB: we use custom TPB with fdb.isc_tpb_autocommit in order to start DML transactions in AUTOCOMMIT=1 mode. # Checked on: # 4.0.0.1812 SS: 2.054s. # 4.0.0.1767 SC: 1.893s. # 4.0.0.1810 SS: 1.922s. # 3.0.6.33273 SS: 0.973s. # 3.0.6.33240 SC: 1.082s. # 3.0.6.33247 CS: 2.120s. # 2.5.6.27020 SS: 2.612s. # 2.5.9.27149 SC: 0.453s. # 2.5.9.27143 CS: 0.963s. # # tracker_id: # min_versions: ['2.5.0'] # versions: 2.5 # qmid: None import pytest from firebird.qa import db_factory, python_act, Action # version: 2.5 # resources: None substitutions_1 = [('[ \t]+', ' ')] init_script_1 = """""" db_1 = db_factory(sql_dialect=3, init=init_script_1) # test_script_1 #--- # # import os # import sys # import subprocess # import inspect # import time # # os.environ["ISC_USER"] = user_name # os.environ["ISC_PASSWORD"] = user_password # db_conn.close() # # #-------------------------------------------- # # def flush_and_close( file_handle ): # # https://docs.python.org/2/library/os.html#os.fsync # # If you're starting with a Python file object f, # # first do f.flush(), and # # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk. # global os # # file_handle.flush() # if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull: # # otherwise: "OSError: [Errno 9] Bad file descriptor"! # os.fsync(file_handle.fileno()) # file_handle.close() # # #-------------------------------------------- # # def cleanup( f_names_list ): # global os # for f in f_names_list: # if type(f) == file: # del_name = f.name # elif type(f) == str: # del_name = f # else: # print('Unrecognized type of element:', f, ' - can not be treated as file.') # del_name = None # # if del_name and os.path.isfile( del_name ): # os.remove( del_name ) # # #-------------------------------------------- # # sql_init=''' # set bail on; # recreate table test_1 (x integer); # recreate table test_2 (x integer); # recreate table test_3 (x integer); # create unique index test_3_x_uniq on test_3 (x); # commit; # set term ^; # create or alter trigger trg_test1_ai for test_1 active after insert position 0 as # begin # insert into test_2 values (new.x * 10); # insert into test_3 values (new.x * 100); # end ^ # set term ;^ # # insert into test_3 values (1000); # commit; # ''' # # f_init_sql = open( os.path.join(context['temp_directory'],'tmp_gtcs_tx_ac2.sql'), 'w', buffering = 0) # f_init_sql.write( sql_init ) # flush_and_close( f_init_sql ) # # f_init_log = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'log') ), 'w', buffering = 0) # f_init_err = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'err') ), 'w', buffering = 0) # # # This can take about 25-30 seconds: # #################################### # subprocess.call( [ context['isql_path'], dsn, '-q', '-i', f_init_sql.name ], stdout = f_init_log, stderr = f_init_err) # # flush_and_close( f_init_log ) # flush_and_close( f_init_err ) # # #CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_read_committed, fdb.isc_tpb_no_rec_version, fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] ) # CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] ) # # con = fdb.connect( dsn = dsn ) # tx = con.trans( default_tpb = CUSTOM_TX_PARAMS ) # # tx.begin() # cx=tx.cursor() # # cx.execute('select mon$auto_commit from mon$transactions where mon$transaction_id = current_transaction') # for r in cx: # print( 'mon$auto_commit:', r[0] ) # # try: # cx.execute( 'insert into test_1 values(?)', (10,) ) # this leads to PK/UK violation in the table 'test_3' # except Exception as e: # #print('exception in ', inspect.stack()[0][3], ': ', sys.exc_info()[0]) # print('exception occured, gdscode:', e[2]) # # tx.commit() # # cx.execute("select 'test_1' tab_name, x from test_1 union all select 'test_2', x from test_2 union all select 'test_3', x from test_3") # for r in cx: # print( r[0], r[1] ) # # cx.close() # tx.close() # con.close() # # # cleanup # ######### # time.sleep(1) # cleanup( ( f_init_sql, f_init_log, f_init_err) ) # #--- act_1 = python_act('db_1', substitutions=substitutions_1) expected_stdout_1 = """ mon$auto_commit: 1 exception occured, gdscode: 335544349 test_3 1000 """ @pytest.mark.version('>=2.5') @pytest.mark.xfail def test_1(act_1: Action): pytest.fail("Test not IMPLEMENTED")