mirror of
https://github.com/FirebirdSQL/firebird-qa.git
synced 2025-01-22 13:33:07 +01:00
Added/Updated bugs\core_2493_test.py: Checked on: 3.0.8.33535, 4.0.1.2692, 5.0.0.497
This commit is contained in:
parent
2020660ba4
commit
01b7317d18
@ -33,7 +33,7 @@ DESCRIPTION:
|
||||
NB: we current OS user using call of getpass.getuser(). It must be compared in case insensitive manner;
|
||||
* token N-2 is just word "user" (as is);
|
||||
* token N-3 is port number, it has to be positive value;
|
||||
* token N-4 is IP. It must be equal to rdb$get_context('SYSTEM','CLIENT_ADDRESS').
|
||||
* token N-4 is IP. It must be equal to rdb$get_context('SYSTEM','CLIENT_ADDRESS'). / fe80::a408:4a80:5496:1548%8/49569
|
||||
|
||||
This is how differences look in firebird.log:
|
||||
# 2.5.9:
|
||||
@ -48,244 +48,176 @@ DESCRIPTION:
|
||||
# INET/inet_error: read errno = 10054, client host = csprog, address = fe80::fcf1:e33c:e924:969d%16/56887, user = zotov
|
||||
# INET/inet_error: read errno = 10054, client host = csprog, address = fe80::fcf1:e33c:e924:969d%16/56883, user = zotov
|
||||
NOTES:
|
||||
[20.02.2021]
|
||||
[20.02.2021] pzotov
|
||||
changed 'platform' attribute to Windows only. Content of firebird.log has no changes on Linux during this test run.
|
||||
Perhaps, this is temporary and another solution will be found/implemented. Sent letter to dimitr et al, 21.02.2021 08:20.
|
||||
[27.05.2022] pzotov
|
||||
Re-implemented for work in firebird-qa suite.
|
||||
Checked on: 3.0.8.33535, 4.0.1.2692, 5.0.0.497
|
||||
|
||||
JIRA: CORE-2493
|
||||
FBTEST: bugs.core_2493
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import pytest
|
||||
import locale
|
||||
import time
|
||||
import getpass
|
||||
from pathlib import Path
|
||||
from firebird.qa import *
|
||||
from firebird.driver import ShutdownMode,ShutdownMethod
|
||||
from firebird.driver.types import DatabaseError
|
||||
from difflib import unified_diff
|
||||
|
||||
#--------------------------------------------
|
||||
|
||||
# http://stackoverflow.com/questions/319279/how-to-validate-ip-address-in-python
|
||||
def is_valid_ipv4(address):
|
||||
import socket
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, address)
|
||||
except AttributeError: # no inet_pton here, sorry
|
||||
try:
|
||||
socket.inet_aton(address)
|
||||
except socket.error:
|
||||
return False
|
||||
return address.count('.') == 3
|
||||
except socket.error: # not a valid address
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
#--------------------------------------------
|
||||
|
||||
def is_valid_ipv6(address):
|
||||
import socket
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, address)
|
||||
except socket.error: # not a valid address
|
||||
return False
|
||||
return True
|
||||
|
||||
#--------------------------------------------
|
||||
|
||||
init_script = """
|
||||
recreate table log(ip varchar(255));
|
||||
create sequence g;
|
||||
create table tlog(ip varchar(255));
|
||||
create table tdelay(id int primary key);
|
||||
set term ^;
|
||||
create procedure sp_delay as
|
||||
begin
|
||||
insert into tdelay(id) values(1);
|
||||
in autonomous transaction do
|
||||
begin
|
||||
execute statement ('insert into tdelay(id) values(?)') (1);
|
||||
when any do
|
||||
begin
|
||||
-- nop --
|
||||
end
|
||||
end
|
||||
delete from tdelay where id = 1;
|
||||
end
|
||||
^
|
||||
set term ;^
|
||||
commit;
|
||||
"""
|
||||
|
||||
db = db_factory(init=init_script)
|
||||
|
||||
db = db_factory(init = init_script)
|
||||
tmp_user = user_factory('db', name='tmp_user_2493', password='123')
|
||||
act = python_act('db')
|
||||
|
||||
expected_stdout = """
|
||||
String IP/port: valid, equal to 'CLIENT_ADDRESS'
|
||||
Port value: valid, positive integer.
|
||||
OS user: valid, passed getpass.getuser()
|
||||
tmp_paused_sql = temp_file('tmp_paused_2493.sql')
|
||||
tmp_paused_log = temp_file('tmp_paused_2493.log')
|
||||
|
||||
expected_stdout_log_diff = """
|
||||
Check IP using methods from 'socket' package: PASSED.
|
||||
Check IP for equality to 'CLIENT_ADDRESS' value: PASSED.
|
||||
Check port value: PASSED, positive integer.
|
||||
Check OS user using 'getpass' package: PASSED
|
||||
"""
|
||||
|
||||
@pytest.mark.skip('FIXME: Not IMPLEMENTED')
|
||||
@pytest.mark.version('>=3')
|
||||
@pytest.mark.platform('Windows')
|
||||
def test_1(act: Action):
|
||||
pytest.fail("Not IMPLEMENTED")
|
||||
def test_1(act: Action, tmp_paused_sql: Path, tmp_paused_log: Path, capsys):
|
||||
|
||||
# test_script_1
|
||||
#---
|
||||
# import os
|
||||
# import time
|
||||
# import subprocess
|
||||
# from subprocess import Popen
|
||||
# import signal
|
||||
# import difflib
|
||||
# import re
|
||||
# import socket
|
||||
# import getpass
|
||||
#
|
||||
# os.environ["ISC_USER"] = user_name
|
||||
# os.environ["ISC_PASSWORD"] = user_password
|
||||
#
|
||||
# engine = str(db_conn.engine_version)
|
||||
# db_conn.close()
|
||||
#
|
||||
# #-----------------------------------
|
||||
#
|
||||
# def flush_and_close(file_handle):
|
||||
# # https://docs.python.org/2/library/os.html#os.fsync
|
||||
# # If you're starting with a Python file object f,
|
||||
# # first do f.flush(), and
|
||||
# # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk.
|
||||
# global os
|
||||
#
|
||||
# file_handle.flush()
|
||||
# os.fsync(file_handle.fileno())
|
||||
#
|
||||
# file_handle.close()
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def cleanup( f_names_list ):
|
||||
# global os
|
||||
# for i in range(len( f_names_list )):
|
||||
# if os.path.isfile( f_names_list[i]):
|
||||
# os.remove( f_names_list[i] )
|
||||
# if os.path.isfile( f_names_list[i]):
|
||||
# print('ERROR: can not remove file ' + f_names_list[i])
|
||||
#
|
||||
# #-------------------------------------------
|
||||
#
|
||||
#
|
||||
# def svc_get_fb_log( engine, f_fb_log ):
|
||||
#
|
||||
# import subprocess
|
||||
#
|
||||
# if engine.startswith('2.5'):
|
||||
# get_firebird_log_key='action_get_ib_log'
|
||||
# else:
|
||||
# get_firebird_log_key='action_get_fb_log'
|
||||
#
|
||||
# subprocess.call([ context['fbsvcmgr_path'],
|
||||
# "localhost:service_mgr",
|
||||
# get_firebird_log_key
|
||||
# ],
|
||||
# stdout=f_fb_log,
|
||||
# stderr=subprocess.STDOUT
|
||||
# )
|
||||
#
|
||||
# return
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# # http://stackoverflow.com/questions/319279/how-to-validate-ip-address-in-python
|
||||
# def is_valid_ipv4(address):
|
||||
# import socket
|
||||
# try:
|
||||
# socket.inet_pton(socket.AF_INET, address)
|
||||
# except AttributeError: # no inet_pton here, sorry
|
||||
# try:
|
||||
# socket.inet_aton(address)
|
||||
# except socket.error:
|
||||
# return False
|
||||
# return address.count('.') == 3
|
||||
# except socket.error: # not a valid address
|
||||
# return False
|
||||
#
|
||||
# return True
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# def is_valid_ipv6(address):
|
||||
# import socket
|
||||
# try:
|
||||
# socket.inet_pton(socket.AF_INET6, address)
|
||||
# except socket.error: # not a valid address
|
||||
# return False
|
||||
# return True
|
||||
#
|
||||
# #--------------------------------------------
|
||||
#
|
||||
# f_fblog_before=open(os.path.join(context['temp_directory'],'tmp_2493_fblog_before.txt'), 'w')
|
||||
#
|
||||
# svc_get_fb_log( engine, f_fblog_before )
|
||||
#
|
||||
# f_fblog_before.close()
|
||||
#
|
||||
# isql_txt=''' insert into log(ip) values( rdb$get_context('SYSTEM','CLIENT_ADDRESS') );
|
||||
# commit;
|
||||
# select count(i) from (select gen_id(g,1) i from rdb$types a,rdb$types b,rdb$types c,rdb$types d);
|
||||
# '''
|
||||
#
|
||||
# f_sql_txt=open( os.path.join(context['temp_directory'],'tmp_2493_isql.sql'), 'w')
|
||||
# f_sql_txt.write(isql_txt)
|
||||
# flush_and_close( f_sql_txt )
|
||||
#
|
||||
# f_sql_log=open(os.path.join(context['temp_directory'],'tmp_2493_isql.log'), 'w' )
|
||||
# f_sql_err=open(os.path.join(context['temp_directory'],'tmp_2493_isql.err'), 'w' )
|
||||
#
|
||||
# p_isql=Popen( [ context['isql_path'], dsn, "-i", f_sql_txt.name ], stdout=f_sql_log, stderr=f_sql_err
|
||||
# )
|
||||
# time.sleep(3)
|
||||
#
|
||||
# p_isql.terminate()
|
||||
#
|
||||
# flush_and_close( f_sql_log )
|
||||
# flush_and_close( f_sql_err )
|
||||
#
|
||||
# f_sql_txt=open(os.path.join(context['temp_directory'],'tmp_2493_isql.sql'), 'w')
|
||||
# f_sql_txt.write("set heading off; select iif(gen_id(g,0) = 0, 'Trouble with subprocess: job was not started.', ip) as msg from log; quit;")
|
||||
# flush_and_close( f_sql_txt )
|
||||
#
|
||||
# mon_ip=subprocess.check_output( [ context['isql_path'], dsn, '-i', f_sql_txt.name ]).split()[0]
|
||||
#
|
||||
# f_fblog_after=open(os.path.join(context['temp_directory'],'tmp_2493_fblog_after.txt'), 'w')
|
||||
#
|
||||
# svc_get_fb_log( engine, f_fblog_after )
|
||||
#
|
||||
# flush_and_close( f_fblog_after )
|
||||
#
|
||||
# oldfb=open(f_fblog_before.name, 'r')
|
||||
# newfb=open(f_fblog_after.name, 'r')
|
||||
#
|
||||
# difftext = ''.join(difflib.unified_diff(
|
||||
# oldfb.readlines(),
|
||||
# newfb.readlines()
|
||||
# ))
|
||||
# oldfb.close()
|
||||
# newfb.close()
|
||||
#
|
||||
# f_diff_txt=open( os.path.join(context['temp_directory'],'tmp_2493_diff.txt'), 'w')
|
||||
# f_diff_txt.write(difftext)
|
||||
# flush_and_close( f_diff_txt )
|
||||
#
|
||||
# inet_msg_words = []
|
||||
# logged_err=0
|
||||
# with open( f_diff_txt.name,'r') as f:
|
||||
# for line in f:
|
||||
# if line.startswith('+') and 'INET/INET_ERROR' in line.upper():
|
||||
# # DO NOT include ':' to the list of delimiters! It is involved in IPv6 address:
|
||||
# inet_msg_words = line.replace(',',' ').replace('/',' ').replace('=',' ').split()
|
||||
# break
|
||||
#
|
||||
# # Tokens, numerated from zero (NB: leftmost is "PLUS" sign and has index = 0)
|
||||
# # ---------------------------------------------------------------------------
|
||||
# # + INET inet_error read errno 10054 client host prog1 address 127.0.0.1 4417 user john ------- for IPv4
|
||||
# # 0 1 2 3 4 5 6 7 8 9 10 11 12 13
|
||||
# # + INET inet_error read errno 10054 client host prog2 address x::y:z:u:v 56831 user mick ------- for IPv6
|
||||
#
|
||||
# # + INET/inet_error: read errno = 10054, client host = csprog, address = fe80::fcf1:e33c:e924:969d%16/56883, user = zotov
|
||||
# # 0 1 2 3 4 5 6 7 8 9 10 11 12 --> len() = 13
|
||||
#
|
||||
# n = len(inet_msg_words)
|
||||
#
|
||||
# parsing_problem_msg = 'Problem with parsing content of firebird.log'
|
||||
# if len(inet_msg_words) == 0:
|
||||
# print('%s: message with "inet_error" not found.' % parsing_problem_msg)
|
||||
# elif len(inet_msg_words) < 4:
|
||||
# print('%s: message with "inet_error" contains less than 4 tokens.' % parsing_problem_msg)
|
||||
# else:
|
||||
#
|
||||
# #print('Fixed data: '+inet_msg_words[4]+' '+inet_msg_words[5]+' '+inet_msg_words[6]+' '+inet_msg_words[7])
|
||||
#
|
||||
# # http://stackoverflow.com/questions/4271740/how-can-i-use-python-to-get-the-system-hostname
|
||||
#
|
||||
# # commented 17.02.2017 due to 2.5.9 (no info about remote host there):
|
||||
# #if inet_msg_words[8].upper()==socket.gethostname().upper():
|
||||
# # print('Remote host: valid, passed socket.gethostname()')
|
||||
# #else:
|
||||
# # print('Invalid host=|'+inet_msg_words[8]+'|')
|
||||
#
|
||||
# # does not work on Python 3.4! >>> if is_valid_ipv4(inet_msg_words[10]) or is_valid_ipv6(inet_msg_words[10]):
|
||||
# if inet_msg_words[n-4] + '/' + inet_msg_words[n-3] == mon_ip:
|
||||
# print("String IP/port: valid, equal to 'CLIENT_ADDRESS'")
|
||||
# else:
|
||||
# print('Invalid IP/port=|'+inet_msg_words[n-4]+'/'+inet_msg_words[n-3]+'| - differ from mon_ip=|'+mon_ip+'|')
|
||||
#
|
||||
# if inet_msg_words[n-3].isdigit():
|
||||
# print('Port value: valid, positive integer.')
|
||||
# else:
|
||||
# print('Invalid port=|'+inet_msg_words[n-3]+'|')
|
||||
#
|
||||
# if inet_msg_words[n-1].upper().split('.')[0] == getpass.getuser().upper():
|
||||
# # 2.5.9: got 'ZOTOV.-1.-1' ==> must be kust of one word: 'ZOTOV'
|
||||
# print('OS user: valid, passed getpass.getuser()')
|
||||
# else:
|
||||
# print('Invalid OS user=|'+inet_msg_words[n-1]+'|')
|
||||
#
|
||||
#
|
||||
# # Cleanup.
|
||||
# ##########
|
||||
# time.sleep(1)
|
||||
# cleanup( [i.name for i in (f_sql_txt,f_sql_log,f_sql_err,f_fblog_before,f_fblog_after,f_diff_txt) ] )
|
||||
#
|
||||
#
|
||||
#---
|
||||
mon_ip = 'UNKNOWN_IP'
|
||||
with act.db.connect() as con1:
|
||||
cur1 = con1.cursor()
|
||||
cur1.execute("select rdb$get_context('SYSTEM','CLIENT_ADDRESS') from rdb$database")
|
||||
mon_ip = cur1.fetchone()[0].split('/')[0] # '<ip_address>/<port>' --> '<ip_address>' etc
|
||||
|
||||
with act.connect_server(encoding=locale.getpreferredencoding()) as srv:
|
||||
srv.info.get_log()
|
||||
fb_log_init = srv.readlines()
|
||||
|
||||
tmp_paused_sql.write_text("rollback;set transaction lock timeout 7;execute procedure sp_delay;")
|
||||
|
||||
with open(os.devnull, 'w') as fn_nul:
|
||||
#with open(tmp_paused_log, mode='w') as fn_nul:
|
||||
p_paused_isql = subprocess.Popen([act.vars['isql'], act.db.dsn,
|
||||
'-user', act.db.user,
|
||||
'-password', act.db.password,
|
||||
'-q', '-i', str(tmp_paused_sql)],
|
||||
stdout = fn_nul,
|
||||
stderr = subprocess.STDOUT
|
||||
)
|
||||
time.sleep(1) # Let ISQL to establish connection and stay there in pause
|
||||
p_paused_isql.terminate()
|
||||
|
||||
|
||||
time.sleep(1) # Let changes in firebird.log be flushed on disk
|
||||
|
||||
with act.connect_server(encoding=locale.getpreferredencoding()) as srv:
|
||||
srv.info.get_log()
|
||||
fb_log_curr = srv.readlines()
|
||||
|
||||
inet_msg_words = []
|
||||
for line in unified_diff(fb_log_init, fb_log_curr):
|
||||
if line.startswith('+') and 'INET/INET_ERROR' in line.upper():
|
||||
# DO NOT include ':' in list of delimiters! It occurs in IPv6 address:
|
||||
inet_msg_words = line.replace(',',' ').replace('/',' ').replace('=',' ').split()
|
||||
break
|
||||
|
||||
# Tokens, numerated from zero (NB: leftmost is "PLUS" sign and has index = 0)
|
||||
# ---------------------------------------------------------------------------
|
||||
# + INET inet_error read errno 10054 client host prog1 address 127.0.0.1 4417 user john ------- for IPv4
|
||||
# + INET inet_error read errno 10054 client host prog2 address x::y:z:u:v 56831 user mick ------- for IPv6
|
||||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13
|
||||
|
||||
n = len(inet_msg_words)
|
||||
|
||||
parsing_problem_msg = 'Problem with parsing content of firebird.log'
|
||||
if len(inet_msg_words) == 0:
|
||||
print(f'{parsing_problem_msg}: message with "inet_error" not found.')
|
||||
elif len(inet_msg_words) < 4:
|
||||
print(f'{parsing_problem_msg}: message with "inet_error" contains less than 4 tokens.')
|
||||
else:
|
||||
|
||||
# http://stackoverflow.com/questions/4271740/how-can-i-use-python-to-get-the-system-hostname
|
||||
|
||||
if is_valid_ipv4(inet_msg_words[n-4]) or is_valid_ipv6(inet_msg_words[n-4]):
|
||||
print("Check IP using methods from 'socket' package: PASSED.")
|
||||
else:
|
||||
print("IP address is INVALID.")
|
||||
|
||||
if inet_msg_words[n-4] == mon_ip:
|
||||
print("Check IP for equality to 'CLIENT_ADDRESS' value: PASSED.")
|
||||
else:
|
||||
print('IP address: |'+inet_msg_words[n-4]+'| - differ from mon_ip=|'+mon_ip+'|')
|
||||
|
||||
if inet_msg_words[n-3].isdigit():
|
||||
print('Check port value: PASSED, positive integer.')
|
||||
else:
|
||||
print('Invalid port=|'+inet_msg_words[n-3]+'|')
|
||||
|
||||
if inet_msg_words[n-1].upper().split('.')[0] == getpass.getuser().upper():
|
||||
# 2.5.9: got 'ZOTOV.-1.-1' ==> must be just one word: 'ZOTOV'
|
||||
print("Check OS user using 'getpass' package: PASSED")
|
||||
else:
|
||||
print('Invalid OS user=|'+inet_msg_words[n-1]+'|')
|
||||
|
||||
|
||||
act.expected_stdout = expected_stdout_log_diff
|
||||
act.stdout = capsys.readouterr().out
|
||||
assert act.clean_stdout == act.clean_expected_stdout
|
||||
|
Loading…
Reference in New Issue
Block a user