6
0
mirror of https://github.com/FirebirdSQL/firebird-qa.git synced 2025-01-22 13:33:07 +01:00

More python tests and some enhancements

This commit is contained in:
Pavel Císař 2021-12-09 19:26:42 +01:00
parent 76b49666fa
commit 0711661a93
53 changed files with 2753 additions and 1599 deletions

BIN
files/core_5207.zip Normal file

Binary file not shown.

Binary file not shown.

BIN
files/core_5618.zip Normal file

Binary file not shown.

BIN
files/core_5637.zip Normal file

Binary file not shown.

BIN
files/core_5659.zip Normal file

Binary file not shown.

Binary file not shown.

View File

@ -25,7 +25,7 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode, SrvRestoreFlag
from firebird.driver import SrvRestoreFlag
#from difflib import unified_diff
from io import BytesIO
@ -521,8 +521,7 @@ act_1 = python_act('db_1', substitutions=substitutions_1)
@pytest.mark.version('>=4.0')
def test_1(act_1: Action):
# CHANGE FW to OFF
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# 1. FIRST RUN DML_TEST
act_1.script = test_script_1
act_1.execute()

View File

@ -30,7 +30,7 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode, DbInfoCode
from firebird.driver import DbInfoCode
# version: 2.5
# resources: None
@ -172,8 +172,7 @@ act_1 = python_act('db_1', substitutions=substitutions_1)
@pytest.mark.version('>=2.5')
def test_1(act_1: Action):
# Change FW to OFF in order to speed up initial data filling:
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# prepare DB for testing: create lot of tables:
num_of_tables = 1000
sql_ddl = f'''

View File

@ -279,6 +279,3 @@ def test_1(act_1: Action):
act_1.expected_stdout = expected_stdout_1
act_1.trace_to_stdout()
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -42,7 +42,6 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode
# version: 2.5.2
# resources: None
@ -458,8 +457,7 @@ trace_1 = ['log_transactions = true',
def test_1(act_1: Action, capsys):
NUM_ROWS_TO_BE_ADDED = 45000
# Change FW to OFF in order to speed up initial data filling
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# Make initial data filling into PERMANENT table for retrieving later number of data pages
# (it should be the same for any kind of tables, including GTTs):
with act_1.db.connect() as con:

View File

@ -56,7 +56,7 @@ import re
import subprocess
from datetime import datetime
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode, ShutdownMethod, ShutdownMode
from firebird.driver import ShutdownMethod, ShutdownMode
# version: 3.0
# resources: None
@ -572,8 +572,7 @@ def test_1(act_1: Action, capsys):
"""
act_1.isql(switches=[], input=sql_ddl)
# Temporay change FW to OFF in order to make DML faster:
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
#
sql_data = f"""
set term ^;
@ -605,8 +604,7 @@ def test_1(act_1: Action, capsys):
act_1.reset()
act_1.isql(switches=['-nod'], input=sql_data)
# Restore FW to ON (make sweep to do its work "harder"):
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.SYNC)
act_1.db.set_async_write()
# Trace
with act_1.trace(db_events=trace_1):
# Traced action

View File

@ -373,5 +373,5 @@ expected_stdout_1 = """
@pytest.mark.version('>=4.0')
def test_1(db_1):
pytest.skip("Test requires 3rd party encryption plugin")
pytest.skip("Requires encryption plugin")
#pytest.fail("Test not IMPLEMENTED")

View File

@ -33,7 +33,6 @@ import subprocess
import time
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import DbWriteMode
# version: 3.0
# resources: None
@ -254,8 +253,7 @@ heavy_output_1 = temp_file('heavy_script.out')
@pytest.mark.version('>=3.0')
def test_1(act_1: Action, heavy_script_1: Path, heavy_output_1: Path, capsys):
# Change database FW to OFF in order to increase speed of insertions and output its header info
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# Preparing script for ISQL that will do 'heavy DML'
heavy_script_1.write_text("""
recreate sequence g;

View File

@ -34,7 +34,6 @@
import pytest
from zipfile import Path
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode
# version: 3.0
# resources: None
@ -119,8 +118,7 @@ expected_stdout_1 = """
@pytest.mark.version('>=3.0')
def test_1(act_1: Action):
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# Read FNC scripts from zip file and execute it
script_file = Path(act_1.vars['files'] / 'core_4880.zip',
at='core_4880_fnc.tmp')

View File

@ -174,5 +174,5 @@ expected_stdout_1 = """
@pytest.mark.version('>=3.0')
def test_1(db_1):
pytest.skip("Test depends on 3rd party encryption plugin")
pytest.skip("Requires encryption plugin")
#pytest.fail("Test not IMPLEMENTED")

View File

@ -26,7 +26,7 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode, TPB, Isolation
from firebird.driver import TPB, Isolation
# version: 2.5.6
# resources: None
@ -85,8 +85,7 @@ expected_stdout_1 = """
@pytest.mark.version('>=2.5.6')
def test_1(act_1: Action):
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
#
custom_tpb = TPB(isolation=Isolation.CONCURRENCY).get_buffer()
with act_1.db.connect(no_gc=True) as con:

View File

@ -21,7 +21,6 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode
# version: 4.0
# resources: None
@ -126,8 +125,7 @@ test_sript_1 = """
@pytest.mark.version('>=4.0')
def test_1(act_1: Action):
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[], input=test_sript_1)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -26,7 +26,6 @@
import pytest
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DbWriteMode
# version: 2.5.7
# resources: None
@ -267,8 +266,7 @@ test_script_1 = f"""
def test_1(act_1: Action):
if act_1.get_server_architecture() == 'SS':
# Bucgcheck is reproduced on 2.5.7.27030 only when FW = OFF
with act_1.connect_server() as srv:
srv.database.set_write_mode(database=act_1.db.db_path, mode=DbWriteMode.ASYNC)
act_1.db.set_async_write()
# Test
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[], input=test_script_1)

View File

@ -33,7 +33,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.2
# resources: None
@ -304,16 +304,88 @@ db_1 = db_factory(page_size=8192, sql_dialect=3, init=init_script_1)
# cleanup( (f_trc_cfg, f_trc_lst, f_trc_log, f_trc_err, sql_log, sql_err, sql_cmd) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
PLAN (TEST ORDER TEST_F01_ID)
Number of fetches: acceptable.
"""
"""
FETCHES_THRESHOLD = 80
init_sql_1 = """
recreate table test
(
id int not null,
f01 int,
f02 int
);
set term ^;
create or alter procedure sp_add_init_data(a_rows_to_add int)
as
declare n int;
declare i int = 0;
begin
n = a_rows_to_add;
while (i < n) do
begin
insert into test(id, f01, f02) values(:i, nullif(mod(:i, :n/20), 0), iif(mod(:i,3)<2, 0, 1))
returning :i+1 into i;
end
end
^
set term ^;
commit;
execute procedure sp_add_init_data(300000);
commit;
create index test_f01_id on test(f01, id);
create index test_f02_only on test(f02);
commit;
"""
test_script_1 = """
set list on;
select count(*) cnt_check
from (
select *
from test
where f01 -- ###################################################################
IS NULL -- <<< ::: NB ::: we check here 'f01 is NULL', exactly as ticket says.
and f02=0 -- ###################################################################
order by f01, id
) ;
"""
trace_1 = ['time_threshold = 0',
'log_statement_finish = true',
'print_plan = true',
'print_perf = true',
'log_initfini = false',
]
@pytest.mark.version('>=3.0.2')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
act_1.db.set_async_write()
act_1.isql(switches=[], input=init_sql_1)
#
with act_1.trace(db_events=trace_1):
act_1.reset()
act_1.isql(switches=[], input=test_script_1)
# Process trace
run_with_plan = ''
num_of_fetches = 99999999
for line in act_1.trace_log:
if line.lower().startswith('plan ('):
run_with_plan = line.strip().upper()
elif 'fetch(es)' in line:
words = line.split()
for k in range(len(words)):
if words[k].startswith('fetch'):
num_of_fetches = int(words[k-1])
# Check
assert run_with_plan == 'PLAN (TEST ORDER TEST_F01_ID)'
assert num_of_fetches < FETCHES_THRESHOLD

View File

@ -14,13 +14,16 @@
#
# 03-mar-2021: replaced 'xnet' with 'localhost' in order have ability to run this test on Linux.
#
# [pcisar] 8.12.2021
# Fails with "no permission for remote access to database security.db" on Linux FB 4.0
#
# tracker_id: CORE-5496
# min_versions: ['3.0.2']
# versions: 3.0.2
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.2
# resources: None
@ -68,17 +71,42 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
SEC$USER_NAME SYSDBA
SEC$PLUGIN Srp
Records affected: 1
"""
"""
test_script_1 = """
connect 'localhost:security.db';
create or alter user foo password '123' grant admin role using plugin Srp;
create or alter user rio password '123' grant admin role using plugin Srp;
create or alter user bar password '123' grant admin role using plugin Srp;
commit;
grant rdb$admin to sysdba granted by foo;
grant rdb$admin to sysdba granted by rio;
grant rdb$admin to sysdba granted by bar;
commit;
set list on;
set count on;
select sec$user_name, sec$plugin from sec$users where upper(sec$user_name) = upper('sysdba') and upper(sec$plugin) = upper('srp');
commit;
drop user foo using plugin Srp;
drop user rio using plugin Srp;
drop user bar using plugin Srp;
commit;
quit;
"""
@pytest.mark.version('>=3.0.2')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires remote access to security.db")
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=['-q', '-b'], input=test_script_1)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -32,18 +32,33 @@
#
# Checked on 3.0.2.32702 (CS/SC/SS), 4.0.0.563 (CS/SC/SS)
#
# [pcisar] 8.12.2021
# Reimplementation does not work as expected on Linux 4.0
# gstat output:
# Data pages: total 97, encrypted 0, non-crypted 97
# Index pages: total 85, encrypted 0, non-crypted 85
# Blob pages: total 199, encrypted 0, non-crypted 199
# Generator pages: total 1, encrypted 0, non-crypted 1
# Validation does not report BLOB page errors, only data and index corruptions.
#
# tracker_id: CORE-5501
# min_versions: ['3.0.2']
# versions: 3.0.2
# qmid: None
from __future__ import annotations
from typing import Dict
import pytest
from firebird.qa import db_factory, isql_act, Action
import re
from struct import unpack_from
from firebird.qa import db_factory, python_act, Action
from firebird.driver import Connection
# version: 3.0.2
# resources: None
substitutions_1 = [('total \\d+,', 'total'), ('non-crypted \\d+', 'non-crypted'), ('crypted \\d+', 'crypted')]
substitutions_1 = [('total \\d+,', 'total'),
('non-crypted \\d+', 'non-crypted'), ('crypted \\d+', 'crypted')]
init_script_1 = """
alter database drop linger;
@ -62,7 +77,7 @@ init_script_1 = """
from rdb$types
rows 100;
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -454,19 +469,250 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Data pages: total 63, encrypted 0, non-crypted 63
Index pages: total 88, encrypted 0, non-crypted 88
Blob pages: total 199, encrypted 0, non-crypted 199
Other pages: total 115, ENCRYPTED 3 (DB problem!), non-crypted 112
Detect all THREE page types with problem ? => YES
"""
Detected all THREE page types with problem => YES
"""
PAGE_TYPES = {0: "undef/free",
1: "DB header",
2: "PIP",
3: "TIP",
4: "Pntr Page",
5: "Data Page",
6: "Indx Root",
7: "Indx Data",
8: "Blob Page",
9: "Gens Page",
10: "SCN" # only for ODS>=12
}
def fill_dbo(con: Connection, map_dbo: Dict):
cur = con.cursor()
sql = """
select rel_id, rel_name, idx_id, idx_name
from (
select
rr.rdb$relation_id rel_id, -- 0
rr.rdb$relation_name rel_name, -- 1
-1 idx_id, -- 2
'' idx_name, -- 3
rr.rdb$relation_type rel_type,
rr.rdb$system_flag sys_flag
from rdb$relations rr
union all
select
rr.rdb$relation_id rel_id, -- 0
rr.rdb$relation_name rel_name, -- 1
coalesce(ri.rdb$index_id-1,-1) idx_id, -- 2
coalesce(ri.rdb$index_name,'') idx_name, -- 3
rr.rdb$relation_type rel_type,
rr.rdb$system_flag sys_flag
from rdb$relations rr
join rdb$indices ri on
rr.rdb$relation_name = ri.rdb$relation_name
) r
where
coalesce(r.rel_type,0) = 0 -- exclude views, GTT and external tables
and r.sys_flag is distinct from 1
"""
cur.execute(sql)
for r in cur:
map_dbo[r[0], r[2]] = (r[1].strip(), r[3].strip())
def parse_page_header(con: Connection, page_number: int, map_dbo: Dict):
page_buffer = con.info.get_page_content(page_number)
# dimitr, 20.01.2017 ~13:00
# all *CHAR = 1 byte, *SHORT = 2 bytes, *LONG = 4 bytes.
# https://docs.python.org/2/library/struct.html
# struct.unpack_from(fmt, buffer[, offset=0])
# Unpack the buffer according to the given format.
# The result is a tuple even if it contains exactly one item.
# The buffer must contain at least the amount of data required by the format
# len(buffer[offset:]) must be at least calcsize(fmt).
# First character of the format string can be used to indicate the byte order,
# size and alignment of the packed data
# Native byte order is big-endian or little-endian:
# < little-endian
# > big-endian
# Intel x86 and AMD64 (x86-64) are little-endian
# Use sys.byteorder to check the endianness of your system:
# https://docs.python.org/2/library/struct.html#format-characters
# c char string of length 1
# b signed char
# B unsigned char
# h short
# H unsigned short integer
# i int integer 4
# I unsigned int integer 4
# l long (4)
# L unsigned long (4)
# q long long (8)
# Q unsigned long long
page_type = unpack_from('<b', page_buffer)[0]
relation_id = -1
index_id = -1
segment_cnt = -1 # for Data page: number of record segments on page
index_id = -1
ix_level = -1
btr_len = -1
if page_type == 4:
# POINTER pege:
# *pag* dpg_header=16, SLONG dpg_sequence=4, SLONG ppg_next=4, USHORT ppg_count=2 ==> 16+4+4+2=26
# struct pointer_page
# {
# pag ppg_header;
# SLONG ppg_sequence; // Sequence number in relation
# SLONG ppg_next; // Next pointer page in relation
# USHORT ppg_count; // Number of slots active
# USHORT ppg_relation; // Relation id
# USHORT ppg_min_space; // Lowest slot with space available
# USHORT ppg_max_space; // Highest slot with space available
# SLONG ppg_page[1]; // Data page vector
# };
relation_id = unpack_from('<H', page_buffer, 26)[0] # 'H' ==> USHORT
elif page_type == 5:
# DATA page:
# *pag* dpg_header=16, SLONG dpg_sequence=4 ==> 16+4 = 20:
# struct data_page
# {
# 16 pag dpg_header;
# 4 SLONG dpg_sequence; // Sequence number in relation
# 2 USHORT dpg_relation; // Relation id
# 2 USHORT dpg_count; // Number of record segments on page
# struct dpg_repeat
# {
# USHORT dpg_offset; // Offset of record fragment
# USHORT dpg_length; // Length of record fragment
# } dpg_rpt[1];
# };
relation_id = unpack_from('<H', page_buffer, 20)[0] # 'H' ==> USHORT
segment_cnt = unpack_from('<H', page_buffer, 22)[0]
elif page_type == 6:
# Index root page
# struct index_root_page
# {
# pag irt_header;
# USHORT irt_relation; // relation id (for consistency)
relation_id = unpack_from('<H', page_buffer, 16)[0] # 'H' ==> USHORT
elif page_type == 7:
# B-tree page ("bucket"):
# struct btree_page
# {
# 16 pag btr_header;
# 4 SLONG btr_sibling; // right sibling page
# 4 SLONG btr_left_sibling; // left sibling page
# 4 SLONG btr_prefix_total; // sum of all prefixes on page
# 2 USHORT btr_relation; // relation id for consistency
# 2 USHORT btr_length; // length of data in bucket
# 1 UCHAR btr_id; // index id for consistency
# 1 UCHAR btr_level; // index level (0 = leaf)
# btree_nod btr_nodes[1];
# };
relation_id = unpack_from('<H', page_buffer, 28)[0] # 'H' ==> USHORT
btr_len = unpack_from('<H', page_buffer, 30)[0] # 'H' ==> USHORT // length of data in bucket
index_id = unpack_from('<B', page_buffer, 32)[0] # 'B' => UCHAR
ix_level = unpack_from('<B', page_buffer, 33)[0]
#
if index_id>=0 and (relation_id, index_id) in map_dbo:
u = map_dbo[ relation_id, index_id ]
page_info = f'{PAGE_TYPES[page_type].ljust(9)}, {u[1].strip()}, data_len={btr_len}, lev={ix_level}'
#page_info = ''.join((PAGE_TYPES[page_type].ljust(9), ', ', u[1].strip(), ', data_len=', str(btr_len), ', lev=', str(ix_level))) # 'Indx Page, <index_name>, <length of data in bucket>'
elif (relation_id, -1) in map_dbo:
u = map_dbo[ relation_id, -1 ]
if page_type == 5:
page_info = f'{PAGE_TYPES[page_type].ljust(9)}, {u[0].strip()}, segments on page: {segment_cnt}'
#page_info = ''.join( ( PAGE_TYPES[page_type].ljust(9),', ',u[0].strip(),', segments on page: ',str(segment_cnt) ) ) # '<table_name>, segments on page: NNN' - for Data page
else:
page_info = f'{PAGE_TYPES[page_type].ljust(9)}, {u[0].strip()}'
#page_info = ''.join( ( PAGE_TYPES[page_type].ljust(9),', ',u[0].strip() ) ) # '<table_name>' - for Pointer page
elif relation_id == -1:
page_info = PAGE_TYPES[page_type].ljust(9)
else:
page_info = f'UNKNOWN; {PAGE_TYPES[page_type].ljust(9)}; relation_id {relation_id}; index_id {index_id}'
#page_info = ''.join( ('UNKNOWN; ',PAGE_TYPES[page_type].ljust(9),'; relation_id ', str(relation_id), '; index_id ', str(index_id)) )
return (page_type, relation_id, page_info)
@pytest.mark.version('>=3.0.2')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, capsys):
map_dbo = {}
sql = """
select p.rdb$relation_id, p.rdb$page_number
from rdb$pages p
join rdb$relations r on p.rdb$relation_id = r.rdb$relation_id
where r.rdb$relation_name=upper('TEST') and p.rdb$page_type = 4
order by p.rdb$page_number
rows 1
"""
with act_1.db.connect() as con:
fill_dbo(con, map_dbo)
c = con.cursor()
rel_id, pp1st = c.execute(sql).fetchone()
# Found first page for each of three types: Data, Index and Blob
# (loop starts from first PointerPage of table 'TEST')
brk_datapage = brk_indxpage = brk_blobpage = -1
for i in range(pp1st, con.info.pages_allocated):
page_type, relation_id, page_info = parse_page_header(con, i, map_dbo)
#print('page:',i, '; page_type:',page_type, '; rel_id:',relation_id,';', page_info)
if relation_id == 128 and page_type == 5:
brk_datapage = i
elif relation_id == 128 and page_type == 7:
brk_indxpage = i
elif page_type == 8:
brk_blobpage = i
if brk_datapage > 0 and brk_indxpage > 0 and brk_blobpage > 0:
break
#
# Store binary content of .fdb for futher restore
raw_db_content = act_1.db.db_path.read_bytes()
# Make pages damaged
# 0xFFAACCEEBB0000CC 0xDDEEAADDCC00DDEE
bw = bytearray(b'\\xff\\xaa\\xcc\\xee\\xbb\\x00\\x00\\xcc\\xdd\\xee\\xaa\\xdd\\xcc\\x00\\xdd\\xee')
with open(act_1.db.db_path, 'r+b') as w:
for brk_page in (brk_datapage, brk_indxpage, brk_blobpage):
w.seek(brk_page * con.info.page_size)
w.write(bw)
#
act_1.gstat(switches=['-e'])
pattern = re.compile('(data|index|blob|other)\\s+pages[:]{0,1}\\s+total[:]{0,1}\\s+\\d+[,]{0,1}\\s+encrypted[:]{0,1}\\s+\\d+.*[,]{0,1}non-crypted[:]{0,1}\\s+\\d+.*', re.IGNORECASE)
for line in act_1.stdout.splitlines():
if pattern.match(line.strip()):
print(line.strip())
# Validate DB - ensure that there are errors in pages
# RESULT: validation log should contain lines with problems about three diff. page types:
# expected data encountered unknown
# expected index B-tree encountered unknown
# expected blob encountered unknown
with act_1.connect_server() as srv:
srv.database.validate(database=act_1.db.db_path, lock_timeout=1)
validation_log = srv.readlines()
# Process validation log
data_page_problem = indx_page_problem = blob_page_problem = False
for line in validation_log:
if 'expected data' in line:
data_page_problem = True
elif 'expected index B-tree' in line:
indx_page_problem = True
elif 'expected blob' in line:
blob_page_problem = True
print(f"Detected all THREE page types with problem => {'YES' if data_page_problem and indx_page_problem and blob_page_problem else 'NO'}")
# restore DB content
act_1.db.db_path.write_bytes(raw_db_content)
# Check
act_1.reset()
act_1.expected_stdout = expected_stdout_1
act_1.stdout = capsys.readouterr().out
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -22,12 +22,14 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 4.0
# resources: None
substitutions_1 = [('[ \t]+', ' ')]
#substitutions_1 = []
init_script_1 = """
recreate view v_test as select 1 x from rdb$database;
@ -76,7 +78,6 @@ init_script_1 = """
;
commit;
insert into test_anna default values;
insert into test_beta default values;
insert into test_ciao default values;
@ -95,8 +96,7 @@ init_script_1 = """
insert into test_won2 default values;
insert into test_w_n3 default values;
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -110,8 +110,7 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# os.environ["ISC_USER"] = user_name
# os.environ["ISC_PASSWORD"] = user_password
#
# # dsn localhost/3400:C:\\FBTESTING\\qa
# bt-repo mpugs.core_NNNN.fdb
# # dsn localhost/3400:C:\\FBTESTING\\qa\\fbt-repo\\tmp\\bugs.core_NNNN.fdb
# # db_conn.database_name C:\\FBTESTING\\QA\\FBT-REPO\\TMP\\BUGS.CORE_NNNN.FDB
# # $(DATABASE_LOCATION)... C:/FBTESTING/qa/fbt-repo/tmp/bugs.core_NNN.fdb
#
@ -163,7 +162,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
0 test_doc% doca
@ -179,11 +179,49 @@ expected_stdout_1 = """
0 test_d% test_d(o|u)% dina
1 test_(a|b)[[:ALPHA:]]+a test_d(o|u)% anna
1 test_(a|b)[[:ALPHA:]]+a test_d(o|u)% beta
"""
"""
# this_fbk=os.path.join(context['temp_directory'],'tmp_5538.fbk')
# test_res=os.path.join(context['temp_directory'],'tmp_5538.tmp')
fbk_file_1 = temp_file('core_5538.fbk')
fdb_file_1 = temp_file('core_5538.fdb')
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path, capsys):
# 1. Check that we can use patterns for include data only from several selected tables:
for i, p in enumerate(['test_doc%', 'test_d(o|u)ra', '%_w(i|o|_)n[[:DIGIT:]]', 'test_a[[:ALPHA:]]{1,}a']):
act_1.reset()
act_1.gbak(switches=['-b', act_1.db.dsn, str(fbk_file_1), '-include', p])
act_1.reset()
act_1.gbak(switches=['-rep', str(fbk_file_1), f'localhost:{fdb_file_1}'])
act_1.reset()
act_1.isql(switches=[f'localhost:{fdb_file_1}'], connect_db=False,
input=f"set heading off; select {i} ptn_indx, q'{{{p}}}' as ptn_text, v.* from v_test v;")
print(act_1.stdout)
# 2. Check interaction between -INCLUDE_DATA and -SKIP_DATA switches for a table:
# We must check only conditions marked by '**':
# +--------------------------------------------------+
# | | INCLUDE_DATA |
# | |--------------------------------------|
# | SKIP_DATA | NOT SET | MATCH | NOT MATCH |
# +-----------+------------+------------+------------+
# | NOT SET | included | included | excluded | <<< these rules can be skipped in this test
# | MATCH | excluded |**excluded**|**excluded**|
# | NOT MATCH | included |**included**|**excluded**|
# +-----------+------------+------------+------------+
skip_ptn = 'test_d(o|u)%'
for i, p in enumerate(['test_d%', 'test_(a|b)[[:ALPHA:]]+a']):
act_1.reset()
act_1.gbak(switches=['-b', act_1.db.dsn, str(fbk_file_1), '-include_data', p, '-skip_data', skip_ptn])
act_1.reset()
act_1.gbak(switches=['-rep', str(fbk_file_1), f'localhost:{fdb_file_1}'])
act_1.reset()
act_1.isql(switches=[f'localhost:{fdb_file_1}'], connect_db=False,
input=f"set heading off; select {i} ptn_indx, q'{{{p}}}' as include_ptn, q'{{{skip_ptn}}}' as exclude_ptn, v.* from v_test v;")
print(act_1.stdout)
# Check
act_1.reset()
act_1.expected_stdout = expected_stdout_1
act_1.stdout = capsys.readouterr().out
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -20,7 +20,8 @@
# qmid:
import pytest
from firebird.qa import db_factory, isql_act, Action
from decimal import Decimal
from firebird.qa import db_factory, python_act, Action
# version: 3.0.3
# resources: None
@ -62,17 +63,21 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
X -Infinity
Y Infinity
Records affected: 1
"""
"""
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
with act_1.db.connect() as con:
c = con.cursor()
c.execute('insert into test(x, y) values(?, ?)', [Decimal('-Infinity'), Decimal('Infinity')])
con.commit()
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[], input="set list on; set count on; select * from test;")
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -30,12 +30,14 @@
# qmid:
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0.3
# resources: None
substitutions_1 = [('[0-9][0-9]:[0-9][0-9]:[0-9][0-9].[0-9][0-9]', ''), ('Relation [0-9]{3,4}', 'Relation')]
substitutions_1 = [('[0-9][0-9]:[0-9][0-9]:[0-9][0-9].[0-9][0-9]', ''),
('Relation [0-9]{3,4}', 'Relation')]
init_script_1 = """
recreate table test (
@ -45,7 +47,7 @@ init_script_1 = """
);
insert into test values (1, 'format1opqwertyuiopqwertyuiop');
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -157,22 +159,43 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( (f_bkrs_err, f_val_log, f_val_err, f_bkup_tmp, f_rest_tmp) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
expected_stdout_1 = """
X1 1
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1_a = """
X1 1
"""
expected_stdout_1_b = """
Validation started
Relation 128 (TEST)
process pointer page 0 of 1
Index 1 (RDB$PRIMARY1)
Relation 128 (TEST) is ok
Validation finished
"""
"""
fbk_file_1 = temp_file('core_5576.fbk')
fdb_file_1 = temp_file('core_5576.fdb')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path):
act_1.gbak(switches=['-b', act_1.db.dsn, str(fbk_file_1)])
act_1.reset()
act_1.gbak(switches=['-rep', str(fbk_file_1), f'localhost:{fdb_file_1}'])
#
for i in range(2): # Run isql twice!
act_1.reset()
act_1.expected_stdout = expected_stdout_1_a
act_1.isql(switches=[f'localhost:{fdb_file_1}'], connect_db=False,
input='set list on;select 1 x1 from test where i=1 with lock;')
assert act_1.clean_stdout == act_1.clean_expected_stdout
# Validate the database
act_1.reset()
act_1.expected_stdout = expected_stdout_1_b
with act_1.connect_server() as srv:
srv.database.validate(database=fdb_file_1)
act_1.stdout = ''.join(srv.readlines())
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -38,7 +38,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import re
import zipfile
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import SrvRestoreFlag
# version: 2.5.8
# resources: None
@ -155,12 +159,23 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
fbk_file_1 = temp_file('core_5579_broken_nn.fbk')
fdb_file_1 = temp_file('core_5579_broken_nn.fdb')
@pytest.mark.version('>=2.5.8')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fdb_file_1: Path, fbk_file_1: Path):
pattern = re.compile('[.*]*request\\s+synchronization\\s+error\\.*', re.IGNORECASE)
zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5579_broken_nn.zip',
at='core_5579_broken_nn.fbk')
fbk_file_1.write_bytes(zipped_fbk_file.read_bytes())
with act_1.connect_server() as srv:
srv.database.restore(database=fdb_file_1, backup=fbk_file_1,
flags=SrvRestoreFlag.ONE_AT_A_TIME | SrvRestoreFlag.CREATE)
# before this ticket was fixed restore fails with: request synchronization error
for line in srv:
if pattern.search(line):
pytest.fail(f'RESTORE ERROR: {line}')

View File

@ -26,12 +26,13 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.3
# resources: None
substitutions_1 = [('[ \t]+', ' '), ('.*RECORD LENGTH:[ \t]+[\\d]+[ \t]*\\)', ''), ('.*COUNT[ \t]+[\\d]+', '')]
substitutions_1 = [('[ \t]+', ' '), ('.*RECORD LENGTH:[ \t]+[\\d]+[ \t]*\\)', ''),
('.*COUNT[ \t]+[\\d]+', '')]
init_script_1 = """"""
@ -163,22 +164,50 @@ db_1 = db_factory(charset='UTF8', sql_dialect=3, init=init_script_1)
# cleanup( ( isql_run, isql_log, isql_err ) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
STDOUT:SELECT EXPRESSION
STDOUT: -> AGGREGATE
STDOUT: -> FILTER
STDOUT: -> HASH JOIN (INNER)
STDOUT: -> TABLE "TEST" AS "B" FULL SCAN
STDOUT: -> RECORD BUFFER (RECORD LENGTH: 32793)
STDOUT: -> TABLE "TEST" AS "A" FULL SCAN
STDOUT:COUNT 17000
"""
SELECT EXPRESSION
-> AGGREGATE
-> FILTER
-> HASH JOIN (INNER)
-> TABLE "TEST" AS "B" FULL SCAN
-> RECORD BUFFER (RECORD LENGTH: 32793)
-> TABLE "TEST" AS "A" FULL SCAN
COUNT 17000
"""
MIN_RECS_TO_ADD = 17000
test_script_1 = """
set list on;
--show version;
set explain on;
select count(*) from test a join test b using(id, s);
set explain off;
quit;
select
m.MON$STAT_ID
,m.MON$STAT_GROUP
,m.MON$MEMORY_USED
,m.MON$MEMORY_ALLOCATED
,m.MON$MAX_MEMORY_USED
,m.MON$MAX_MEMORY_ALLOCATED
from mon$database d join mon$memory_usage m using (MON$STAT_ID);
"""
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
act_1.db.set_async_write()
with act_1.db.connect(charset='utf8') as con:
con.execute_immediate('create table test(id int, s varchar(8191))')
con.commit()
c = con.cursor()
c.execute(f"insert into test(id, s) select row_number()over(), lpad('', 8191, 'Алексей, Łukasz, Máté, François, Jørgen, Νικόλαος') from rdb$types,rdb$types rows {MIN_RECS_TO_ADD}")
con.commit()
#
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[], input=test_script_1)
act_1.stdout = act_1.stdout.upper()
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -23,7 +23,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import zipfile
from difflib import unified_diff
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import SrvRepairFlag
# version: 3.0.3
# resources: None
@ -32,7 +36,7 @@ substitutions_1 = []
init_script_1 = """
recreate table test(b blob sub_type 0);
"""
"""
db_1 = db_factory(page_size=4096, sql_dialect=3, init=init_script_1)
@ -199,15 +203,38 @@ db_1 = db_factory(page_size=4096, sql_dialect=3, init=init_script_1)
# cleanup( (f_fblog_before, f_fblog_after, f_diff_txt, val_log, blob_handle, fwoff_log) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
DIFF IN FIREBIRD.LOG: + VALIDATION FINISHED: 0 ERRORS, 0 WARNINGS, 0 FIXED
"""
+ VALIDATION FINISHED: 0 ERRORS, 0 WARNINGS, 0 FIXED
"""
blob_src_1 = temp_file('core_5618.bin')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, blob_src_1: Path):
act_1.db.set_async_write()
zipped_blob_file = zipfile.Path(act_1.vars['files'] / 'core_5618.zip',
at='core_5618.bin')
blob_src_1.write_bytes(zipped_blob_file.read_bytes())
#
with act_1.db.connect() as con:
c = con.cursor()
with open(blob_src_1, mode='rb') as blob_handle:
c.execute('insert into test (b) values (?)', [blob_handle])
c.close()
con.execute_immediate('drop table test')
con.commit()
#
log_before = act_1.get_firebird_log()
# Run full validation (this is what 'gfix -v -full' does)
with act_1.connect_server() as srv:
srv.database.repair(database=act_1.db.db_path,
flags=SrvRepairFlag.FULL | SrvRepairFlag.VALIDATE_DB)
assert srv.readlines() == []
#
log_after = act_1.get_firebird_log()
log_diff = [line.strip().upper() for line in unified_diff(log_before, log_after)
if line.startswith('+') and 'WARNING' in line.upper()]
assert log_diff == ['+\tVALIDATION FINISHED: 0 ERRORS, 0 WARNINGS, 0 FIXED']

View File

@ -23,7 +23,8 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0.3
# resources: None
@ -121,27 +122,83 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
expected_stdout_1 = """
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1_a = """
RDB$FILE_SEQUENCE 0
RDB$FILE_START 0
RDB$FILE_LENGTH 0
RDB$FILE_FLAGS 1
RDB$SHADOW_NUMBER 1
S_HASH_BEFORE 1499836372373901520
"""
expected_stdout_1_b = """
RDB$FILE_SEQUENCE 0
RDB$FILE_START 0
RDB$FILE_LENGTH 0
RDB$FILE_FLAGS 1
RDB$SHADOW_NUMBER 1
S_HASH_AFTER 1499836372373901520
"""
"""
fdb_file_1 = temp_file('core_5630.fdb')
fbk_file_1 = temp_file('core_5630.fbk')
shd_file_1 = temp_file('core_5630.shd')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fdb_file_1: Path, fbk_file_1: Path, shd_file_1: Path):
init_ddl = f"""
set bail on;
set list on;
create database 'localhost:{fdb_file_1}' user '{act_1.db.user}' password '{act_1.db.password}';
recreate table test(s varchar(30));
commit;
create or alter view v_shadow_info as
select
rdb$file_sequence -- 0
,rdb$file_start -- 0
,rdb$file_length -- 0
,rdb$file_flags -- 1
,rdb$shadow_number -- 1
from rdb$files
where lower(rdb$file_name) containing lower('core_5630.shd')
;
insert into test select 'line #' || lpad(row_number()over(), 3, '0' ) from rdb$types rows 200;
commit;
create shadow 1 '{shd_file_1}';
commit;
set list on;
select * from v_shadow_info;
select hash( list(s) ) as s_hash_before from test;
quit;
"""
act_1.expected_stdout = expected_stdout_1_a
act_1.isql(switches=['-q'], input=init_ddl)
assert act_1.clean_stdout == act_1.clean_expected_stdout
#
with act_1.connect_server() as srv:
srv.database.backup(database=fdb_file_1, backup=fbk_file_1)
srv.wait()
#
fdb_file_1.unlink()
shd_file_1.unlink()
#
act_1.reset()
act_1.gbak(switches=['-c', '-use_all_space', str(fbk_file_1), f'localhost:{fdb_file_1}'])
# Check that we have the same data in DB tables
sql_text = """
set list on;
select * from v_shadow_info;
select hash( list(s) ) as s_hash_after from test;
"""
act_1.reset()
act_1.expected_stdout = expected_stdout_1_b
act_1.isql(switches=['-q', f'localhost:{fdb_file_1}'], input=sql_text, connect_db=False)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -22,7 +22,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import zipfile
from difflib import unified_diff
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import SrvRestoreFlag
# version: 4.0
# resources: None
@ -207,12 +211,29 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
sec_fbk_1 = temp_file('core5637-security3.fbk')
sec_fdb_1 = temp_file('core5637-security3.fdb')
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, sec_fbk_1: Path, sec_fdb_1: Path):
zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5637.zip',
at='core5637-security3.fbk')
sec_fbk_1.write_bytes(zipped_fbk_file.read_bytes())
#
log_before = act_1.get_firebird_log()
# Restore security database
with act_1.connect_server() as srv:
srv.database.restore(database=sec_fdb_1, backup=sec_fbk_1, flags=SrvRestoreFlag.REPLACE)
restore_log = srv.readlines()
#
log_after = act_1.get_firebird_log()
#
srv.database.validate(database=sec_fdb_1)
validation_log = srv.readlines()
#
assert [line for line in restore_log if 'ERROR' in line.upper()] == []
assert [line for line in validation_log if 'ERROR' in line.upper()] == []
assert list(unified_diff(log_before, log_after)) == []

View File

@ -17,14 +17,21 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action, Database
# version: 3.0.3
# resources: None
substitutions_1 = [('INFO_BLOB_ID.*', '')]
init_script_1 = """"""
init_script_1 = """
create table persons (
id integer not null,
name varchar(60) not null,
address varchar(60),
info blob sub_type text
);
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -142,7 +149,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
ID 1
@ -157,11 +165,49 @@ expected_stdout_1 = """
INFO_BLOB_ID 80:1
some_blob_info
Records affected: 2
"""
"""
db_1_repl = db_factory(sql_dialect=3, init=init_script_1, filename='tmp_5645_repl.fd')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, db_1_repl: Database):
pytest.skip("Requires UDR udrcpp_example")
ddl_for_replication = f"""
create table replicate_config (
name varchar(31) not null,
data_source varchar(255) not null
);
insert into replicate_config (name, data_source)
values ('ds1', '{db_1_repl}');
create trigger persons_replicate
after insert on persons
external name 'udrcpp_example!replicate!ds1'
engine udr;
create trigger persons_replicate2
after insert on persons
external name 'udrcpp_example!replicate_persons!ds1'
engine udr;
commit;
"""
act_1.isql(switches=['-q'], input=ddl_for_replication)
#
with act_1.db.connect() as con:
c = con.cursor()
c.execute("insert into persons values (1, 'One', 'some_address', 'some_blob_info')")
con.commit()
#
if act_1.is_version('>4.0'):
act_1.reset()
act_1.isql(switches=['-q'], input='ALTER EXTERNAL CONNECTIONS POOL CLEAR ALL;')
# Check
act_1.reset()
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=['-q', db_1_repl.dsn],
input='set list on; set count on; select id,name,address,info as info_blob_id from persons; rollback;',
connect_db=False)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -17,7 +17,7 @@
# qmid:
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 4.0
# resources: None
@ -71,15 +71,48 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# runProgram('isql',[dsn],script)
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
RET_CODE 0
"""
"""
test_script_1 = """
set bail on;
set list on;
set term ^;
execute block returns(ret_code smallint) as
declare n int = 300;
begin
while (n > 0) do
begin
if (mod(n, 2) = 0) then
begin
in autonomous transaction do
begin
execute statement 'create or alter view vw1 (dump1) as select 1 from rdb$database';
end
end
else
begin
in autonomous transaction do
begin
execute statement 'create or alter view vw1 (dump1, dump2) as select 1, 2 from rdb$database';
end
end
n = n - 1;
end
ret_code = -abs(n);
suspend;
end ^
set term ;^
quit;
"""
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
act_1.db.set_async_write()
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[], input=test_script_1)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -35,7 +35,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import subprocess
import time
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import ShutdownMode, ShutdownMethod
# version: 3.0.3
# resources: None
@ -238,18 +242,68 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
STDLOG of 2nd EDS: RESULT_MSG OK: second EDS was fast
STDLOG of DB reset: Point before DB shutdown.
STDLOG of DB reset: Point after DB shutdown.
STDLOG of DB reset: Point after DB online.
"""
RESULT_MSG OK: second EDS was fast
"""
eds_script_1 = temp_file('eds_script.sql')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, eds_script_1: Path):
eds_sql = f"""
set bail on;
set list on;
--select current_timestamp as " " from rdb$database;
set term ^;
execute block as
declare c smallint;
declare remote_host varchar(50) = '%(remote_host)s'; -- never unreachable: 192.0.2.2
begin
rdb$set_context('USER_SESSION','DTS_BEG', cast('now' as timestamp) );
execute statement 'select 1 from rdb$database'
on external remote_host || ':' || rdb$get_context('SYSTEM', 'DB_NAME')
as user '{act_1.db.user}' password '{act_1.db.password}'
into c;
end
^
set term ;^
--select current_timestamp as " " from rdb$database;
select iif( waited_ms < max_wait_ms,
'OK: second EDS was fast',
'FAILED: second EDS waited too long, ' || waited_ms || ' ms - more than max_wait_ms='||max_wait_ms
) as result_msg
from (
select
datediff( millisecond from cast( rdb$get_context('USER_SESSION','DTS_BEG') as timestamp) to current_timestamp ) as waited_ms
,500 as max_wait_ms
-- ^
-- | #################
-- +-------------------------------- T H R E S H O L D
-- #################
from rdb$database
);
"""
#
remote_host = '192.0.2.2'
eds_script_1.write_text(eds_sql % locals())
p_unavail_host = subprocess.Popen([act_1.vars['isql'], '-n', '-i', str(eds_script_1),
'-user', act_1.db.user,
'-password', act_1.db.password, act_1.db.dsn],
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
try:
time.sleep(2)
remote_host = 'localhost'
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=['-n'], input=eds_sql % locals())
finally:
p_unavail_host.terminate()
# Ensure that database is not busy
with act_1.connect_server() as srv:
srv.database.shutdown(database=act_1.db.db_path, mode=ShutdownMode.FULL,
method=ShutdownMethod.FORCED, timeout=0)
srv.database.bring_online(database=act_1.db.db_path)
# Check
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -20,7 +20,9 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import zipfile
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0
# resources: None
@ -129,15 +131,40 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( (f_restore_log, tmpfdb, tmpfbk) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
PLAN JOIN (B INDEX (COM_PEDIDO_IDX1), A INDEX (FK_COM_PEDIDO_ITEM_PEDIDO), C INDEX (PK_EST_PRODUTO))
"""
"""
test_script_1 = """
set planonly;
select
a.id_pedido_item,
c.descricao
from com_pedido b
join com_pedido_item a on a.id_pedido = b.id_pedido
and ( not(a.id_produto =1 and a.id_pedido_item_pai is not null))
join est_produto c on c.id_produto = a.id_produto
where
-- b.dth_pedido between cast('10.12.16 05:00:00' as timestamp) and cast('10.12.16 20:00:00' as timestamp)
b.dth_pedido between ? and ? ;
"""
fbk_file_1 = temp_file('core5637-security3.fbk')
fdb_file_1 = temp_file('bad_plan_5659.fdb')
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path):
zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5659.zip',
at='core_5659.fbk')
fbk_file_1.write_bytes(zipped_fbk_file.read_bytes())
#
with act_1.connect_server() as srv:
srv.database.restore(backup=fbk_file_1, database=fdb_file_1)
srv.wait()
#
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=['-q', f'localhost:{fdb_file_1}'], input=test_script_1, connect_db=False)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -35,7 +35,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.3
# resources: None
@ -112,7 +112,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# con.close()
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Error while executing SQL statement:
@ -128,11 +129,8 @@ expected_stdout_1 = """
- Problematic key value is ("X" = 2)
-803
335544665
"""
"""
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
pytest.skip("Requires encryption plugin")

View File

@ -29,7 +29,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 2.5.8
# resources: None
@ -76,7 +76,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# con.close()
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Try revert bytes in decimal value: 1 using struct format: "b" ; result: 01
@ -92,11 +93,8 @@ expected_stdout_1 = """
Try revert bytes in decimal value: -65536 using struct format: "H" ; result: ushort format requires 0 <= number <= USHRT_MAX
Try revert bytes in decimal value: 65535 using struct format: "H" ; result: ushort format requires 0 <= number <= USHRT_MAX
Try revert bytes in decimal value: 65536 using struct format: "H" ; result: ushort format requires 0 <= number <= USHRT_MAX
"""
"""
@pytest.mark.version('>=2.5.8')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires function not provided by driver")

View File

@ -1,7 +1,7 @@
#coding:utf-8
#
# id: bugs.core_5685
# title: Sometime it is impossible to cancel\\kill connection executing external query
# title: Sometime it is impossible to cancel/kill connection executing external query
# decription:
# Problem did appear when host "A" established connection to host "B" but could not get completed reply from this "B".
# This can be emulated by following steps:
@ -54,12 +54,18 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import re
import subprocess
import time
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import ShutdownMode, ShutdownMethod
# version: 3.0.3
# resources: None
substitutions_1 = [('.*After line.*', ''), ('.*Data source.*', '.*Data source'), ('.*HANGING_STATEMENT_BLOB_ID.*', '')]
substitutions_1 = [('.*After line.*', ''), ('.*Data source.*', '.*Data source'),
('.*HANGING_STATEMENT_BLOB_ID.*', '')]
init_script_1 = """
create sequence g;
@ -90,7 +96,7 @@ init_script_1 = """
^
set term ;^
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -327,36 +333,117 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
HANGED ATTACH, STDOUT: Records affected: 0
HANGED ATTACH, STDERR: Statement failed, SQLSTATE = 42000
HANGED ATTACH, STDERR: Execute statement error at isc_dsql_fetch :
HANGED ATTACH, STDERR: <found pattern about closed connection>
HANGED ATTACH, STDERR: Statement : select u.unreachable_address from sp_unreachable as u
HANGED ATTACH, STDOUT: Records affected: 0
HANGED ATTACH, STDERR: Statement failed, SQLSTATE = 42000
HANGED ATTACH, STDERR: Execute statement error at isc_dsql_fetch :
HANGED ATTACH, STDERR: <found pattern about closed connection>
HANGED ATTACH, STDERR: Statement : select u.unreachable_address from sp_unreachable as u
.*Data source
HANGED ATTACH, STDERR: -At procedure 'SP_GET_DATA' line: 3, col: 9
HANGED ATTACH, STDERR: <found pattern about failed statement>
HANGED ATTACH, STDERR: <found pattern about closed connection>
HANGED ATTACH, STDERR: <found pattern about failed statement>
HANGED ATTACH, STDERR: <found pattern about closed connection>
KILLER ATTACH, STDOUT: ITERATION_NO 1
KILLER ATTACH, STDOUT: HANGING_ATTACH_CONNECTION 1
KILLER ATTACH, STDOUT: HANGING_ATTACH_PROTOCOL TCP
KILLER ATTACH, STDOUT: HANGING_STATEMENT_STATE 1
KILLER ATTACH, STDOUT: select * from sp_get_data
KILLER ATTACH, STDOUT: Records affected: 1
KILLER ATTACH, STDOUT: ITERATION_NO 2
KILLER ATTACH, STDOUT: HANGING_ATTACH_CONNECTION <null>
KILLER ATTACH, STDOUT: HANGING_ATTACH_PROTOCOL <null>
KILLER ATTACH, STDOUT: HANGING_STATEMENT_STATE <null>
KILLER ATTACH, STDOUT: Records affected: 0
"""
HANGED ATTACH, STDERR: -At procedure 'SP_GET_DATA' line: 3, col: 9
HANGED ATTACH, STDERR: <found pattern about failed statement>
HANGED ATTACH, STDERR: <found pattern about closed connection>
HANGED ATTACH, STDERR: <found pattern about failed statement>
HANGED ATTACH, STDERR: <found pattern about closed connection>
KILLER ATTACH, STDOUT: ITERATION_NO 1
KILLER ATTACH, STDOUT: HANGING_ATTACH_CONNECTION 1
KILLER ATTACH, STDOUT: HANGING_ATTACH_PROTOCOL TCP
KILLER ATTACH, STDOUT: HANGING_STATEMENT_STATE 1
KILLER ATTACH, STDOUT: select * from sp_get_data
KILLER ATTACH, STDOUT: Records affected: 1
KILLER ATTACH, STDOUT: ITERATION_NO 2
KILLER ATTACH, STDOUT: HANGING_ATTACH_CONNECTION <null>
KILLER ATTACH, STDOUT: HANGING_ATTACH_PROTOCOL <null>
KILLER ATTACH, STDOUT: HANGING_STATEMENT_STATE <null>
KILLER ATTACH, STDOUT: Records affected: 0
"""
kill_script = """
set list on;
set blob all;
select gen_id(g,1) as ITERATION_NO from rdb$database;
commit;
select
sign(a.mon$attachment_id) as hanging_attach_connection
,left(a.mon$remote_protocol,3) as hanging_attach_protocol
,s.mon$state as hanging_statement_state
,s.mon$sql_text as hanging_statement_blob_id
from rdb$database d
left join mon$attachments a on a.mon$remote_process containing 'isql'
-- do NOT use, field not existed in 2.5.x: and a.mon$system_flag is distinct from 1
and a.mon$attachment_id is distinct from current_connection
left join mon$statements s on
a.mon$attachment_id = s.mon$attachment_id
and s.mon$state = 1 -- 4.0 Classic: 'SELECT RDB$MAP_USING, RDB$MAP_PLUGIN, ... FROM RDB$AUTH_MAPPING', mon$state = 0
;
set count on;
delete from mon$attachments a
where
a.mon$attachment_id <> current_connection
and a.mon$remote_process containing 'isql'
;
commit;
"""
hang_script_1 = temp_file('hang_script.sql')
hang_stdout_1 = temp_file('hang_script.out')
hang_stderr_1 = temp_file('hang_script.err')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, hang_script_1: Path, hang_stdout_1: Path, hang_stderr_1: Path,
capsys):
hang_script_1.write_text('set list on; set count on; select * from sp_get_data;')
pattern_for_failed_statement = re.compile('Statement failed, SQLSTATE = (08006|08003)')
pattern_for_connection_close = re.compile('(Error (reading|writing) data (from|to) the connection)|(connection shutdown)')
pattern_for_ignored_messages = re.compile('(-send_packet/send)|(-Killed by database administrator.)')
killer_output = []
#
with open(hang_stdout_1, mode='w') as hang_out, open(hang_stderr_1, mode='w') as hang_err:
p_hang_sql = subprocess.Popen([act_1.vars['isql'], '-i', str(hang_script_1),
'-user', act_1.db.user,
'-password', act_1.db.password, act_1.db.dsn],
stdout=hang_out, stderr=hang_err)
try:
time.sleep(4)
for i in range(2):
act_1.reset()
act_1.isql(switches=[], input=kill_script)
killer_output.append(act_1.stdout)
finally:
p_hang_sql.terminate()
# Ensure that database is not busy
with act_1.connect_server() as srv:
srv.database.shutdown(database=act_1.db.db_path, mode=ShutdownMode.FULL,
method=ShutdownMethod.FORCED, timeout=0)
srv.database.bring_online(database=act_1.db.db_path)
#
output = []
for line in hang_stdout_1.read_text().splitlines():
if line.strip():
output.append(f'HANGED ATTACH, STDOUT: {line}')
for line in hang_stderr_1.read_text().splitlines():
if line.strip():
if pattern_for_ignored_messages.search(line):
continue
elif pattern_for_failed_statement.search(line):
msg = '<found pattern about failed statement>'
elif pattern_for_connection_close.search(line):
msg = '<found pattern about closed connection>'
else:
msg = line
output.append(f'HANGED ATTACH, STDERR: {msg}')
for step in killer_output:
for line in step.splitlines():
if line.strip():
output.append(f"KILLER ATTACH, STDOUT: {' '.join(line.split())}")
# Check
act_1.reset()
act_1.expected_stdout = expected_stdout_1
act_1.stdout = '\n'.join(output)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -20,7 +20,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import subprocess
import time
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import ShutdownMode, ShutdownMethod
# version: 3.0.3
# resources: None
@ -211,23 +215,82 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
STDOUT in ISQL: CHECK_EDS_RESULT 1
STDOUT in ISQL: Records affected: 1
STDOUT in ISQL: Records affected: 0
STDOUT in ISQL: CHECK_EDS_RESULT 1
STDOUT in ISQL: Records affected: 1
STDOUT in ISQL: Records affected: 0
STDOUT in DB reset: Point before DB shutdown.
STDOUT in DB reset: Point after DB shutdown.
STDOUT in DB reset: Point after DB online.
"""
CHECK_EDS_RESULT 1
Records affected: 1
Records affected: 0
CHECK_EDS_RESULT 1
Records affected: 1
Records affected: 0
"""
eds_script = temp_file('eds_script.sql')
eds_output = temp_file('eds_script.out')
new_diff_file = temp_file('_new_diff_5704.tmp')
new_main_file = temp_file('new_main_5704.tmp')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, eds_script: Path, eds_output: Path, new_diff_file: Path,
new_main_file: Path):
eds_script.write_text(f"""
set count on;
set list on;
set autoddl off;
set term ^;
create or alter procedure sp_connect returns(check_eds_result int) as
declare usr varchar(31);
declare pwd varchar(31);
declare v_sttm varchar(255) = 'select 1 from rdb$database';
begin
usr ='{act_1.db.user}';
pwd = '{act_1.db.password}';
execute statement v_sttm
on external 'localhost:' || rdb$get_context('SYSTEM','DB_NAME')
as user usr password pwd
into check_eds_result;
suspend;
end
^
set term ^;
commit;
set transaction read committed no record_version lock timeout 1;
alter database add difference file '{new_diff_file}';
select * from sp_connect;
rollback;
select * from rdb$files;
rollback;
set transaction read committed no record_version lock timeout 1;
alter database add file '{new_main_file}';
select * from sp_connect;
--select * from rdb$files;
rollback;
select * from rdb$files;
""")
#
with open(eds_output, mode='w') as eds_out:
p_eds_sql = subprocess.Popen([act_1.vars['isql'], '-i', str(eds_script),
'-user', act_1.db.user,
'-password', act_1.db.password, act_1.db.dsn],
stdout=eds_out, stderr=subprocess.STDOUT)
try:
time.sleep(4)
finally:
p_eds_sql.terminate()
# Ensure that database is not busy
with act_1.connect_server() as srv:
srv.database.shutdown(database=act_1.db.db_path, mode=ShutdownMode.FULL,
method=ShutdownMethod.FORCED, timeout=0)
srv.database.bring_online(database=act_1.db.db_path)
# Check
act_1.expected_stdout = expected_stdout_1
act_1.stdout = eds_output.read_text()
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -41,7 +41,8 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from difflib import unified_diff
from firebird.qa import db_factory, python_act, Action
# version: 3.0.3
# resources: None
@ -265,12 +266,30 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
trace_conf = """
# ::: NOTE :::
# First 'database' section here INTENTIONALLY was written WRONG!
database = (%[\\\\/](security[[:digit:]]).fdb|(security.db))
enabled = false
{
}
database =
{
enabled = true
log_connections = true
}
"""
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
log_before = act_1.get_firebird_log()
with act_1.trace(config=trace_conf, keep_log=False):
# We run here ISQL only in order to "wake up" trace session and force it to raise error in its log.
# NO message like 'Statement failed, SQLSTATE = 08004/connection rejected by remote interface' should appear now!
act_1.isql(switches=['-n', '-q'], input='quit;')
log_after = act_1.get_firebird_log()
assert list(unified_diff(log_before, log_after)) == []

View File

@ -18,7 +18,11 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import zipfile
from difflib import unified_diff
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
from firebird.driver import SrvRestoreFlag
# version: 3.0
# resources: None
@ -181,12 +185,27 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( f_list )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
fbk_file_1 = temp_file('core5719-ods-11_2.fbk')
fdb_file_1 = temp_file('check_restored_5719.fdb')
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file_1: Path, fdb_file_1: Path):
zipped_fbk_file = zipfile.Path(act_1.vars['files'] / 'core_5719-ods-11_2.zip',
at='core5719-ods-11_2.fbk')
fbk_file_1.write_bytes(zipped_fbk_file.read_bytes())
log_before = act_1.get_firebird_log()
#
with act_1.connect_server() as srv:
srv.database.restore(backup=fbk_file_1, database=fdb_file_1,
flags=SrvRestoreFlag.REPLACE, verbose=True)
restore_err = [line for line in srv if 'ERROR' in line.upper()]
log_after = act_1.get_firebird_log()
srv.database.validate(database=fdb_file_1)
validate_err = [line for line in srv if 'ERROR' in line.upper()]
#
assert restore_err == []
assert validate_err == []
assert list(unified_diff(log_before, log_after)) == []

View File

@ -23,7 +23,10 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
import subprocess
import time
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0.3
# resources: None
@ -130,16 +133,37 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( (f_show_command_sql, f_show_command_log, f_show_command_err) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
STDOUT: TEST1
STDOUT: TEST1_ID_PK_DESC UNIQUE DESCENDING INDEX ON TEST1(ID)
"""
TEST1
TEST1_ID_PK_DESC UNIQUE DESCENDING INDEX ON TEST1(ID)
"""
show_script_1 = temp_file('show_script.sql')
show_output_1 = temp_file('show_script.out')
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, show_script_1: Path, show_output_1: Path):
show_script_1.write_text('show table;show index;')
with act_1.db.connect() as con:
con.execute_immediate('recreate table test1(id int primary key using descending index test1_id_pk_desc)')
con.commit()
c = con.cursor()
c.execute('recreate table test2(id int primary key using descending index test2_id_pk_desc)')
# WARNING: we launch ISQL here in async mode in order to have ability to kill its
# process if it will hang!
with open(show_output_1, mode='w') as show_out:
p_show_sql = subprocess.Popen([act_1.vars['isql'], '-i', str(show_script_1),
'-user', act_1.db.user,
'-password', act_1.db.password, act_1.db.dsn],
stdout=show_out, stderr=subprocess.STDOUT)
try:
time.sleep(4)
finally:
p_show_sql.terminate()
#
act_1.expected_stdout = expected_stdout_1
act_1.stdout = show_output_1.read_text()
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -11,12 +11,16 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from difflib import unified_diff
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 4.0
# resources: None
substitutions_1 = []
substitutions_1 = [('database .*tmp_core_5771.fdb already exists.', 'database tmp_core_5771.fdb already exists.'),
('opened file .*tmp_core_5771.fbk',
'opened file tmp_core_5771.fbk')]
init_script_1 = """"""
@ -151,17 +155,35 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( (f_restore_log,f_restore_err,f_fblog_before,f_fblog_after,f_diff_txt,tmpfbk,tmpfdb) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
RESTORE STDOUT:GBAK:OPENED FILE TMP_CORE_5771.FBK
RESTORE STDERR: DATABASE TMP_5771_RESTORED.FDB ALREADY EXISTS. TO REPLACE IT, USE THE -REP SWITCH
RESTORE STDERR: -EXITING BEFORE COMPLETION DUE TO ERRORS
"""
gbak:opened file tmp_core_5771.fbk
"""
expected_stderr_1 = """
database tmp_core_5771.fdb already exists. To replace it, use the -REP switch
-Exiting before completion due to errors
"""
fbk_file = temp_file('tmp_core_5771.fbk')
fdb_file = temp_file('tmp_core_5771.fdb')
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file: Path, fdb_file: Path):
act_1.gbak(switches=['-b', act_1.db.dsn, str(fbk_file)])
act_1.gbak(switches=['-rep', str(fbk_file), f'localhost:{fdb_file}'])
#
log_before = act_1.get_firebird_log()
#
act_1.reset()
act_1.expected_stdout = expected_stdout_1
act_1.expected_stderr = expected_stderr_1
act_1.svcmgr(switches=['action_restore', 'bkp_file', str(fbk_file),
'dbname', str(fdb_file), 'verbose'])
#
log_after = act_1.get_firebird_log()
assert list(unified_diff(log_before, log_after)) == []
assert act_1.clean_stderr == act_1.clean_expected_stderr
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -40,7 +40,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.4
# resources: None
@ -75,7 +75,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
Query line: select
@ -85,11 +86,19 @@ expected_stdout_1 = """
Query line: -- comment N2
Query line: rdb$database
Query result: foo
"""
"""
@pytest.mark.version('>=3.0.4')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, capsys):
with act_1.db.connect() as con:
c = con.cursor()
sql_expr = "select \r -- comment N1 \r 'foo' as msg \r from \r -- comment N2 \r rdb$database"
for line in sql_expr.split('\r'):
print(f'Query line: {line}')
c.execute(sql_expr)
for row in c:
print(f'Query result: {row[0]}')
#
act_1.expected_stdout = expected_stdout_1
act_1.stdout = capsys.readouterr().out
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -18,7 +18,8 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, user_factory, User, temp_file
# version: 3.0.4
# resources: None
@ -153,7 +154,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( f_list )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
RDB$USER TMP$C5790
@ -166,11 +168,40 @@ expected_stdout_1 = """
rdb_object_type_is_expected ? 1
Records affected: 1
Records affected: 0
"""
"""
test_user = user_factory(name='tmp$c5790', password='123')
fdb_file = temp_file('tmp_5790.fdb')
@pytest.mark.version('>=3.0.4')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, test_user: User, fdb_file: Path):
test_script = f"""
create database 'localhost:{fdb_file}';
alter database set linger to 0;
commit;
grant drop database to {test_user.name};
commit;
connect 'localhost:{fdb_file}' user {test_user.name} password '{test_user.password}';
set list on;
set count on;
select
r.rdb$user -- {test_user.name}
,r.rdb$grantor -- sysdba
,r.rdb$privilege -- o
,r.rdb$grant_option -- 0
,r.rdb$relation_name -- sql$database
,r.rdb$field_name -- <null>
,r.rdb$user_type -- 8
,iif( r.rdb$object_type = decode( left(rdb$get_context('SYSTEM', 'ENGINE_VERSION'),1), '3',20, '4',21), 1, 0) "rdb_object_type_is_expected ?"
from rdb$user_privileges r
where r.rdb$user=upper('{test_user.name}');
-- this should NOT show any attachments: "Records affected: 0" must be shown here.
select * from mon$attachments where mon$attachment_id != current_connection;
commit;
drop database;
rollback;
"""
act_1.isql(switches=['-q'], input=test_script)
assert not fdb_file.exists()

View File

@ -51,7 +51,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.4
# resources: None
@ -151,7 +151,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# db_conn.close()
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
1.1. Trying to encrypt with existing key.
@ -167,11 +168,10 @@ expected_stdout_1 = """
<FOUND PATTERN-2 ABOUT MISSED ENCRYPTION KEY>
-607
335544351
"""
"""
@pytest.mark.version('>=3.0.4')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires encryption plugin")

View File

@ -35,12 +35,14 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.4
# resources: None
substitutions_1 = [('ATTRIBUTES .* ENCRYPTED, PLUGIN .*', 'ATTRIBUTES ENCRYPTED'), ('CRYPT CHECKSUM.*', 'CRYPT CHECKSUM'), ('KEY HASH.*', 'KEY HASH'), ('ENCRYPTION KEY NAME.*', 'ENCRYPTION KEY')]
substitutions_1 = [('ATTRIBUTES .* ENCRYPTED, PLUGIN .*', 'ATTRIBUTES ENCRYPTED'),
('CRYPT CHECKSUM.*', 'CRYPT CHECKSUM'), ('KEY HASH.*', 'KEY HASH'),
('ENCRYPTION KEY NAME.*', 'ENCRYPTION KEY')]
init_script_1 = """"""
@ -176,7 +178,8 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
ATTRIBUTES FORCE WRITE, ENCRYPTED, PLUGIN DBCRYPT
@ -186,8 +189,7 @@ expected_stdout_1 = """
"""
@pytest.mark.version('>=3.0.4')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires encryption plugin")

View File

@ -34,7 +34,8 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0.6
# resources: None
@ -141,18 +142,39 @@ db_1 = db_factory(charset='WIN1251', sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
expected_stdout_1 = """
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stderr_1 = """
Statement failed, SQLSTATE = 42000
Dynamic SQL Error
-SQL error code = -104
-Name longer than database column size
"""
"""
test_script = temp_file('test_script.sql')
@pytest.mark.version('>=3.0.6')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, test_script: Path):
if act_1.is_version('<4'):
# Maximal number of characters in the column for FB 3.x is 31.
# Here we use name of 32 characters and this must raise error
# with text "Name longer than database column size":
column_title = 'СъешьЖеЕщёЭтихМягкихФранкоБулок'
else:
# Maximal number of characters in the column for FB 4.x is 63.
# Here we use name of 64 characters and this must raise error
# with text "Name longer than database column size":
column_title = 'СъешьЖеЕщёЭтихПрекрасныхФранкоБулокВместоДурацкихМорковныхКотлет'
# Code to be executed further in separate ISQL process:
test_script.write_text(f"""
set list on;
set sqlda_display on;
-- Maximal number of characters in the column for FB 3.x is 31.
-- Here we use name of 32 characters and this must raise error
-- with text "Name longer than database column size":
select 1 as "{column_title}" from rdb$database;
""", encoding='cp1251')
#
act_1.expected_stderr = expected_stderr_1
act_1.isql(switches=['-q'], input_file=test_script, charset='WIN1251')
assert act_1.clean_stderr == act_1.clean_expected_stderr

View File

@ -34,7 +34,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 4.0
# resources: None
@ -233,16 +233,14 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
+ VALIDATION STARTED
+ VALIDATION FINISHED: 0 ERRORS, 0 WARNINGS, 0 FIXED
"""
"""
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires encryption plugin")

View File

@ -41,12 +41,13 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.4
# resources: None
substitutions_1 = [('ATTRIBUTES FORCE WRITE, ENCRYPTED, PLUGIN.*', 'ATTRIBUTES FORCE WRITE, ENCRYPTED'), ('CRYPT CHECKSUM.*', 'CRYPT CHECKSUM'), ('KEY HASH.*', 'KEY HASH')]
substitutions_1 = [('ATTRIBUTES FORCE WRITE, ENCRYPTED, PLUGIN.*', 'ATTRIBUTES FORCE WRITE, ENCRYPTED'),
('CRYPT CHECKSUM.*', 'CRYPT CHECKSUM'), ('KEY HASH.*', 'KEY HASH')]
init_script_1 = """"""
@ -176,18 +177,18 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
ATTRIBUTES FORCE WRITE, ENCRYPTED, PLUGIN DBCRYPT
CRYPT CHECKSUM: MUB2NTJQCHH9RSHMP6XFAIIC2II=
KEY HASH: ASK88TFWBINVC6B1JVS9MFUH47C=
ENCRYPTION KEY NAME: RED
"""
"""
@pytest.mark.version('>=3.0.4')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
pytest.skip("Requires encryption plugin")

View File

@ -1,7 +1,7 @@
#coding:utf-8
#
# id: bugs.core_5833
# title: Server crashes on preparing empty query when trace is enabled
# title: DDL triggers for some object types (views, exceptions, roles, indexes, domains) are lost in backup-restore process
# decription:
# We create DDL triggers for all cases that are enumerated in $FB_HOME/doc/sql.extensions/README.ddl_triggers.txt.
# Then query to RDB$TRIGGERS table is applied to database and its results are stored in <log_file_1>.
@ -21,7 +21,9 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from difflib import unified_diff
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0
# resources: None
@ -218,12 +220,64 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# cleanup( (f_ddl_sql, f_chk_sql, f_xmeta1_log, f_xmeta1_err, f_xmeta2_log, f_xmeta2_err, f_diff_txt, tmp_bkup, tmp_rest) )
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
ddl_list = ['CREATE TABLE', 'ALTER TABLE', 'DROP TABLE',
'CREATE PROCEDURE', 'ALTER PROCEDURE', 'DROP PROCEDURE',
'CREATE FUNCTION', 'ALTER FUNCTION', 'DROP FUNCTION',
'CREATE TRIGGER', 'ALTER TRIGGER', 'DROP TRIGGER',
'CREATE EXCEPTION', 'ALTER EXCEPTION', 'DROP EXCEPTION',
'CREATE VIEW', 'ALTER VIEW', 'DROP VIEW',
'CREATE DOMAIN', 'ALTER DOMAIN', 'DROP DOMAIN',
'CREATE ROLE', 'ALTER ROLE', 'DROP ROLE',
'CREATE SEQUENCE', 'ALTER SEQUENCE', 'DROP SEQUENCE',
'CREATE USER', 'ALTER USER', 'DROP USER',
'CREATE INDEX', 'ALTER INDEX', 'DROP INDEX',
'CREATE COLLATION', 'DROP COLLATION', 'ALTER CHARACTER SET',
'CREATE PACKAGE', 'ALTER PACKAGE', 'DROP PACKAGE',
'CREATE PACKAGE BODY', 'DROP PACKAGE BODY']
test_script_1 = """
set blob all;
set list on;
set count on;
select rdb$trigger_name, rdb$trigger_type, rdb$trigger_source as blob_id_for_trg_source, rdb$trigger_blr as blob_id_for_trg_blr
from rdb$triggers
where rdb$system_flag is distinct from 1
order by 1;
"""
fbk_file = temp_file('tmp_5833.fbk')
fdb_file = temp_file('tmp_5833.fdb')
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file: Path, fdb_file: Path):
script = ['set bail on;', 'set term ^;']
# Initial DDL: create all triggers
for item in ddl_list:
for evt_time in ['before', 'after']:
script.append(f"recreate trigger trg_{evt_time}_{item.replace(' ', '_').lower()} active {evt_time} {item.lower()} as")
script.append(" declare c rdb$field_name;")
script.append("begin")
script.append(" c = rdb$get_context('DDL_TRIGGER', 'OBJECT_NAME');")
script.append("end ^")
script.append("")
script.append("set term ;^")
script.append("commit;")
act_1.isql(switches=[], input='\n'.join(script))
# Query RDB$TRIGGERS before b/r:
act_1.reset()
act_1.isql(switches=[], input=test_script_1)
meta_before = [line for line in act_1.stdout.splitlines() if not line.startswith('BLOB_ID_FOR_TRG')]
# B/S
act_1.reset()
act_1.gbak(switches=['-b', act_1.db.dsn, str(fbk_file)])
act_1.reset()
act_1.gbak(switches=['-c', str(fbk_file), f'localhost:{fdb_file}'])
# Query RDB$TRIGGERS after b/r:
act_1.reset()
act_1.isql(switches=[f'localhost:{fdb_file}'], input=test_script_1, connect_db=False)
meta_after = [line for line in act_1.stdout.splitlines() if not line.startswith('BLOB_ID_FOR_TRG')]
# Check
assert list(unified_diff(meta_before, meta_after)) == []

View File

@ -14,7 +14,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
# version: 3.0.3
# resources: None
@ -24,7 +24,7 @@ substitutions_1 = []
init_script_1 = """
recreate global temporary table gtt(id int) on commit preserve rows;
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -76,17 +76,35 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
expected_stdout_1 = """
con1.rollback: point before.
con1.rollback: point after.
sample-2 finished OK.
"""
act_1 = python_act('db_1', substitutions=substitutions_1)
#expected_stdout_1 = """
#con1.rollback: point before.
#con1.rollback: point after.
#sample-2 finished OK.
#"""
@pytest.mark.version('>=3.0.3')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
with act_1.db.connect() as con1, act_1.db.connect() as con2:
c2 = con2.cursor()
# Following 'select count' is MANDATORY for reproduce:
c2.execute('select count(*) from gtt').fetchall()
#
c1 = con1.cursor()
c1.execute('insert into gtt(id) values(?)', [1])
c1.execute('insert into gtt(id) values(?)', [1])
#
c2.execute('insert into gtt(id) values(?)', [2])
#
con1.rollback()
#
c2.execute('insert into gtt(id) select 2 from rdb$types rows 200', [2])
con2.commit()
#
c1.execute('insert into gtt(id) values(?)', [11])
c1.execute('insert into gtt(id) values(?)', [11])
#
con1.rollback()
# This test does not need to assert anything, it passes if we get here without error

View File

@ -15,12 +15,14 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action
from firebird.driver import DatabaseError
# version: 2.5.9
# resources: None
substitutions_1 = [('Problematic key value is .*', 'Problematic key value is')]
#substitutions_1 = [('Problematic key value is .*', 'Problematic key value is')]
substitutions_1 = []
init_script_1 = """
recreate table test(
@ -30,7 +32,7 @@ init_script_1 = """
commit;
insert into test values( gen_uuid() );
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -63,27 +65,31 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
# con2.close()
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
expected_stdout_1 = """
1 0 : Error while executing SQL statement:
- SQLCODE: -803
- violation of PRIMARY or UNIQUE KEY constraint "TEST_UID_PK" on table "TEST"
- Problematic key value is ("UID" = x'AA70F788EB634073AD328C284F775A3E')
1 1 : -803
1 2 : 335544665
act_1 = python_act('db_1', substitutions=substitutions_1)
2 0 : Error while executing SQL statement:
- SQLCODE: -803
- violation of PRIMARY or UNIQUE KEY constraint "TEST_UID_PK" on table "TEST"
- Problematic key value is ("UID" = x'AA70F788EB634073AD328C284F775A3E')
2 1 : -803
2 2 : 335544665
"""
#expected_stdout_1 = """
#1 0 : Error while executing SQL statement:
#- SQLCODE: -803
#- violation of PRIMARY or UNIQUE KEY constraint "TEST_UID_PK" on table "TEST"
#- Problematic key value is ("UID" = x'AA70F788EB634073AD328C284F775A3E')
#1 1 : -803
#1 2 : 335544665
#2 0 : Error while executing SQL statement:
#- SQLCODE: -803
#- violation of PRIMARY or UNIQUE KEY constraint "TEST_UID_PK" on table "TEST"
#- Problematic key value is ("UID" = x'AA70F788EB634073AD328C284F775A3E')
#2 1 : -803
#2 2 : 335544665
#"""
@pytest.mark.version('>=2.5.9')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action):
with act_1.db.connect(charset='utf8') as con1, act_1.db.connect() as con2:
c1 = con1.cursor()
c2 = con2.cursor()
cmd = 'insert into test(uid) select uid from test rows 1'
for c in [c1, c2]:
with pytest.raises(DatabaseError, match='.*Problematic key value is.*'):
c.execute(cmd)

View File

@ -20,7 +20,8 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from pathlib import Path
from firebird.qa import db_factory, python_act, Action, temp_file
# version: 3.0
# resources: None
@ -32,7 +33,7 @@ init_script_1 = """
commit;
comment on sequence "new sequence" is 'foo rio bar';
commit;
"""
"""
db_1 = db_factory(sql_dialect=3, init=init_script_1)
@ -163,18 +164,39 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
SEQ_NAME new sequence
SEQ_INIT 123
SEQ_INCR -456
foo rio bar
"""
"""
test_script_1 = """
set list on;
set blob all;
set list on;
select
rdb$generator_name as seq_name,
rdb$initial_value as seq_init,
rdb$generator_increment as seq_incr,
rdb$description as blob_id
from rdb$generators
where rdb$system_flag is distinct from 1;
"""
fbk_file = temp_file('tmp_core_5855.fbk')
fdb_file = temp_file('tmp_core_5855.fdb')
@pytest.mark.version('>=3.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, fbk_file: Path, fdb_file: Path):
with act_1.connect_server() as srv:
srv.database.backup(database=act_1.db.db_path, backup=fbk_file)
srv.wait()
srv.database.restore(backup=fbk_file, database=fdb_file)
srv.wait()
act_1.expected_stdout = expected_stdout_1
act_1.isql(switches=[f'localhost:{fdb_file}'], input=test_script_1, connect_db=False)
assert act_1.clean_stdout == act_1.clean_expected_stdout

View File

@ -19,7 +19,7 @@
# qmid: None
import pytest
from firebird.qa import db_factory, isql_act, Action
from firebird.qa import db_factory, python_act, Action, user_factory, User
# version: 4.0
# resources: None
@ -126,25 +126,83 @@ db_1 = db_factory(sql_dialect=3, init=init_script_1)
#
#
#---
#act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1)
act_1 = python_act('db_1', substitutions=substitutions_1)
expected_stdout_1 = """
definer_-_who_am_i : TMP$C5892
definer_-_who_else_here : SYSDBA
definer_-_effective_user : SYSDBA
definer_-_who_am_i TMP$C5892
definer_-_who_else_here SYSDBA
definer_-_effective_user SYSDBA
definer_-_who_am_i : TMP$C5892
definer_-_who_else_here : TMP$C5892
definer_-_effective_user : SYSDBA
definer_-_who_am_i TMP$C5892
definer_-_who_else_here TMP$C5892
definer_-_effective_user SYSDBA
invoker_-_who_am_i : TMP$C5892
invoker_-_who_else_here : TMP$C5892
invoker_-_effective_user : TMP$C5892
"""
invoker_-_who_am_i TMP$C5892
invoker_-_who_else_here TMP$C5892
invoker_-_effective_user TMP$C5892
"""
sp_definer_ddl = """
create or alter procedure sp_test_definer returns( another_name varchar(31), another_conn_id int, execution_context varchar(31) ) SQL SECURITY DEFINER
as
begin
execution_context = rdb$get_context('SYSTEM', 'EFFECTIVE_USER');
for
select mon$user, mon$attachment_id
from mon$attachments a
where a.mon$system_flag is distinct from 1 and a.mon$attachment_id != current_connection
into
another_name,
another_conn_id
do suspend;
end
"""
sp_invoker_ddl = """
create or alter procedure sp_test_invoker returns( another_name varchar(31), another_conn_id int, execution_context varchar(31) ) SQL SECURITY INVOKER
as
begin
execution_context = rdb$get_context('SYSTEM', 'EFFECTIVE_USER');
for
select mon$user, mon$attachment_id
from mon$attachments a
where
a.mon$system_flag is distinct from 1
and a.mon$attachment_id != current_connection
and a.mon$user = current_user
into
another_name,
another_conn_id
do suspend;
end
"""
test_user = user_factory(name='TMP$C5892', password='123')
@pytest.mark.version('>=4.0')
@pytest.mark.xfail
def test_1(db_1):
pytest.fail("Test not IMPLEMENTED")
def test_1(act_1: Action, test_user: User, capsys):
sql_chk_definer = 'select current_user as "definer_-_who_am_i", d.another_name as "definer_-_who_else_here", d.execution_context as "definer_-_effective_user" from rdb$database r left join sp_test_definer d on 1=1'
sql_chk_invoker = 'select current_user as "invoker_-_who_am_i", d.another_name as "invoker_-_who_else_here", d.execution_context as "invoker_-_effective_user" from rdb$database r left join sp_test_invoker d on 1=1'
with act_1.db.connect() as con1, \
act_1.db.connect(user=test_user.name, password=test_user.password) as con2, \
act_1.db.connect(user=test_user.name, password=test_user.password) as con3:
#
con1.execute_immediate(sp_definer_ddl)
con1.execute_immediate(sp_invoker_ddl)
con1.commit()
con1.execute_immediate('grant execute on procedure sp_test_definer to public')
con1.execute_immediate('grant execute on procedure sp_test_invoker to public')
con1.commit()
#
with con2.cursor() as c2:
c2.execute(sql_chk_definer)
act_1.print_data_list(c2)
#
with con2.cursor() as c2:
c2.execute(sql_chk_invoker)
act_1.print_data_list(c2)
# Check
act_1.expected_stdout = expected_stdout_1
act_1.stdout = capsys.readouterr().out
assert act_1.clean_stdout == act_1.clean_expected_stdout