6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00
firebird-qa/tests/bugs/core_2307_test.py
2021-11-15 19:01:46 +01:00

244 lines
7.7 KiB
Python

#coding:utf-8
#
# id: bugs.core_2307
# title: Incomplete API information values
# decription:
# Test creates lot of tables with names starting with 'TEST'.
# Then we retrieve from rdb$relations min and max values of this tables ID ('r_min', 'r_max').
# After each table is scanned via execute statement, statistics that we retrieve by call db_into()
# is filled with pair: {relation_id, number_of_seq_reads}.
# We have to ckeck that number of entries in this set with r_min <= relation_id <= rmax NOT LESS than
# number of created tables.
# Also, scan for every table should take at least 1 sequential read - and this is checked too.
#
# NOTE: we can SKIP checking concrete values of 'number_of_seq_reads', it does not matter in this test!
#
# See also:
# http://pythonhosted.org/fdb/reference.html#fdb.Connection.database_info
#
# Info about 'isc_info_read_seq_count':
# Number of sequential database reads, that is, the number of sequential table scans (row reads)
# Reported per table.
# Calculated since the current database attachment started.
#
# Confirmed bug on WI-V2.1.2.18118: db_into() received imcompleted data (i.e. not for all tables).
#
# tracker_id: CORE-2307
# min_versions: ['2.5.0']
# versions: 2.5
# qmid: None
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode, DbInfoCode
# version: 2.5
# resources: None
substitutions_1 = []
init_script_1 = """"""
db_1 = db_factory(page_size=4096, sql_dialect=3, init=init_script_1)
# test_script_1
#---
# import os
# import subprocess
# import fdb
#
# db_file=db_conn.database_name
# db_conn.close()
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
#
# # Change FW to OFF in order to speed up initial data filling:
# ##################
#
# fn_nul = open(os.devnull, 'w')
# subprocess.call([ context['fbsvcmgr_path'], "localhost:service_mgr",
# "action_properties", "prp_write_mode", "prp_wm_async",
# "dbname", db_file ],
# stdout = fn_nul,
# stderr = subprocess.STDOUT
# )
# fn_nul.close()
#
# # prepare DB for testing: create lot of tables:
# ###############################################
# f_work_sql=open( os.path.join(context['temp_directory'],'tmp_work_2307.sql'), 'w')
#
# num_of_tables = 1000
#
# sql_ddl='''
# set term ^;
# Execute block as
# declare variable i integer = 0;
# begin
# while ( i < %(num_of_tables)s )
# do
# begin
# execute statement 'create table test' || cast(:i as varchar(5)) || ' (c integer)';
# i = i + 1 ;
# end
# end ^
# commit ^
#
# execute block as
# declare variable i integer = 0;
# begin
# while (i < %(num_of_tables)s )
# do
# begin
# execute statement 'insert into test' || cast(:i as varchar(5)) || ' (c) values (1)';
# i = i + 1 ;
# end
# end
# ^
# set term ;^
# commit;
# ''' % locals()
#
# f_work_sql.write(sql_ddl)
# f_work_sql.close()
#
# f_work_log=open( os.path.join(context['temp_directory'],'tmp_work_2307.log'), 'w')
# f_work_err=open( os.path.join(context['temp_directory'],'tmp_work_2307.err'), 'w')
# subprocess.call( [ context['isql_path'], dsn, "-i", f_work_sql.name],
# stdout = f_work_log,
# stderr = f_work_err
# )
#
# f_work_log.close()
# f_work_err.close()
#
# sql_dml = '''
# execute block returns(r_min int, r_max int) as
# declare n varchar(31);
# declare i integer;
# begin
# for
# select min(rdb$relation_id),max(rdb$relation_id)
# from rdb$relations
# where rdb$relation_name starting with upper('test')
# into r_min, r_max
# do
# suspend;
#
# for
# select rdb$relation_name
# from rdb$relations
# -- 4 debug only! >> rows 100
# into :n
# do
# execute statement 'select 1 as k from ' || :n || ' rows 1' into :i;
# end
# '''
#
# con = fdb.connect(dsn=dsn)
# cur = con.cursor()
# cur.execute(sql_dml)
# r_min=99999999
# r_max=-9999999
# for r in cur:
# r_min=r[0] # minimal ID in rdb$relations for user tables ('TEST1')
# r_max=r[1] # maximal ID in rdb$relations for user tables ('TESTnnnnn')
#
# info = con.db_info(fdb.isc_info_read_seq_count)
# cnt=0
# for k,v in info.items():
# cnt = cnt+1 if k >= r_min and k <= r_max and v >= 1 else cnt
#
# print( 'OK' if cnt >= num_of_tables else 'FAILED: db_info(fdb.isc_info_read_seq_count) contains only '+str(cnt)+' entries for scanned '+str(num_of_tables)+ ' user tables.' )
#
# #for k, v in sorted(info.items()):
# # print('page: '+str(k).zfill(8) + ', num of seq_reads: '+str(v).zfill(8) )
#
# # Cleanup.
# ###############################
#
# f_list=[f_work_sql,f_work_log,f_work_err]
# for i in range(len(f_list)):
# if os.path.isfile(f_list[i].name):
# os.remove(f_list[i].name)
#
#
#---
act_1 = python_act('db_1', substitutions=substitutions_1)
@pytest.mark.version('>=2.5')
def test_1(act_1: Action):
# Change FW to OFF in order to speed up initial data filling:
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=str(act_1.db.db_path), mode=DbWriteMode.ASYNC)
# prepare DB for testing: create lot of tables:
num_of_tables = 1000
sql_ddl = f'''
set term ^;
Execute block as
declare variable i integer = 0;
begin
while ( i < {num_of_tables} )
do
begin
execute statement 'create table test' || cast(:i as varchar(5)) || ' (c integer)';
i = i + 1 ;
end
end ^
commit ^
execute block as
declare variable i integer = 0;
begin
while (i < {num_of_tables} )
do
begin
execute statement 'insert into test' || cast(:i as varchar(5)) || ' (c) values (1)';
i = i + 1 ;
end
end
^
set term ;^
commit;
'''
act_1.isql(switches=[], input=sql_ddl)
sql_dml = '''
execute block returns(r_min int, r_max int) as
declare n varchar(31);
declare i integer;
begin
for
select min(rdb$relation_id),max(rdb$relation_id)
from rdb$relations
where rdb$relation_name starting with upper('test')
into r_min, r_max
do
suspend;
for
select rdb$relation_name
from rdb$relations
-- 4 debug only! >> rows 100
into :n
do
execute statement 'select 1 as k from ' || :n || ' rows 1' into :i;
end
'''
with act_1.db.connect() as con:
c = con.cursor()
c.execute(sql_dml)
r_min = 99999999
r_max = -9999999
for r in c:
r_min = r[0] # minimal ID in rdb$relations for user tables ('TEST1')
r_max = r[1] # maximal ID in rdb$relations for user tables ('TESTnnnnn')
#
info = con.info.get_info(DbInfoCode.READ_SEQ_COUNT)
cnt = 0
for k, v in info.items():
cnt = cnt + 1 if k >= r_min and k <= r_max and v >= 1 else cnt
assert cnt >= num_of_tables