#coding:utf-8 # # id: bugs.core_4135 # title: In SS sweep blocks establishment of concurrent attachments # decription: # We create DB-level trigger in order to log attachments info. # Also, we create a table with long-key indexed fields for sweep have a big time to be completed. # Then we insert data into this table and create indices on its fields. # After this we delete just inserted rows and make commit thus providing lot of job to GC. # # When we call on next step sweep, it must work a very long time (maybe several minutes) - # this was checked on host with common PC characteristics: P-IV 3.0 GHz, SATA). # # We launch SWEEP by async. call of FBSVCMGR and save current timestamp in 'DTS_BEG_FOR_ATTACHMENTS' variable. # We allow sweep to work several seconds alone (see 'WAIT_BEFORE_RUN_1ST_ATT') and then run loop with launching # ISQL attachments, all of them - also in async. mode. # # Each ISQL connect will add one row to the log-table ('mon_attach_data') by ON-CONNECT trigger - and we'll # query later its data: how many ISQL did establish connections while sweep worked. # # After loop we wait several seconds in order to be sure that all ISQL are loaded (see 'WAIT_FOR_ALL_ATT_DONE'). # # Then we save new value of current timestamp in 'DTS_END_FOR_ATTACHMENTS' variable. # After this we call FBSVCMGR with arguments to make SHUTDOWN of database, thus killing all existing attachments # (SWEEP will be also interrupted in that case). This is done in SYNC mode (control will not return from SHUTDOWN # process until it will be fully completed). # # Then we return database to ONLINE state and make single ISQL connect with '-nod' switch. # This (last) attachment to database will query data of Log table 'mon_attach_data' and check that number of # records (i.e. ACTUALLY etsablished attachment) is equal to which we check (see 'PLANNED_ATTACH_CNT'). # # Checked 07-jun-2016 on: # 3.0.0.32527, SS/SC/CS. # 4.0.0.238, SS/SC/CS. # # 11-JUN-2020. # We also launch trace just before sweep with logging: # 1) sweep activity # 2) execute_trigger_finish events # At the final point we make parsing of trace log and check that: # 1) no attachments were made *before* sweep start; NB: WAIT_BEFORE_RUN_1ST_ATT must be increased if this is not so. # 2) number of attachments AFTER sweep start and BEFORE its finish equals to required ('PLANNED_ATTACH_CNT'). # Checked again on: 3.0.6.33296 SS ; V3.0.6.33289 CS ; 4.0.0.2037 SS ; 4.0.0.2025 CS # # 04-JUL-2020. # Reduced PLANNED_ATTACH_CNT from 30 to 5 because 4.0 Classic remains fail. # # tracker_id: CORE-4135 # min_versions: ['3.0'] # versions: 3.0 # qmid: None import pytest from firebird.qa import db_factory, isql_act, Action # version: 3.0 # resources: None substitutions_1 = [('[ \t]+', ' ')] init_script_1 = """""" db_1 = db_factory(sql_dialect=3, init=init_script_1) # test_script_1 #--- # # import os # import sys # import subprocess # from subprocess import Popen # import time # import datetime # from datetime import datetime # import re # import shutil # from fdb import services # # os.environ["ISC_USER"] = user_name # os.environ["ISC_PASSWORD"] = user_password # db_file = db_conn.database_name # # db_conn.close() # # #--------------------------------------------- # # def flush_and_close(file_handle): # # https://docs.python.org/2/library/os.html#os.fsync # # If you're starting with a Python file object f, # # first do f.flush(), and # # then do os.fsync(f.fileno()), to ensure that all internal buffers associated with f are written to disk. # global os # # file_handle.flush() # if file_handle.mode not in ('r', 'rb'): # # otherwise: "OSError: [Errno 9] Bad file descriptor"! # os.fsync(file_handle.fileno()) # file_handle.close() # # #-------------------------------------------- # # def cleanup( f_names_list ): # global os # for i in range(len( f_names_list )): # if os.path.isfile( f_names_list[i]): # os.remove( f_names_list[i] ) # if os.path.isfile( f_names_list[i]): # print('ERROR: can not remove file ' + f_names_list[i]) # # #-------------------------------------------- # # # ######################## # ### s e t t i n g s ### # ######################## # # # How many rows must be inserted to the test table: # RECORDS_TO_ADD = 10000 # # # Length of indexed fields. # # NB, 11.06.2020: sweep can make its work very fast # # so we have to increase length of indexed fields # # Checked on 4.0.0.2025 Classic # INDEXED_FLD_LEN = 500 # # # How many attachments we want to establish # # during sweep will work: # PLANNED_ATTACH_CNT = 5 # # # How many seconds we allow to all PLANNED_ATTACH_CNT # # sessions for establishing their connections: # WAIT_FOR_ALL_ATT_DONE = 10 # # # How many seconds we allow SWEEP to work alone, # # i.e. before 1st ISQL make its attachment: # WAIT_BEFORE_RUN_1ST_ATT = 5 # # ##################### # # fb_home = services.connect(host='localhost').get_home_directory() # # sql_ddl=''' recreate table mon_attach_data ( # mon_attachment_id bigint # ,mon_timestamp timestamp default 'now' # ); # # recreate table test ( # s01 varchar(%(INDEXED_FLD_LEN)s) # ,s02 varchar(%(INDEXED_FLD_LEN)s) # ,s03 varchar(%(INDEXED_FLD_LEN)s) # ); # # set term ^; # create or alter trigger trg_connect active on connect position 0 as # begin # -- IN AUTONOMOUS TRANSACTION DO # insert into mon_attach_data ( mon_attachment_id ) values (current_connection); # end # ^ # set term ;^ # commit; # ''' % locals() # # sql_ddl_cmd=open( os.path.join(context['temp_directory'],'tmp_ddl_4135.sql'), 'w') # sql_ddl_cmd.write(sql_ddl) # flush_and_close( sql_ddl_cmd ) # # sql_ddl_log=open( os.path.join(context['temp_directory'],'tmp_ddl_4135.log'), 'w') # subprocess.call( [context['isql_path'], dsn, "-i", sql_ddl_cmd.name], # stdout=sql_ddl_log, # stderr=subprocess.STDOUT # ) # flush_and_close( sql_ddl_log ) # # #-------------------------------------------------- # # sql_data=''' set term ^; # execute block as # declare n int = %(RECORDS_TO_ADD)s; # begin # while (n>0) do # insert into test(s01, s02, s03) # values( rpad('', %(INDEXED_FLD_LEN)s, uuid_to_char(gen_uuid()) ), # rpad('', %(INDEXED_FLD_LEN)s, uuid_to_char(gen_uuid()) ), # rpad('', %(INDEXED_FLD_LEN)s, uuid_to_char(gen_uuid()) ) # ) returning :n-1 into n; # end^ # set term ;^ # commit; # # create index test_a on test(s01,s02,s03); # create index test_b on test(s02,s03,s01); # create index test_c on test(s03,s02,s01); # create index test_d on test(s01,s03,s02); # create index test_e on test(s02,s01,s03); # create index test_f on test(s03,s01,s02); # commit; # # delete from test; # commit; -- ==> lot of garbage in indexed pages will be after this. # ''' % locals() # # # # Temporay change FW to OFF in order to make DML faster: # fn_nul = open(os.devnull, 'w') # # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_properties", "dbname", db_file, # "prp_write_mode", "prp_wm_async"], # stdout=fn_nul, stderr=subprocess.STDOUT) # # # sql_data_cmd=open( os.path.join(context['temp_directory'],'tmp_data_4135.sql'), 'w') # sql_data_cmd.write(sql_data) # flush_and_close( sql_data_cmd ) # # sql_data_log=open( os.path.join(context['temp_directory'],'tmp_data_4135.log'), 'w') # subprocess.call( [context['isql_path'], dsn, "-nod", "-i", sql_data_cmd.name], # stdout=sql_data_log, # stderr=subprocess.STDOUT) # flush_and_close( sql_data_log ) # # # Restore FW to ON (make sweep to do its work "harder"): # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_properties", "dbname", db_file, # "prp_write_mode", "prp_wm_sync"], # stdout=fn_nul, stderr=subprocess.STDOUT) # fn_nul.close() # # #-------------------------------------------------- # txt = ''' # database=%[\\\\\\\\/]security?.fdb # { # enabled = false # } # database=%[\\\\\\\\/]bugs.core_4135.fdb # { # enabled = true # log_initfini = false # log_errors = true # log_sweep = true # # log_connections = true # log_trigger_finish = true # time_threshold = 0 # } # ''' # # f_trc_cfg=open( os.path.join(context['temp_directory'],'tmp_trace_4135.cfg'), 'w') # f_trc_cfg.write(txt) # flush_and_close( f_trc_cfg ) # # # ############################################################## # # S T A R T T R A C E i n S E P A R A T E P R O C E S S # # ############################################################## # # f_trc_log=open( os.path.join(context['temp_directory'],'tmp_trace_4135.log'), "w") # f_trc_err=open( os.path.join(context['temp_directory'],'tmp_trace_4135.err'), "w") # # # ::: NB ::: DO NOT USE 'localhost:service_mgr' here! Use only local protocol: # p_trace = Popen( [ context['fbsvcmgr_path'], 'service_mgr', 'action_trace_start' , 'trc_cfg', f_trc_cfg.name],stdout=f_trc_log,stderr=f_trc_err) # # # time.sleep(1) # # # #################################################### # # G E T A C T I V E T R A C E S E S S I O N I D # # #################################################### # # Save active trace session info into file for further parsing it and obtain session_id back (for stop): # # f_trc_lst = open( os.path.join(context['temp_directory'],'tmp_trace_4135.lst'), 'w') # subprocess.call([context['fbsvcmgr_path'], 'localhost:service_mgr', 'action_trace_list'], stdout=f_trc_lst) # flush_and_close( f_trc_lst ) # # # !!! DO NOT REMOVE THIS LINE !!! # time.sleep(1) # # trcssn=0 # with open( f_trc_lst.name,'r') as f: # for line in f: # i=1 # if 'Session ID' in line: # for word in line.split(): # if i==3: # trcssn=word # i=i+1 # break # f.close() # # Result: `trcssn` is ID of active trace session. Now we have to terminate it: # # #-------------------------------------------------- # # # Now we run SWEEP in child process (asynchronous) and while it will in work - try to establish several attachments. # ################## # # fbsvc_log=open( os.path.join(context['temp_directory'],'tmp_svc_4135.log'), 'w') # fbsvc_log.write("Starting sweep") # fbsvc_log.write("") # fbsvc_log.seek(0,2) # p_sweep=subprocess.Popen([ context['fbsvcmgr_path'],"localhost:service_mgr", # "action_repair", "dbname", db_file, "rpr_sweep_db"], # stdout=fbsvc_log, stderr=subprocess.STDOUT # ) # # time.sleep( WAIT_BEFORE_RUN_1ST_ATT ) # # #--------------------------------------------------------- # # # Save current timestamp: this is point BEFORE we try to establish attachmentas using several ISQL sessions: # DTS_BEG_FOR_ATTACHMENTS = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S') # # # Try to establish several attachments to database while sweep is in work: # # sqlatt=open( os.path.join(context['temp_directory'],'tmp_att_4135.sql'), 'w') # sql_wrk=''' # set heading off; # set term ^; # execute block returns(att bigint) as # begin # att = current_connection; # suspend; # end # ^ # set term ;^ # commit; # ''' # sqlatt.write(sql_wrk) # sqlatt.close() # # f_list=[] # p_list=[] # for i in range(0, PLANNED_ATTACH_CNT): # sqllog=open( os.path.join(context['temp_directory'],'tmp_att_4135_'+str(i)+'.log'), 'w') # f_list.append(sqllog) # # for i in range(len(f_list)): # p_isql=Popen( [ context['isql_path'] , dsn, "-n", "-i", sqlatt.name ], # stdout=f_list[i], # stderr=subprocess.STDOUT # ) # p_list.append(p_isql) # # # Here we have to wait a little in order to all ISQL will establish their attachments: # # 4.0.0.238 Classic: one needed to increased time from 3 to 5 seconds otherwise NOT ALL # # ISQL attachments can be etsablished! # # # 11.06.2020: 4.0 Classic requires valuable delay here! # time.sleep( WAIT_FOR_ALL_ATT_DONE ) # # # Save current timestamp: we assume that now ALL isql sessions already FINISHED to establish attachment (or the whole job and quited): # DTS_END_FOR_ATTACHMENTS = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S') # # #--------------------------------------------------------- # # # Move database to shutdown in order to stop sweep, and then extract header info: # # dbshut_log=open( os.path.join(context['temp_directory'],'tmp_sht_4135.log'), 'w') # dbshut_log.write("Call DB shutdown") # dbshut_log.write("") # dbshut_log.seek(0,2) # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_properties", "dbname", db_file, # "prp_shutdown_mode", "prp_sm_full", "prp_force_shutdown", "0"], # stdout=dbshut_log, stderr=subprocess.STDOUT) # # dbshut_log.seek(0,2) # # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_db_stats", "dbname", db_file, "sts_hdr_pages"], # stdout=dbshut_log, stderr=subprocess.STDOUT) # # # Kill all child ISQL processes: # for i in range(len(f_list)): # p_list[i].terminate() # flush_and_close( f_list[i] ) # # p_sweep.terminate() # # #--------------------------------------------------------- # # # #################################################### # # S E N D R E Q U E S T T R A C E T O S T O P # # #################################################### # if trcssn>0: # fn_nul = open(os.devnull, 'w') # subprocess.call([context['fbsvcmgr_path'], 'localhost:service_mgr', 'action_trace_stop','trc_id', trcssn], stdout=fn_nul) # fn_nul.close() # # DO NOT REMOVE THIS LINE: # time.sleep(1) # # p_trace.terminate() # flush_and_close( f_trc_log ) # flush_and_close( f_trc_err ) # # #--------------------------------------------------------- # # # Return database online in order to check number of attachments that were established while sweep was in work: # # dbshut_log.seek(0,2) # dbshut_log.write("Return DB to online") # dbshut_log.seek(0,2) # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_properties", "dbname", db_file, "prp_db_online"], # stdout=dbshut_log, stderr=subprocess.STDOUT) # # dbshut_log.seek(0,2) # subprocess.call([context['fbsvcmgr_path'],"localhost:service_mgr", # "action_db_stats", "dbname", db_file, "sts_hdr_pages"], # stdout=dbshut_log, stderr=subprocess.STDOUT) # # flush_and_close( dbshut_log ) # flush_and_close( fbsvc_log ) # # # # 4debug only, remove later: # ############################ # #bk_file="$(DATABASE_LOCATION)tmp_4135_copy.fdb" # #shutil.copy2( db_file, bk_file ) # # #--------------------------------------------------------- # # # Check: number of ISQL attachments between DTS_BEG_FOR_ATTACHMENTS and DTS_END_FOR_ATTACHMENTS must be equal to 'PLANNED_ATTACH_CNT' # # # SuperServer has two system attachments with mon$user='Garbage Collector' and 'Cache Writer', # # we have to SKIP them from counting: # # sql_data=''' set list on; # select iif( cnt = %(PLANNED_ATTACH_CNT)s # ,'EXPECTED' -- OK, number of attachments is equal to expected value. # ,'POOR: only ' || cnt || ' attachments established for %(WAIT_FOR_ALL_ATT_DONE)s seconds, expected: %(PLANNED_ATTACH_CNT)s' # ) as "DB-logged attachments:" # from ( # select count(*) as cnt # from mon_attach_data d # where d.mon_timestamp between '%(DTS_BEG_FOR_ATTACHMENTS)s' and '%(DTS_END_FOR_ATTACHMENTS)s' # ); # # /* # -- 4debug, do not delete: # set list off; # set count on; # select d.* # from mon_attach_data d # where d.mon_timestamp between '%(DTS_BEG_FOR_ATTACHMENTS)s' and '%(DTS_END_FOR_ATTACHMENTS)s'; # -- */ # # commit; # ''' % locals() # # sqlchk=open( os.path.join(context['temp_directory'],'tmp_chk_4135.sql'), 'w') # sqlchk.write( sql_data ) # flush_and_close( sqlchk ) # # sqllog=open( os.path.join(context['temp_directory'],'tmp_chk_4135.log'), 'w') # subprocess.call( [ context['isql_path'] , dsn, "-pag", "99999", "-nod", "-i", sqlchk.name ], # stdout=sqllog, stderr=subprocess.STDOUT # ) # flush_and_close( sqllog ) # # with open(sqllog.name) as f: # print( f.read() ) # # allowed_patterns = [ # re.compile('EXECUTE_TRIGGER_FINISH', re.IGNORECASE) # ,re.compile('SWEEP_START', re.IGNORECASE) # ,re.compile('SWEEP_FINISH', re.IGNORECASE) # ,re.compile('SWEEP_FAILED', re.IGNORECASE) # ] # # # All events 'EXECUTE_TRIGGER_FINISH' must occus between SWEEP_START and SWEEP_FAILED # # found_swp_start = False # found_swp_finish = False # triggers_count_before_swp_start = 0 # triggers_count_before_swp_finish = 0 # with open(f_trc_log.name, 'r') as f: # for line in f: # for p in allowed_patterns: # result = p.search(line) # if result: # what_found = result.group(0) # if 'SWEEP_START' in what_found: # found_swp_start = True # if 'SWEEP_FINISH' in what_found or 'SWEEP_FAILED' in what_found: # found_swp_finish = True # if 'EXECUTE_TRIGGER_FINISH' in what_found: # triggers_count_before_swp_start += (1 if not found_swp_start else 0) # triggers_count_before_swp_finish += ( 1 if found_swp_start and not found_swp_finish else 0 ) # # time.sleep(1) # # #print('Trace log parsing. Found triggers before sweep start:', triggers_count_before_swp_start ) # #print('Trace log parsing. Found triggers before sweep finish:', triggers_count_before_swp_finish ) # print('Trace log parsing. Found triggers before sweep start:', 'EXPECTED (no triggers found).' if triggers_count_before_swp_start == 0 else 'UNEXPECTED: %(triggers_count_before_swp_start)s instead of 0.' % locals() ) # print('Trace log parsing. Found triggers before sweep finish:', 'EXPECTED (equals to planned).' if triggers_count_before_swp_finish == PLANNED_ATTACH_CNT else 'UNEXPECTED: %(triggers_count_before_swp_finish)s instead of %(PLANNED_ATTACH_CNT)s.' % locals() ) # # #--------------------------------------------------------- # # #CLEANUP # ######### # time.sleep(1) # # cleanup( [i.name for i in f_list] ) # cleanup( [i.name for i in (dbshut_log, fbsvc_log, sqllog, sql_data_log, sql_data_cmd, sql_ddl_log, sql_ddl_cmd, sqlchk, sqlatt, f_trc_log, f_trc_err, f_trc_cfg, f_trc_lst) ] ) # # #--- #act_1 = python_act('db_1', test_script_1, substitutions=substitutions_1) expected_stdout_1 = """ DB-logged attachments: EXPECTED Trace log parsing. Found triggers before sweep start: EXPECTED (no triggers found). Trace log parsing. Found triggers before sweep finish: EXPECTED (equals to planned). """ @pytest.mark.version('>=3.0') @pytest.mark.xfail def test_core_4135_1(db_1): pytest.fail("Test not IMPLEMENTED")