6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00

More python tests

This commit is contained in:
Pavel Císař 2021-11-30 19:13:50 +01:00
parent afcaf43049
commit f3d94d2873
8 changed files with 529 additions and 317 deletions

View File

@ -131,9 +131,9 @@ fdb_file_1 = temp_file('tmp_core_5078.fdb')
@pytest.mark.version('>=2.5.6')
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path):
    """Restore the backup shipped inside core_5078.zip into a temporary database.

    The .fbk is read directly out of the zip archive (no extraction to a
    temp directory), written to ``fbk_file_1`` and then restored via the
    services API into ``fdb_file_1``.
    """
    # NOTE(review): the scraped diff interleaved the pre-commit lines
    # (old name `script_file`) with their replacements; only the
    # post-commit version is kept here.
    zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5078.zip',
                                   at='tmp_core_5078.fbk')
    fbk_file_1.write_bytes(zipped_fbk_file.read_bytes())
    with act_1.connect_server() as srv:
        srv.database.restore(database=str(fdb_file_1), backup=str(fbk_file_1))
        srv.wait()  # block until the restore service job finishes

View File

@ -2,12 +2,12 @@
#
# id: bugs.core_5201
# title: Return nonzero result code when restore fails on activating and creating deferred user index
# decription:
# decription:
# ### NB ###
#                   According to Alex response on letter 25-apr-2016 19:15, zero retcode returned ONLY when restore
#                   According to Alex response on letter 25-apr-2016 19:15, zero retcode returned ONLY when restore
#                   was done WITH '-verbose' switch, and this was fixed. When restoring was done without additional
# switches, retcode was NON zero and its value was 1.
#
#
# Test description.
# We create table with UNIQUE computed-by index which expression refers to other table (Firebird allows this!).
# Because other table (test_2) initially is empty, index _can_ be created. But after this we insert record into
@ -15,16 +15,17 @@
# NOT be able to restore (unless '-i' switch specified).
# We will use this inability of restore index by checking 'gbak -rep -v ...' return code: it should be NON zero.
# If code will skip exception then this will mean FAIL of test.
#
#
# Confirmed on: 3.0.0.32484, 4.0.0.142 - retcode was ZERO (and this was wrong); since 4.0.0.145 - all fine, retcode=2.
#
#
# tracker_id: CORE-5201
# min_versions: ['2.5.6']
# versions: 2.5.6
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 2.5.6
# resources: None
@ -42,7 +43,7 @@ init_script_1 = """
commit;
insert into test_2 values(1000);
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -51,33 +52,33 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import os
# import time
# import subprocess
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# thisdb = db_conn.database_name
# tmpbkp = os.path.splitext(thisdb)[0] + '.fbk'
# tmpres = os.path.splitext(thisdb)[0] + '.tmp'
#
#
# db_conn.close()
#
#
# #--------------------------------------------
#
#
# def flush_and_close(file_handle):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
#
# #--------------------------------------------
#
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
@ -88,12 +89,12 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
#
# #--------------------------------------------
#
#
# f_backup=open( os.path.join(context['temp_directory'],'tmp_backup_5201.log'), 'w')
# subprocess.check_call([context['fbsvcmgr_path'],
# "localhost:service_mgr",
@ -103,7 +104,7 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# ],
# stdout=f_backup, stderr=subprocess.STDOUT)
# flush_and_close( f_backup )
#
#
# f_restore=open( os.path.join(context['temp_directory'],'tmp_restore_5201.log'), 'w')
# try:
# # This is key point: before 4.0.0.145 restoring with '-v' key did assign retcode to zero
@ -116,43 +117,53 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# tmpres
# ],
# stdout=f_restore, stderr=subprocess.STDOUT)
#
#
# except subprocess.CalledProcessError as cpexc:
# # Final output of this test MUST have following line (confirmed on 3.0.0.32484: did not has this).
# print ('Restore finished with error code: '+str(cpexc.returncode))
#
#
# flush_and_close( f_restore )
#
#
#
#
# # Output STDOUT+STDERR of backup: they both should be EMPTY because we did not specify '-v' key:
# with open( f_backup.name,'r') as f:
# for line in f:
# print( "BACKUP LOG: "+line )
#
#
# # Output STDOUT+STDERR of restoring with filtering text related to ERRORs:
# with open( f_restore.name,'r') as f:
# for line in f:
# if ' ERROR:' in line:
# print( "RESTORE LOG: "+line )
#
#
#
#
# # Cleanup:
# ##########
# time.sleep(1)
# cleanup( (f_backup, f_restore, tmpbkp, tmpres) )
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Restore finished with error code: 2
RESTORE LOG: gbak: ERROR:attempt to store duplicate value (visible to active transactions) in unique index "TEST_1_UNQ"
RESTORE LOG: gbak: ERROR: Problematic key value is (<expression> = 1)
"""
@pytest.mark.version('>=2.5.6')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
gbak: ERROR:attempt to store duplicate value (visible to active transactions) in unique index "TEST_1_UNQ"
gbak: ERROR: Problematic key value is (<expression> = 1)
"""
fbk_file = temp_file('core_5201.fbk')
tmp_db_file = temp_file('tmp_core_5201.fdb')


@pytest.mark.version('>=3.0')
def test_1(act_1: Action, fbk_file: Path, tmp_db_file: Path):
    """gbak -rep -v on a backup with an unrestorable deferred index must
    return a NON-zero retcode (2) and report the duplicate-key errors."""
    # Backup via the services API; the service must not report anything.
    with act_1.connect_server() as server:
        server.database.backup(database=str(act_1.db.db_path), backup=str(fbk_file))
        assert server.readlines() == []
    #
    act_1.expected_stderr = 'We expect error'
    act_1.expected_stdout = expected_stdout_1
    act_1.gbak(switches=['-rep', '-v', str(fbk_file), str(tmp_db_file)])
    # Keep only the gbak error lines before comparing with the expected output.
    error_lines = [line for line in act_1.stdout.splitlines() if ' ERROR:' in line]
    act_1.stdout = '\n'.join(error_lines)
    assert act_1.return_code == 2
    assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -2,7 +2,7 @@
#
# id: bugs.core_5207
# title: ISQL -X may generate invalid GRANT USAGE statements for domains
# decription:
# decription:
# Test uses .fbk which was prepared on FB 2.5 (source .fdb contains single domain).
# After .fbk extration we start restore from it and extract metadata to log.
# Then we search metadata log for phrase 'GRANT USAGE ON DOMAIN' - it should NOT present there.
@ -18,14 +18,16 @@
# WRONG GRANT: GRANT USAGE ON DOMAIN DM_INT TO PUBLIC;
# ===
# Checked on: LI-T4.0.0.142 - works fine.
#
#
# tracker_id: CORE-5207
# min_versions: ['3.0']
# versions: 3.0
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import zipfile
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0
# resources: None
@ -43,31 +45,31 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import zipfile
# import subprocess
# import re
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# tmpbkp = os.path.join( context['temp_directory'], 'core_5207.fbk' )
# tmpres = os.path.join( context['temp_directory'], 'tmp_core_5207.fdb' )
# db_conn.close()
#
#
# #--------------------------------------------
#
#
# def flush_and_close(file_handle):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
#
# #--------------------------------------------
#
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
@ -78,19 +80,19 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
#
# #--------------------------------------------
#
#
#
#
# zf = zipfile.ZipFile( os.path.join(context['files_location'],'core_5207.zip') )
# zf.extractall( context['temp_directory'] )
# zf.close()
#
#
# # Result: core_5207.fbk is extracted into context['temp_directory']
#
#
# f_restore=open( os.path.join(context['temp_directory'],'tmp_restore_5207.log'), 'w')
# subprocess.check_call([context['fbsvcmgr_path'],
# "localhost:service_mgr",
@ -101,54 +103,54 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# ],
# stdout=f_restore, stderr=subprocess.STDOUT)
# flush_and_close( f_restore )
#
#
# # Result: database file 'tmp_core_5207.fdb' should be created after this restoring,
# # log ('tmp_restore_5207.log') must be EMPTY.
#
#
# f_xmeta_log = open( os.path.join(context['temp_directory'],'tmp_xmeta_5207.log'), 'w')
# f_xmeta_err = open( os.path.join(context['temp_directory'],'tmp_xmeta_5207.err'), 'w')
#
#
# subprocess.call( [context['isql_path'], "localhost:"+tmpres, "-x"],
# stdout = f_xmeta_log,
# stderr = f_xmeta_err
# )
#
#
# # This file should contain metadata:
# flush_and_close( f_xmeta_log )
#
#
# # This file should be empty:
# flush_and_close( f_xmeta_err )
#
#
# f_apply_log = open( os.path.join(context['temp_directory'],'tmp_apply_5207.log'), 'w')
# f_apply_err = open( os.path.join(context['temp_directory'],'tmp_apply_5207.err'), 'w')
#
#
# subprocess.call( [context['isql_path'], dsn, "-i", f_xmeta_log.name],
# stdout = f_apply_log,
# stderr = f_apply_err
# )
#
#
# # Both of these files should be empty:
# flush_and_close( f_apply_log )
# flush_and_close( f_apply_err )
#
#
# # Output STDOUT+STDERR of restoring and STDERR of metadata extraction: they both should be EMPTY:
# with open( f_restore.name,'r') as f:
# for line in f:
# print( "RESTORE LOG: "+line )
#
#
# with open( f_xmeta_err.name,'r') as f:
# for line in f:
# print( "EXTRACT ERR: "+line )
#
#
# with open( f_apply_log.name,'r') as f:
# for line in f:
# print( "APPLY STDOUT: "+line )
#
#
# with open( f_apply_err.name,'r') as f:
# for line in f:
# print( "APPLY STDERR: "+line )
#
# # Check that STDOUT of metadata extration (f_xmeta_log) does __not__ contain
#
# # Check that STDOUT of metadata extration (f_xmeta_log) does __not__ contain
# # statement like 'GRANT USAGE ON DOMAIN'.
# # Output must be empty here:
# #
@ -156,23 +158,38 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# for line in f:
# if 'GRANT USAGE ON DOMAIN' in line:
# print( "WRONG GRANT: "+line )
#
#
#
#
# # Cleanup:
# ##########
#
# # do NOT remove this pause otherwise some of logs will not be enable for deletion and test will finish with
#
# # do NOT remove this pause otherwise some of logs will not be enable for deletion and test will finish with
# # Exception raised while executing Python test script. exception: WindowsError: 32
# time.sleep(1)
# cleanup( (f_restore,f_xmeta_log,f_xmeta_err,f_apply_log,f_apply_err,tmpbkp,tmpres) )
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)

fbk_file_1 = temp_file('tmp_core_5207.fbk')
fdb_file_1 = temp_file('tmp_core_5207.fdb')


@pytest.mark.version('>=3.0')
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path, capsys):
    """ISQL -X must not emit invalid GRANT USAGE ON DOMAIN statements.

    Restores a 2.5-era backup, extracts its metadata with ``isql -x``,
    fails if any 'GRANT USAGE ON DOMAIN' line is present, then applies the
    extracted metadata to the main test database to prove the script is valid.
    """
    # NOTE(review): stale pre-commit lines (@xfail + "not IMPLEMENTED" stub)
    # were interleaved by the diff scrape; only the final version is kept.
    zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5207.zip',
                                   at='core_5207.fbk')
    fbk_file_1.write_bytes(zipped_fbk_file.read_bytes())
    with act_1.connect_server() as srv:
        srv.database.restore(database=str(fdb_file_1), backup=str(fbk_file_1))
        srv.wait()
    act_1.isql(switches=['-x', str(fdb_file_1)], connect_db=False)
    metadata = act_1.stdout
    # The extracted metadata must not contain the broken domain grants.
    for line in metadata.splitlines():
        if 'GRANT USAGE ON DOMAIN' in line:
            pytest.fail(f'WRONG GRANT: {line}')
    # Apply metadata to main test database — it must execute cleanly.
    act_1.reset()
    act_1.isql(switches=[], input=metadata)
    assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -2,31 +2,33 @@
#
# id: bugs.core_5210
# title: Firebird 3.0 + fbclient 3.0 - POST_EVENT won't work
# decription:
# decription:
# We create database-level trigger which sends event with name 'dml_event' on COMMIT.
# Then we do new connect and run thread with INSERT statement (with delay = 1.0 second), and wait
# Then we do new connect and run thread with INSERT statement (with delay = 1.0 second), and wait
# NO MORE than <max4delivering> sec.
# We should receive event during ~ 1.0 second.
# We should receive event during ~ 1.0 second.
# We have to consider result as FAIL if we do not receive event in <max4delivering> seconds.
# Result of "events.wait(max4delivering)" will be non-empty dictionary with following key-value:
# {'dml_event': 1} - when all fine and client got event;
# {'dml_event': 0} - when NO event was delivered
# All such actions are repeated several times in order to increase probability of failure if something
# in FB will be broken.
#
#
# Confirmed wrong result on: 4.0.0.145, V3.0.0.32493 - with probability approx 60%.
# All fine on: T4.0.0.150, WI-V3.0.0.32496 (SS/SC/CS).
#
# PS. Event handling code in this text was adapted from fdb manual:
#
# PS. Event handling code in this text was adapted from fdb manual:
# http://pythonhosted.org/fdb/usage-guide.html#database-events
#
#
# tracker_id: CORE-5210
# min_versions: ['2.5.6']
# versions: 2.5.6
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from time import time
from threading import Timer
from firebird.qa import db_factory, python_act, Action
# version: 2.5.6
# resources: None
@ -44,13 +46,13 @@ init_script_1 = """
recreate table test(id int);
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
# test_script_1
#---
#
#
# def check_events(seqno):
# import fdb
# import threading
@ -58,38 +60,38 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import time
# from time import time
# import os
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# # Utility function
# def send_events(command_list):
# cur=db_conn.cursor()
# for cmd in command_list:
# cur.execute(cmd)
# db_conn.commit()
#
#
# timed_event = threading.Timer(1.0, send_events, args=[["insert into test(id) values ( rand()*1000 )",]])
#
# # Connection.event_conduit() takes a sequence of string event names as parameter, and returns
#
# # Connection.event_conduit() takes a sequence of string event names as parameter, and returns
# # EventConduit instance.
# events = db_conn.event_conduit(['dml_event'])
#
# # To start listening for events it's necessary (starting from FDB version 1.4.2)
#
# # To start listening for events it's necessary (starting from FDB version 1.4.2)
# # to call EventConduit.begin() method or use EventConduit's context manager interface
# # Immediately when begin() method is called, EventConduit starts to accumulate notifications
# # of any events that occur within the conduit's internal queue until the conduit is closed
# # Immediately when begin() method is called, EventConduit starts to accumulate notifications
# # of any events that occur within the conduit's internal queue until the conduit is closed
# # (via the close() method)
#
# #print("Start listening for event")
#
#
# #print("Start listening for event")
#
# events.begin()
#
#
# timed_event.start()
#
#
# # Notifications about events are aquired through call to wait() method, that blocks the calling
# # thread until at least one of the events occurs, or the specified timeout (if any) expires,
#
#
# # Notifications about events are aquired through call to wait() method, that blocks the calling
# # thread until at least one of the events occurs, or the specified timeout (if any) expires,
# # and returns None if the wait timed out, or a dictionary that maps event_name -> event_occurrence_count.
# #t1 = datetime.datetime.now()
# t1 = time()
@ -97,21 +99,22 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# e = events.wait(max4delivering)
# t2 = time()
# #t2 = datetime.datetime.now()
#
#
#
#
# events.close()
#
#
# print(e)
# print( str(seqno)+': event was SUCCESSFULLY delivered.' if t2-t1 < max4delivering else str(seqno)+': event was NOT delivered for %.2f s (threshold is %.2f s)' % ( (t2-t1), max4delivering ) )
#
#
# check_events(1)
# check_events(2)
# check_events(3)
# check_events(4)
# check_events(5)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
{'dml_event': 1}
@ -128,11 +131,36 @@ expected_stdout_1 = """
{'dml_event': 1}
5: event was SUCCESSFULLY delivered.
"""
"""
def send_events(con, command_list):
    """Execute every statement in *command_list* on *con*, then commit once."""
    cursor = con.cursor()
    for statement in command_list:
        cursor.execute(statement)
    con.commit()
@pytest.mark.version('>=2.5.6')
def test_1(act_1: Action, capsys):
    """POST_EVENT delivery check: 'dml_event' must arrive within 3 seconds.

    The check is repeated five times to raise the probability of catching a
    broken event-delivery implementation (CORE-5210).
    """
    # NOTE(review): stale pre-commit lines (@xfail + "not IMPLEMENTED" stub)
    # were interleaved by the diff scrape; only the final version is kept.
    def check_events(seqno: int):
        with act_1.db.connect() as con:
            # Fire the INSERT (and the commit trigger's POST_EVENT) after ~1 s.
            timed_event = Timer(1.0, send_events, args=[con, ["insert into test(id) values (rand()*1000)",]])
            with con.event_collector(['dml_event']) as events:
                timed_event.start()
                t1 = time()
                max4delivering = 3
                e = events.wait(max4delivering)
                t2 = time()
                print(e)
                print(f'{seqno}: event was SUCCESSFULLY delivered.' if t2-t1 < max4delivering
                      else f'{seqno}: event was NOT delivered for {t2-t1}s (threshold is {max4delivering}s)')
    #
    check_events(1)
    check_events(2)
    check_events(3)
    check_events(4)
    check_events(5)
    # Check
    act_1.expected_stdout = expected_stdout_1
    act_1.stdout = capsys.readouterr().out
    assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -2,16 +2,16 @@
#
# id: bugs.core_5218
# title: Explicitly defined names for NOT NULL constraints are not exported into script by ISQL -x
# decription:
# decription:
# Checked on WI-V3.0.0.32501, WI-T4.0.0.155.
#
#
# tracker_id: CORE-5218
# min_versions: ['2.5.6']
# versions: 2.5.6
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 2.5.6
# resources: None
@ -26,12 +26,12 @@ init_script_1 = """
--- is created now, i.e. one may to declare FK-field like this:
-- ... f03 references test
-- That's not so for 2.5.x:
,f03 int constraint f03_nn not null
constraint f03_fk
,f03 int constraint f03_nn not null
constraint f03_fk
references test( f01 )
-- ^-- this must be specified in 2.5.x
);
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -40,29 +40,29 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import os
# import time
# import subprocess
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# db_conn.close()
#
#
# #--------------------------------------------
#
#
# def flush_and_close(file_handle):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
#
# #--------------------------------------------
#
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
@ -73,12 +73,12 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
#
# #--------------------------------------------
#
#
# f_extract_meta_sql = open( os.path.join(context['temp_directory'],'tmp_5218_meta.log'), 'w')
# f_extract_meta_err = open( os.path.join(context['temp_directory'],'tmp_5218_meta.err'), 'w')
# subprocess.call( [context['isql_path'], dsn, "-x"],
@ -87,48 +87,52 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# )
# flush_and_close( f_extract_meta_sql )
# flush_and_close( f_extract_meta_err )
#
#
# ###############
# # CHECK RESULTS
# ###############
#
#
# # 1. STDERR for extracted metadata must be EMPTY.
# with open( f_extract_meta_err.name, 'r') as f:
# for line in f:
# if line.strip():
# print('EXTRACTED METADATA ERR: '+line)
#
#
# # 2. STDLOG for extracted metadata: we must ouput all
# # lines with phrase 'CONSTRAINT' in order to check that this
# # lines with phrase 'CONSTRAINT' in order to check that this
# # keyword actually present for each initial declaration:
#
#
# with open( f_extract_meta_sql.name, 'r') as f:
# for line in f:
# if 'CONSTRAINT' in line:
# print( 'EXTRACTED METADATA LOG: '+line )
#
#
#
#
# # Cleanup:
# ##########
# time.sleep(1)
# cleanup( (f_extract_meta_sql, f_extract_meta_err) )
#
#
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
EXTRACTED METADATA LOG: CREATE TABLE TEST (F01 INTEGER CONSTRAINT F01_NN NOT NULL,
EXTRACTED METADATA LOG: F02 INTEGER CONSTRAINT F02_NN NOT NULL,
EXTRACTED METADATA LOG: F03 INTEGER CONSTRAINT F03_NN NOT NULL,
EXTRACTED METADATA LOG: CONSTRAINT F01_PK PRIMARY KEY (F01),
EXTRACTED METADATA LOG: CONSTRAINT F02_UK UNIQUE (F02));
EXTRACTED METADATA LOG: ALTER TABLE TEST ADD CONSTRAINT F03_FK FOREIGN KEY (F03) REFERENCES TEST (F01);
"""
CREATE TABLE TEST (F01 INTEGER CONSTRAINT F01_NN NOT NULL,
F02 INTEGER CONSTRAINT F02_NN NOT NULL,
F03 INTEGER CONSTRAINT F03_NN NOT NULL,
CONSTRAINT F01_PK PRIMARY KEY (F01),
CONSTRAINT F02_UK UNIQUE (F02));
ALTER TABLE TEST ADD CONSTRAINT F03_FK FOREIGN KEY (F03) REFERENCES TEST (F01);
"""
@pytest.mark.version('>=2.5.6')
def test_1(act_1: Action):
    """Metadata extracted by ``isql -x`` must keep explicitly named
    NOT NULL constraints (CORE-5218)."""
    # NOTE(review): stale pre-commit lines (@xfail + "not IMPLEMENTED" stub)
    # were interleaved by the diff scrape; only the final version is kept.
    act_1.expected_stdout = expected_stdout_1
    act_1.isql(switches=['-x'])
    # Compare only the lines that mention CONSTRAINT.
    act_1.stdout = '\n'.join([line for line in act_1.stdout.splitlines() if 'CONSTRAINT' in line])
    assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -2,36 +2,44 @@
#
# id: bugs.core_5220
# title: ISQL -X: double quotes are missed for COLLATE <C> of CREATE DOMAIN statement when <C> is from any non-ascii charset
# decription:
# decription:
# We create in init_script two collations with non-ascii names and two varchar domains which use these collations.
# Then we extract metadata and save it to file as .sql script to be applied further.
# This script should contain CORRECT domains definition, i.e. collations should be enclosed in double quotes.
# We check correctness by removing from database all objects and applying this script: no errors should occur at that point.
# Then we extract metadata second time, store it to second .sql and COMPARE this file with result of first metadata extraction.
# These files should be equal, i.e. difference should be empty.
#
#
# Checked on WI-V3.0.0.32501, WI-T4.0.0.155.
#
#
# 13.04.2021. Adapted for run both on Windows and Linux. Checked on:
# Windows: 4.0.0.2416
# Linux: 4.0.0.2416
#
#
# tracker_id: CORE-5220
# min_versions: ['3.0']
# versions: 3.0
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from difflib import unified_diff
from firebird.qa import db_factory, python_act, Action
# version: 3.0
# resources: None
substitutions_1 = []
init_script_1 = """"""
init_script_1 = """
create collation "Циферки" for utf8 from unicode case insensitive 'NUMERIC-SORT=1';
create collation "Испания" for iso8859_1 from es_es_ci_ai 'SPECIALS-FIRST=1';;
commit;
create domain "Артикулы" varchar(12) character set utf8 collate "Циферки";
create domain "Комрады" varchar(40) character set iso8859_1 collate "Испания";
commit;
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
db_1 = db_factory(sql_dialect=3, init=init_script_1, charset='UTF8')
# test_script_1
#---
@ -40,29 +48,29 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import subprocess
# import difflib
# import io
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# db_conn.close()
#
#
# #--------------------------------------------
#
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
#
# #--------------------------------------------
#
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
@ -74,16 +82,16 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# print('type(f_names_list[i])=',type(f_names_list[i]))
# del_name = None
#
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
#
# #--------------------------------------------
#
#
# sql_txt=''' set bail on;
# set names utf8;
# connect '%(dsn)s' user '%(user_name)s' password '%(user_password)s';
#
#
# create collation "Циферки" for utf8 from unicode case insensitive 'NUMERIC-SORT=1';
# create collation "Испания" for iso8859_1 from es_es_ci_ai 'SPECIALS-FIRST=1';;
# commit;
@ -91,34 +99,34 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# create domain "Комрады" varchar(40) character set iso8859_1 collate "Испания";
# commit;
# ''' % dict(globals(), **locals())
#
#
# f_ddl_sql = open( os.path.join(context['temp_directory'], 'tmp_5220_ddl.sql'), 'w' )
# f_ddl_sql.write( sql_txt )
# flush_and_close( f_ddl_sql )
#
#
# f_ddl_log = open( os.path.splitext(f_ddl_sql.name)[0]+'.log', 'w')
# subprocess.call( [ context['isql_path'], '-q', '-i', f_ddl_sql.name ],
# stdout = f_ddl_log,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_ddl_log )
#
#
#
#
# f_extract_meta1_sql = open( os.path.join(context['temp_directory'],'tmp_5220_meta1.sql'), 'w')
# subprocess.call( [context['isql_path'], dsn, "-x"],
# stdout = f_extract_meta1_sql,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_extract_meta1_sql )
#
#
# f_remove_meta_sql = open( os.path.join(context['temp_directory'],'tmp_5220_kill.sql'), 'w')
#
#
# sql_txt=''' drop domain "Комрады";
# drop domain "Артикулы";
# drop collation "Испания";
# drop collation "Циферки";
# commit;
#
#
# set list on;
# set count on;
# select f.rdb$field_name
@ -126,7 +134,7 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# where
# f.rdb$system_flag is distinct from 1
# and f.rdb$field_name not starting with upper('rdb$');
#
#
# select r.rdb$collation_name
# from rdb$collations r
# where
@ -134,94 +142,129 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# '''
# f_remove_meta_sql.write(sql_txt)
# flush_and_close( f_remove_meta_sql )
#
#
# f_remove_meta_log = open( os.path.join(context['temp_directory'],'tmp_5220_kill.log'), 'w')
# subprocess.call( [context['isql_path'], dsn, "-ch", "utf8", "-i", f_remove_meta_sql.name],
# stdout = f_remove_meta_log,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_remove_meta_log )
#
#
# f_apply_meta_log = open( os.path.join(context['temp_directory'],'tmp_5220_apply.log'), 'w')
# subprocess.call( [context['isql_path'], dsn, "-ch", "utf8", "-i", f_extract_meta1_sql.name],
# stdout = f_apply_meta_log,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_apply_meta_log )
#
#
#
#
# f_extract_meta2_sql = open( os.path.join(context['temp_directory'],'tmp_5220_meta2.sql'), 'w')
# subprocess.call( [context['isql_path'], dsn, "-x"],
# stdout = f_extract_meta2_sql,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_extract_meta2_sql )
#
#
# time.sleep(1)
#
#
# ###############
# # CHECK RESULTS
# ###############
#
#
# # 1. Log f_remove_meta_log (REMOVING metadata) should contain only phrases about absence of domains and collations
# with open( f_remove_meta_log.name, 'r') as f:
# for line in f:
# if line.strip():
# print('REMOVE METADATA LOG: '+line)
#
# # 2. Log f_apply_meta_log (result of APPLYING extracted metadata, file: f_extract_meta1_sql) should be EMPTY
#
# # 2. Log f_apply_meta_log (result of APPLYING extracted metadata, file: f_extract_meta1_sql) should be EMPTY
# # (because collation names now should be enclosed in double quotes)
# with open( f_apply_meta_log.name, 'r') as f:
# for line in f:
# if line.strip():
# print('APPLY EXTRACTED METADATA LOG: '+line)
#
#
# # 3. Log f_extract_meta2_sql should EXACTLY match to first extracted metadata log (f_extract_meta1_sql).
# # We compare these files using Python 'diff' package.
#
#
# f_diff_txt=open( os.path.join(context['temp_directory'],'tmp_5220_meta_diff.txt'), 'w')
#
#
# f_old=[]
# f_new=[]
#
#
# f_old.append(f_extract_meta1_sql) # tmp_5220_meta1.sql -- extracted metadata just after 'init_script' was done
# f_new.append(f_extract_meta2_sql) # tmp_5220_meta2.sql -- extracted metadata after drop all object and applying 'f_extract_meta1_sql'
#
#
# for i in range(len(f_old)):
# old_file=open(f_old[i].name,'r')
# new_file=open(f_new[i].name,'r')
#
#
# f_diff_txt.write( ''.join( difflib.unified_diff( old_file.readlines(), new_file.readlines() ) ) )
#
#
# old_file.close()
# new_file.close()
#
#
# flush_and_close( f_diff_txt )
#
#
# # Should be EMPTY:
# ##################
# with open( f_diff_txt.name,'r') as f:
# for line in f:
# print( 'METADATA DIFF:' + ' '.join(line.split()).upper() )
#
#
#
#
# #####################################################################
# # Cleanup:
# time.sleep(1)
# cleanup((f_extract_meta1_sql,f_extract_meta2_sql,f_apply_meta_log,f_remove_meta_log,f_remove_meta_sql,f_diff_txt,f_ddl_sql,f_ddl_log))
#
#
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
REMOVE METADATA LOG: Records affected: 0
REMOVE METADATA LOG: Records affected: 0
"""
Records affected: 0
Records affected: 0
"""
remove_metadata = """
drop domain "Комрады";
drop domain "Артикулы";
drop collation "Испания";
drop collation "Циферки";
commit;
set list on;
set count on;
select f.rdb$field_name
from rdb$fields f
where
f.rdb$system_flag is distinct from 1
and f.rdb$field_name not starting with upper('rdb$');
select r.rdb$collation_name
from rdb$collations r
where
r.rdb$system_flag is distinct from 1;
"""
@pytest.mark.version('>=3.0')
def test_1(act_1: Action):
    """Round-trip metadata containing non-ascii collation names (CORE-5220).

    Extract metadata, drop all user objects, re-apply the extracted script
    (collation names must be double-quoted for this to succeed), extract
    again and require both extractions to be identical.
    """
    # NOTE(review): stale pre-commit lines (@xfail + "not IMPLEMENTED" stub)
    # were interleaved by the diff scrape; only the final version is kept.
    # First metadata extraction.
    act_1.isql(switches=['-x'])
    metadata = act_1.stdout
    # Remove all user objects; both record counts must come back as zero.
    act_1.reset()
    act_1.expected_stdout = expected_stdout_1
    act_1.isql(switches=[], input=remove_metadata)
    assert act_1.clean_stdout == act_1.clean_expected_stdout
    # Apply the previously extracted metadata — must execute cleanly.
    act_1.reset()
    act_1.isql(switches=[], input=metadata)
    # Extract metadata again.
    act_1.reset()
    act_1.isql(switches=['-x'])
    # Both extractions must match exactly (empty diff).
    meta_diff = list(unified_diff(metadata.splitlines(), act_1.stdout.splitlines()))
    assert meta_diff == []

View File

@ -2,40 +2,46 @@
#
# id: bugs.core_5222
# title: SELECT WITH LOCK may raise unexpected update conflict errors under concurrent load
# decription:
# decription:
# Prototype: https://groups.yahoo.com/neo/groups/firebird-support/conversations/messages/128920
# Done with suggestions from dimitr, see letter 01-may-2016 09:15.
# Confirmed on WI-V3.0.0.32366, WI-V3.0.0.32483, 3.0.0.32501 (SS,SC,CS) - it was enough to
# async. start THREE child ISQLs sessions, one or two of them always raise exception after
# Confirmed on WI-V3.0.0.32366, WI-V3.0.0.32483, 3.0.0.32501 (SS,SC,CS) - it was enough to
# async. start THREE child ISQLs sessions, one or two of them always raise exception after
# few seconds with text: 'deadlock / update conflicts' and could not finish its job.
#
#
# Checked on: WI-V3.0.0.32503, 3.0.1.32570, WI-T4.0.0.157 - works fine.
# Checked on 4.0.0.322 (Classic, SuperClassic, SuperServer) - works OK.
#
#
# 12.08.2018 ::: NB :::
# It is unclear now how this test can be implemented on 4.0 after introduction of READ CONSISTENCY
# with default value ReadConsistency = 1. According to doc:
# ===
# If ReadConsistency set to 1 (by default) engine ignores
# [NO] RECORD VERSION flags and makes all read-committed
# [NO] RECORD VERSION flags and makes all read-committed
# transactions READ COMMITTED READ CONSISTENCY.
# ===
#
#
# Also, doc\\README.read_consistency.md says that:
# "Old read-committed isolation modes (**RECORD VERSION** and **NO RECORD VERSION**) are still
# "Old read-committed isolation modes (**RECORD VERSION** and **NO RECORD VERSION**) are still
# allowed but considered as legacy and not recommended to use."
#
#
# This mean that one can NOT to check issues of this ticket under 4.0 using default (and recommended)
# value of config parameter 'ReadConsistency'.
# For that reason it was decided to make new EMPTY section of test for 4.0.
#
#
# tracker_id: CORE-5222
# min_versions: ['3.0.0']
# versions: 3.0, 4.0
# qmid: None
from __future__ import annotations
from typing import List
import pytest
from firebird.qa import db_factory, isql_act, Action
import subprocess
import time
import re
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_files, temp_file
# version: 3.0
# resources: None
@ -55,10 +61,10 @@ init_script_1 = """
);
set term ^;
alter procedure p_increment (a_iter_cnt int default 1000)
alter procedure p_increment (a_iter_cnt int default 1000)
returns (
proc_start timestamp,
selected_id bigint,
proc_start timestamp,
selected_id bigint,
proc_finish timestamp
) as
declare i bigint;
@ -70,17 +76,17 @@ init_script_1 = """
begin
in autonomous transaction do
begin
select id
select id
from gen_tab with lock -- this raised SQLSTATE = 40001 / -update conflicts with concurrent update
into selected_id;
i = i + 1;
id_new = selected_id + 1;
insert into gen_log(id, id_new, id_diff)
insert into gen_log(id, id_new, id_diff)
values( :selected_id, :id_new, :id_new - :selected_id);
update gen_tab set id = :id_new
update gen_tab set id = :id_new
where id = :selected_id;
end
@ -94,7 +100,7 @@ init_script_1 = """
insert into gen_tab (id) values (0);
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -105,78 +111,78 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import subprocess
# from subprocess import Popen
# import re
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# db_conn.close()
#
#
#
#
# f_run_sql=open( os.path.join(context['temp_directory'],'tmp_5222_run.sql'), 'w')
# f_run_sql.write(''' --show version;
# commit;
# commit;
# set list on;
# set transaction read committed no record_version lock timeout 3;
# select current_timestamp as before_proc
# select current_timestamp as before_proc
# from rdb$database;
#
#
# select * from p_increment(10);
#
#
# select current_timestamp as after_proc
# from rdb$database;
# ''')
# f_run_sql.close()
#
#
# ##########################################################################################
# # Launching several concurrent child ISQL processes which perform script from `f_run_sql`
# ##########################################################################################
# planned_dml_attachments = 3
#
#
# f_list = []
# p_list = []
#
#
#
#
# for i in range(0, planned_dml_attachments):
# sqllog=open( os.path.join(context['temp_directory'],'tmp_dml_5222_'+str(i)+'.log'), 'w')
# f_list.append(sqllog)
#
#
# for i in range(len(f_list)):
# p_isql=Popen( [ context['isql_path'] , dsn, "-i", f_run_sql.name ],
# stdout=f_list[i],
# stderr=subprocess.STDOUT
# )
# p_list.append(p_isql)
#
#
# time.sleep(7)
#
#
# for i in range(len(f_list)):
# f_list[i].close()
#
#
# for i in range(len(p_list)):
# p_list[i].terminate()
#
#
# # 12.08.2016: added small delay because it's possible to get:
# # WindowsError:
# # 32
# # The process cannot access the file because it is being used by another process
#
#
# time.sleep(2)
#
#
# ###########################
# # CHECK RESULTS and CLEANUP
# ###########################
#
# # 1. Each log _should_ contain ONLY following lines:
#
# # 1. Each log _should_ contain ONLY following lines:
# # BEFORE_PROC 2016-05-03 09:27:57.6210
# # PROC_START 2016-05-03 09:27:57.6210
# # SELECTED_ID 1569
# # PROC_FINISH 2016-05-03 09:28:04.0740
# # AFTER_PROC 2016-05-03 09:28:04.0740
# # 2. _NO_ log should contain 'SQLSTATE = 40001'
#
#
# # Open every log and print 1st word from each line, ignoring values of timestamp and ID.
# # Then close log and remove it:
#
#
# pattern = re.compile("BEFORE_PROC*|PROC_START*|SELECTED_ID*|PROC_FINISH*|AFTER_PROC*")
# for i in range(len(f_list)):
# with open( f_list[i].name, 'r') as f:
@ -188,9 +194,9 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# print( 'UNEXPECTED, LOG #'+str(i)+': '+ line )
# f.close()
# os.remove(f_list[i].name)
#
#
# os.remove(f_run_sql.name)
#
#
# # Sample of WRONG result (got on 3.0.0.32483):
# # ===============
# # EXPECTED, LOG #0: BEFORE_PROC
@ -212,14 +218,26 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# # + UNEXPECTED, LOG #2: -update conflicts with concurrent update
# # + UNEXPECTED, LOG #2: -concurrent transaction number is 32
# # + UNEXPECTED, LOG #2: -At procedure 'P_INCREMENT' line: 17, col: 17
# # + UNEXPECTED, LOG #2: After line 6 in file C:\\MIX
# irebird\\QA
# bt-repo mp mp_5222_run.sql
# # + UNEXPECTED, LOG #2: After line 6 in file C:\\MIX\\Firebird\\QA\\fbt-repo\\tmp\\tmp_5222_run.sql
# # EXPECTED, LOG #2: AFTER_PROC
#
#
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
test_script_1 = """
commit;
set list on;
set transaction read committed no record_version lock timeout 3;
select current_timestamp as before_proc
from rdb$database;
select * from p_increment(10);
select current_timestamp as after_proc
from rdb$database;
"""
expected_stdout_1 = """
EXPECTED, LOG #0: BEFORE_PROC
@ -239,13 +257,60 @@ expected_stdout_1 = """
EXPECTED, LOG #2: SELECTED_ID
EXPECTED, LOG #2: PROC_FINISH
EXPECTED, LOG #2: AFTER_PROC
"""
"""
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
PLANNED_DML_ATTACHMENTS = 3
run_sql = temp_file('core_5222_run.sql')
dml_logs_1 = temp_files([f'tmp_dml_5222_{i+1}' for i in range(PLANNED_DML_ATTACHMENTS)])
@pytest.mark.version('>=3.0,<4')
def test_1(act_1: Action, run_sql: Path, dml_logs_1: List[Path], capsys):
pattern = re.compile("BEFORE_PROC*|PROC_START*|SELECTED_ID*|PROC_FINISH*|AFTER_PROC*")
run_sql.write_text(test_script_1)
# Launching several concurrent child ISQL processes which perform `run_sql` script
f_logs = []
p_dml = []
try:
for dml_log in dml_logs_1: # Contains PLANNED_DML_ATTACHMENTS items
f = open(dml_log, mode='w')
f_logs.append(f)
p_dml.append(subprocess.Popen([act_1.vars['isql'],
'-i', str(run_sql),
'-user', act_1.db.user,
'-password', act_1.db.password,
act_1.db.dsn],
stdout=f, stderr=subprocess.STDOUT))
#
time.sleep(PLANNED_DML_ATTACHMENTS * 5)
finally:
for f in f_logs:
f.close()
for p in p_dml:
p.terminate()
#
# 1. Each log _should_ contain ONLY following lines:
# BEFORE_PROC 2016-05-03 09:27:57.6210
# PROC_START 2016-05-03 09:27:57.6210
# SELECTED_ID 1569
# PROC_FINISH 2016-05-03 09:28:04.0740
# AFTER_PROC 2016-05-03 09:28:04.0740
# 2. _NO_ log should contain 'SQLSTATE = 40001'
#
# Open every log and print 1st word from each line, ignoring values of timestamp and ID.
i = 0
for dml_log in dml_logs_1:
for line in dml_log.read_text().splitlines():
if line.split():
if pattern.match(line):
print(f'EXPECTED, LOG #{i}: {line.split()[0]}')
else:
print(f'UNEXPECTED, LOG #{i}: {line}')
i += 1
# Check
act_1.expected_stdout = expected_stdout_1
act_1.stdout = capsys.readouterr().out
assert act_1.clean_stdout == act_1.clean_expected_stdout
# version: 4.0
# resources: None
@ -258,15 +323,14 @@ db_2 = db_factory(sql_dialect=3, init=init_script_2)
# test_script_2
#---
#
#
#
#
#---
#act_2 = python_act('db_2', test_script_2, substitutions=substitutions_2)
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_2(db_2):
pytest.fail("Test not IMPLEMENTED")
pytest.skip("Requires changed firebird.conf [ReadConsistency=0]")

View File

@ -2,32 +2,33 @@
#
# id: bugs.core_5231
# title: EXECUTE STATEMENT: BLR error if more than 256 output parameters exist
# decription:
# decription:
# We define here number of output args for which one need to made test - see var 'sp_args_count'.
# Then we open .sql file and GENERATE it content based on value of 'sp_args_count' (procedure will
# have header and body with appropriate number of arguments and statement to be executed).
# Finally, we run ISQL subprocess with giving to it for execution just generated .sql script.
# ISQL should _not_ issue any error and all lines of its STDOUT should start from the names of
# Finally, we run ISQL subprocess with giving to it for execution just generated .sql script.
# ISQL should _not_ issue any error and all lines of its STDOUT should start from the names of
# output arguments (letter 'O': O1, O2, ... O5000).
#
#
# Confirmed bug on WI-T4.0.0.184 for number of output args >= 256:
# Statement failed, SQLSTATE = HY000
# invalid request BLR at offset 7157
# -BLR syntax error: expected statement at offset 7158, encountered 0
# Checked on WI-V3.0.1.32518, WI-T4.0.0.197 - works fine.
#
#
# tracker_id: CORE-5231
# min_versions: ['3.0']
# versions: 3.0
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0
# resources: None
substitutions_1 = []
substitutions_1 = [('^O.*', '')]
init_script_1 = """"""
@ -38,28 +39,28 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# import os
# import time
# import subprocess
#
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# db_conn.close()
# #--------------------------------------------
#
#
# def flush_and_close(file_handle):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
#
# #--------------------------------------------
#
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
@ -70,17 +71,17 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
#
# #--------------------------------------------
#
#
#
#
# ####################### N U M B E R O F O U T P U T A R G S. ###########
# sp_args_count=5000
# ###################################################################################
#
#
# sql_pref='''set term ^;
# execute block as
# begin
@ -90,32 +91,32 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# commit ^
# create or alter procedure sp_test returns (
# '''
#
#
# f_ddl_sql = open( os.path.join(context['temp_directory'],'tmp_5231_ddl.sql'), 'w')
# f_ddl_sql.write(sql_pref)
#
#
# delimiter=''
# for i in range(sp_args_count):
# f_ddl_sql.write( '%so%s int' % (delimiter, str(i)) )
# delimiter=','
#
# f_ddl_sql.write(
#
# f_ddl_sql.write(
# ''') as begin
# for execute statement 'select
# '''
# )
#
#
# delimiter=''
# for i in range(sp_args_count):
# f_ddl_sql.write( '%s%s' % (delimiter, str(i)) )
# delimiter=','
# f_ddl_sql.write(" from rdb$database'\\ninto ")
#
#
# delimiter=''
# for i in range(sp_args_count):
# f_ddl_sql.write( '%so%s' % (delimiter, str(i)) )
# delimiter=','
#
#
# sql_suff='''
# do suspend;
# end^
@ -126,39 +127,83 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# '''
# f_ddl_sql.write(sql_suff)
# flush_and_close( f_ddl_sql )
#
#
# f_run_log=open( os.path.join(context['temp_directory'],'tmp_5231_run.log'), 'w')
# f_run_err=open( os.path.join(context['temp_directory'],'tmp_5231_run.err'), 'w')
#
#
# subprocess.call([context['isql_path'], dsn, "-i", f_ddl_sql.name],
# stdout=f_run_log,
# stdout=f_run_log,
# stderr=f_run_err)
# flush_and_close( f_run_log )
# flush_and_close( f_run_err )
#
#
# with open( f_run_err.name,'r') as f:
# for line in f:
# if line.split():
# print('UNEXPECTED STDERR: '+line)
#
#
# with open( f_run_log.name,'r') as f:
# for line in f:
# if line.split() and not line.startswith('O'):
# print('UNEXPECTED STDLOG: '+line)
#
#
# # CLEANUP
# #########
# time.sleep(1)
# cleanup( (f_ddl_sql, f_run_log, f_run_err) )
#
#
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
SP_ARGS_COUNT = 5000
ddl_script = temp_file('core_5231.sql')
def build_script(ddl_script: Path):
with open(ddl_script, 'w') as ddl_file:
ddl_file.write("""
set term ^;
execute block as
begin
execute statement 'drop procedure sp_test';
when any do begin end
end ^
commit ^
create or alter procedure sp_test returns (
""")
delimiter = ''
for i in range(SP_ARGS_COUNT):
ddl_file.write(f'{delimiter}o{i} int')
delimiter = ','
ddl_file.write(
""") as begin
for execute statement 'select
""")
delimiter = ''
for i in range(SP_ARGS_COUNT):
ddl_file.write(f'{delimiter}{i}')
delimiter = ','
ddl_file.write(" from rdb$database'\ninto ")
delimiter = ''
for i in range(SP_ARGS_COUNT):
ddl_file.write(f'{delimiter}o{i}')
delimiter = ','
ddl_file.write("""
do suspend;
end^
set term ;^
commit;
set list on;
select * from sp_test;
""")
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, ddl_script: Path):
build_script(ddl_script)
act_1.isql(switches=[], input_file=ddl_script, charset='NONE')
assert act_1.clean_stdout == act_1.clean_expected_stdout