6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00
firebird-qa/tests/bugs/gh_3106_test.py

231 lines
8.0 KiB
Python

#coding:utf-8
#
# id: bugs.gh_3106
# title: Many indexed reads in a compound index with NULLs [CORE2709]
# decription:
# https://github.com/FirebirdSQL/firebird/issues/3106
#
# BEFORE fix trace log was like this:
# ======
# Table Natural Index
# *******************************
# RDB$DATABASE 1
# TEST_TABLE 3 <<< this line must NOT present now.
# ======
# AFTER fix trace must contain line only for RDB$DATABASE in the table statistics section.
#
# Confirmed bug on 4.0.0.2451: trace statistics contain line with three indexed reads for test table.
# Checked on 4.0.0.2453 SS/CS: all OK, there are no indexed reads on test table in the trace log.
#
# tracker_id:
# min_versions: ['4.0.0']
# versions: 4.0
# qmid: None
import pytest
from firebird.qa import db_factory, python_act, Action
# version: 4.0
# resources: None
substitutions_1 = []
init_script_1 = """"""
db_1 = db_factory(page_size=8192, sql_dialect=3, init=init_script_1)
# test_script_1
#---
#
# import os
# import subprocess
# import time
# import re
# from subprocess import Popen
#
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
# if type(f_names_list[i]) == file:
# del_name = f_names_list[i].name
# elif type(f_names_list[i]) == str:
# del_name = f_names_list[i]
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# # Prepare table: add data.
#
# sql_init='''
# recreate table test_table (
# id1 integer,
# id2 integer,
# id3 integer
# );
# commit;
#
# insert into test_table (id1, id2, id3) values (1, 1, null);
# insert into test_table (id1, id2, id3) values (1, 2, null);
# insert into test_table (id1, id2, id3) values (1, 3, null);
# insert into test_table (id1, id2, id3) values (2, 1, null);
# insert into test_table (id1, id2, id3) values (2, 2, null);
# insert into test_table (id1, id2, id3) values (2, 3, null);
# commit;
#
# create index test_table_idx1 on test_table (id1,id2,id3);
# commit;
# '''
#
# sql_cmd=open(os.path.join(context['temp_directory'],'tmp_core_3106.sql'), 'w')
# sql_cmd.write(sql_init)
# flush_and_close( sql_cmd )
#
# sql_log=open(os.path.join(context['temp_directory'],'tmp_core_3106.log'),'w')
# sql_err=open(os.path.join(context['temp_directory'],'tmp_core_3106.err'),'w')
#
# subprocess.call([ context['isql_path'], dsn, "-i", sql_cmd.name],stdout=sql_log, stderr=sql_err)
#
# flush_and_close( sql_log )
# flush_and_close( sql_err )
#
# #########################
#
# txt = '''# Generated auto, do not edit!
# database=%[\\\\\\\\/]bugs.gh_3106.fdb
# {
# log_initfini = false
# enabled = true
# time_threshold = 0
# log_statement_finish = true
# print_perf = true
# }
# '''
#
# f_trc_cfg=open( os.path.join(context['temp_directory'],'tmp_trace_3106.cfg'), 'w')
# f_trc_cfg.write(txt)
# flush_and_close( f_trc_cfg )
#
# # ##############################################################
# # S T A R T T R A C E i n S E P A R A T E P R O C E S S
# # ##############################################################
#
# f_trc_log=open( os.path.join(context['temp_directory'],'tmp_trace_3106.log'), "w")
# f_trc_err=open( os.path.join(context['temp_directory'],'tmp_trace_3106.err'), "w")
#
# p_trace = Popen( [ context['fbsvcmgr_path'], 'localhost:service_mgr', 'action_trace_start' , 'trc_cfg', f_trc_cfg.name],stdout=f_trc_log,stderr=f_trc_err)
#
# sql_run='''
# set list on;
# select 1 as dummy from rdb$database r left join test_table t on t.id1 = 1 and t.id2 is null;
# '''
#
# sql_cmd=open(os.path.join(context['temp_directory'],'tmp_core_3106.sql'), 'w')
#
# sql_cmd.write(sql_run)
# flush_and_close( sql_cmd )
#
# sql_log=open(os.path.join(context['temp_directory'],'tmp_core_3106.log'),'w')
# sql_err=open(os.path.join(context['temp_directory'],'tmp_core_3106.err'),'w')
#
# subprocess.call([ context['isql_path'], dsn, "-i", sql_cmd.name],stdout=sql_log, stderr=sql_err)
#
# flush_and_close( sql_log )
# flush_and_close( sql_err )
#
# # 01-mar-2021: do NOT remove delay from here.
# # It must be at least 2 seconds, otherwise trace log will not be fulfilled
# # when run on Linux!
# time.sleep(2)
#
# # ####################################################
# # G E T A C T I V E T R A C E S E S S I O N I D
# # ####################################################
# # Save active trace session info into file for further parsing it and obtain session_id back (for stop):
#
# f_trc_lst = open( os.path.join(context['temp_directory'],'tmp_trace_3106.lst'), 'w')
# subprocess.call([context['fbsvcmgr_path'], 'localhost:service_mgr', 'action_trace_list'], stdout=f_trc_lst)
# flush_and_close( f_trc_lst )
#
# trcssn=0
# with open( f_trc_lst.name,'r') as f:
# for line in f:
# i=1
# if 'Session ID' in line:
# for word in line.split():
# if i==3:
# trcssn=word
# i=i+1
# break
#
# # Result: `trcssn` is ID of active trace session. Now we have to terminate it:
#
# # ####################################################
# # S E N D R E Q U E S T T R A C E T O S T O P
# # ####################################################
# if trcssn>0:
# fn_nul = open(os.devnull, 'w')
# subprocess.call([context['fbsvcmgr_path'], 'localhost:service_mgr', 'action_trace_stop','trc_id', trcssn], stdout=fn_nul)
# fn_nul.close()
# else:
# print('Trace session was finished forcedly: could not find its trc_id.')
#
# p_trace.terminate()
# flush_and_close( f_trc_log )
# flush_and_close( f_trc_err )
#
# p_tablestat_head = re.compile('Table\\s+Natural\\s+Index', re.IGNORECASE)
# p_tablestat_must_found = re.compile('rdb\\$database\\s+\\d+', re.IGNORECASE)
# p_tablestat_must_miss = re.compile('test_table\\s+\\d+', re.IGNORECASE)
# with open(f_trc_log.name,'r') as f:
# for line in f:
# if p_tablestat_head.search(line):
# print('Found table statistics header.')
# elif p_tablestat_must_found.search(line):
# print('Found EXPECTED line for rdb$database.')
# elif p_tablestat_must_miss.search(line):
# print('### FAILED ### found UNEXPECTED line for test_table:')
# print(line.strip())
#
# # CLEANUP
# #########
# time.sleep(1)
# cleanup( (f_trc_cfg, f_trc_lst, f_trc_log, f_trc_err, sql_log, sql_err, sql_cmd) )
#---
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Found table statistics header.
Found EXPECTED line for rdb$database.
"""
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(act_1: Action):
pytest.fail("Test not IMPLEMENTED")