mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-23 05:53:06 +01:00
156 lines
5.1 KiB
Python
156 lines
5.1 KiB
Python
#coding:utf-8
|
|
|
|
"""
|
|
ID: gtcs.transactions-autocommit-02
|
|
TITLE: Changes within AUTO COMMIT must be cancelled when exception raises in some TRIGGER
|
|
DESCRIPTION:
|
|
Original test see in:
|
|
https://github.com/FirebirdSQL/fbtcs/blob/master/GTCS/tests/AUTO_COMMIT.2.ESQL.script
|
|
|
|
Test creates three tables (test_1, test_2 and test_3) and AI-trigger for one of them (test_1).
|
|
This trigger does INSERTs into test_2 and test_3.
|
|
For test_3 we create UNIQUE index that will prevent from insertion of duplicates.
|
|
Then we add one record into test_3 with value = 1000.
|
|
Finally, we try to add record into test_1 and after this INSERT its trigger attempts to add records,
|
|
into test_2 and test_3. The latter will fail because of UK violation (we try to insert apropriate value
|
|
into test-1 in order this exception be raised).
|
|
Expected result: NONE of just performed INSERTS must be saved in DB. The only existing record must be
|
|
in the table test_3 that we added there on initial phase.
|
|
|
|
NB: we use custom TPB with fdb.isc_tpb_autocommit in order to start DML transactions in AUTOCOMMIT=1 mode.
|
|
FBTEST: functional.gtcs.transactions_autocommit_2
|
|
"""
|
|
|
|
import pytest
|
|
from firebird.qa import *
|
|
|
|
db = db_factory()
|
|
|
|
act = python_act('db', substitutions=[('[ \t]+', ' ')])
|
|
|
|
expected_stdout = """
|
|
mon$auto_commit: 1
|
|
exception occured, gdscode: 335544349
|
|
test_3 1000
|
|
"""
|
|
|
|
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
|
|
@pytest.mark.version('>=3')
|
|
def test_1(act: Action):
|
|
pytest.fail("Not IMPLEMENTED")
|
|
|
|
# test_script_1
|
|
#---
|
|
#
|
|
# import os
|
|
# import sys
|
|
# import subprocess
|
|
# import inspect
|
|
# import time
|
|
#
|
|
# os.environ["ISC_USER"] = user_name
|
|
# os.environ["ISC_PASSWORD"] = user_password
|
|
# db_conn.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def flush_and_close( file_handle ):
|
|
# # https://docs.python.org/2/library/os.html#os.fsync
|
|
# # If you're starting with a Python file object f,
|
|
# # first do f.flush(), and
|
|
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
|
# global os
|
|
#
|
|
# file_handle.flush()
|
|
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
|
|
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
|
|
# os.fsync(file_handle.fileno())
|
|
# file_handle.close()
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# def cleanup( f_names_list ):
|
|
# global os
|
|
# for f in f_names_list:
|
|
# if type(f) == file:
|
|
# del_name = f.name
|
|
# elif type(f) == str:
|
|
# del_name = f
|
|
# else:
|
|
# print('Unrecognized type of element:', f, ' - can not be treated as file.')
|
|
# del_name = None
|
|
#
|
|
# if del_name and os.path.isfile( del_name ):
|
|
# os.remove( del_name )
|
|
#
|
|
# #--------------------------------------------
|
|
#
|
|
# sql_init='''
|
|
# set bail on;
|
|
# recreate table test_1 (x integer);
|
|
# recreate table test_2 (x integer);
|
|
# recreate table test_3 (x integer);
|
|
# create unique index test_3_x_uniq on test_3 (x);
|
|
# commit;
|
|
# set term ^;
|
|
# create or alter trigger trg_test1_ai for test_1 active after insert position 0 as
|
|
# begin
|
|
# insert into test_2 values (new.x * 10);
|
|
# insert into test_3 values (new.x * 100);
|
|
# end ^
|
|
# set term ;^
|
|
#
|
|
# insert into test_3 values (1000);
|
|
# commit;
|
|
# '''
|
|
#
|
|
# f_init_sql = open( os.path.join(context['temp_directory'],'tmp_gtcs_tx_ac2.sql'), 'w', buffering = 0)
|
|
# f_init_sql.write( sql_init )
|
|
# flush_and_close( f_init_sql )
|
|
#
|
|
# f_init_log = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'log') ), 'w', buffering = 0)
|
|
# f_init_err = open( '.'.join( (os.path.splitext( f_init_sql.name )[0], 'err') ), 'w', buffering = 0)
|
|
#
|
|
# # This can take about 25-30 seconds:
|
|
# ####################################
|
|
# subprocess.call( [ context['isql_path'], dsn, '-q', '-i', f_init_sql.name ], stdout = f_init_log, stderr = f_init_err)
|
|
#
|
|
# flush_and_close( f_init_log )
|
|
# flush_and_close( f_init_err )
|
|
#
|
|
# #CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_read_committed, fdb.isc_tpb_no_rec_version, fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] )
|
|
# CUSTOM_TX_PARAMS = ( [ fdb.isc_tpb_nowait, fdb.isc_tpb_autocommit ] )
|
|
#
|
|
# con = fdb.connect( dsn = dsn )
|
|
# tx = con.trans( default_tpb = CUSTOM_TX_PARAMS )
|
|
#
|
|
# tx.begin()
|
|
# cx=tx.cursor()
|
|
#
|
|
# cx.execute('select mon$auto_commit from mon$transactions where mon$transaction_id = current_transaction')
|
|
# for r in cx:
|
|
# print( 'mon$auto_commit:', r[0] )
|
|
#
|
|
# try:
|
|
# cx.execute( 'insert into test_1 values(?)', (10,) ) # this leads to PK/UK violation in the table 'test_3'
|
|
# except Exception as e:
|
|
# #print('exception in ', inspect.stack()[0][3], ': ', sys.exc_info()[0])
|
|
# print('exception occured, gdscode:', e[2])
|
|
#
|
|
# tx.commit()
|
|
#
|
|
# cx.execute("select 'test_1' tab_name, x from test_1 union all select 'test_2', x from test_2 union all select 'test_3', x from test_3")
|
|
# for r in cx:
|
|
# print( r[0], r[1] )
|
|
#
|
|
# cx.close()
|
|
# tx.close()
|
|
# con.close()
|
|
#
|
|
# # cleanup
|
|
# #########
|
|
# time.sleep(1)
|
|
# cleanup( ( f_init_sql, f_init_log, f_init_err) )
|
|
#
|
|
#---
|