6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 21:43:06 +01:00
firebird-qa/tests/bugs/core_1347_test.py
2021-11-11 18:01:08 +01:00

189 lines
5.7 KiB
Python

#coding:utf-8
#
# id: bugs.core_1347
# title: Unexpected "cannot transliterate" error
# decription:
# 05-mar-2021. Re-implemented in order to have ability to run this test on Linux.
# Test encodes to UTF8 all needed statements (SET NAMES; CONNECT; DDL and DML) and stores this text in .sql file.
# NOTE: 'SET NAMES' contain character set that must be used for reproducing problem (WIN1251 in this test).
# Then ISQL is launched in separate (child) process which performs all necessary actions (using required charset).
# Result will be redirected to log(s) which will be opened further via codecs.open(...encoding='cp1251').
# Finally, its content will be converted to UTF8 for showing in expected_stdout.
# Checked on:
# * Windows: 4.0.0.2377, 3.0.8.33423; old version: 3.0.4.32972, 3.0.2.32703, 3.0.1.32609, 3.0.0.32483, 2.5.9.27152
# * Linux: 4.0.0.2379, 3.0.8.33415
#
# tracker_id: CORE-1347
# min_versions: ['2.5.0']
# versions: 2.5
# qmid: None
import pytest
from firebird.qa import db_factory, python_act, Action, temp_file
from pathlib import Path
# version: 2.5
# resources: None
substitutions_1 = []
init_script_1 = """"""
db_1 = db_factory(charset='WIN1251', sql_dialect=3, init=init_script_1)
# test_script_1
#---
#
# import os
# import codecs
# import subprocess
# import time
# engine = db_conn.engine_version
# db_conn.close()
#
# #--------------------------------------------
#
# def flush_and_close( file_handle ):
# # https://docs.python.org/2/library/os.html#os.fsync
# # If you're starting with a Python file object f,
# # first do f.flush(), and
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
# global os
#
# file_handle.flush()
# if file_handle.mode not in ('r', 'rb') and file_handle.name != os.devnull:
# # otherwise: "OSError: [Errno 9] Bad file descriptor"!
# os.fsync(file_handle.fileno())
# file_handle.close()
#
# #--------------------------------------------
#
# def cleanup( f_names_list ):
# global os
# for i in range(len( f_names_list )):
# if type(f_names_list[i]) == file:
# del_name = f_names_list[i].name
# elif type(f_names_list[i]) == str:
# del_name = f_names_list[i]
# else:
# print('Unrecognized type of element:', f_names_list[i], ' - can not be treated as file.')
# del_name = None
#
# if del_name and os.path.isfile( del_name ):
# os.remove( del_name )
#
# #--------------------------------------------
#
# sql_txt=''' set bail on;
# set names win1251;
# connect '%(dsn)s' user '%(user_name)s' password '%(user_password)s';
#
# set term ^ ;
# create procedure sp_test (
# p_tablename varchar(30) ,
# p_idname varchar(30) ,
# p_seqname varchar(30) ,
# p_isusefunc smallint
# )
# returns (
# column_value bigint
# )
# as
# declare variable l_maxid bigint;
# begin
# /*
# -- Находим разрыв в значениях ПК таблицы
# -- если разрыв отсутствует то дергаем секвенс
# -- p_IsUseFunc=1 - дергать секвенс ч/з ф-цию GetSeqValue
# */
# end ^
# set term ;^
# commit;
#
# set list on;
# set count on;
#
# select pr.rdb$procedure_name
# from rdb$procedures pr
# where pr.rdb$procedure_source containing '1'
# and pr.rdb$procedure_name = upper('sp_test');
#
# ''' % dict(globals(), **locals())
#
# f_run_sql = open( os.path.join(context['temp_directory'], 'tmp_5325_win1251.sql'), 'w' )
# f_run_sql.write( sql_txt.decode('utf8').encode('cp1251') )
# flush_and_close( f_run_sql )
#
# # result: file tmp_5325_win1251.sql is encoded in win1251
#
# f_run_log = open( os.path.splitext(f_run_sql.name)[0]+'.log', 'w')
# subprocess.call( [ context['isql_path'], '-q', '-i', f_run_sql.name ],
# stdout = f_run_log,
# stderr = subprocess.STDOUT
# )
# flush_and_close( f_run_log ) # result: output will be encoded in win1251
#
# with codecs.open(f_run_log.name, 'r', encoding='cp1251' ) as f:
# result_in_win1251 = f.readlines()
#
# for i in result_in_win1251:
# print( i.encode('utf8') )
#
# # cleanup:
# ###########
# cleanup( (f_run_sql, f_run_log) )
#
#
#---
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
RDB$PROCEDURE_NAME SP_TEST
Records affected: 1
"""
tmp_file_1 = temp_file('non_ascii_ddl.sql')
sql_txt = '''set bail on;
set term ^ ;
create procedure sp_test (
p_tablename varchar(30) ,
p_idname varchar(30) ,
p_seqname varchar(30) ,
p_isusefunc smallint
)
returns (
column_value bigint
)
as
declare variable l_maxid bigint;
begin
/*
-- Находим разрыв в значениях ПК таблицы
-- если разрыв отсутствует то дергаем секвенс
-- p_IsUseFunc=1 - дергать секвенс ч/з ф-цию GetSeqValue
*/
end ^
set term ;^
commit;
set list on;
set count on;
select pr.rdb$procedure_name
from rdb$procedures pr
where pr.rdb$procedure_source containing '1'
and pr.rdb$procedure_name = upper('sp_test');
'''
@pytest.mark.version('>=2.5')
def test_1(act_1: Action, tmp_file_1: Path):
tmp_file_1.write_bytes(sql_txt.encode('cp1251'))
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=['-q'], input_file=tmp_file_1, charset='win1251', io_enc='cp1251')
assert act_1.clean_expected_stdout == act_1.clean_stdout