diff --git a/src/common/ThreadStart.h b/src/common/ThreadStart.h index 5c465c139f..40be6ef392 100644 --- a/src/common/ThreadStart.h +++ b/src/common/ThreadStart.h @@ -118,7 +118,7 @@ class ThreadFinishSync public: typedef void ThreadRoutine(TA); - ThreadFinishSync(Firebird::MemoryPool& pool, ThreadRoutine* routine, int priority_arg) + ThreadFinishSync(Firebird::MemoryPool& pool, ThreadRoutine* routine, int priority_arg = THREAD_medium) : #ifdef USE_FINI_SEM fini(pool), @@ -126,7 +126,8 @@ public: threadHandle(0), #endif threadRoutine(routine), - threadPriority(priority_arg) + threadPriority(priority_arg), + closing(false) { } void run(TA arg) @@ -140,13 +141,26 @@ public: ); } + bool tryWait() + { + if (closing) + { + waitForCompletion(); + return true; + } + return false; + } + void waitForCompletion() { #ifdef USE_FINI_SEM fini.enter(); #else - Thread::waitForCompletion(threadHandle); - threadHandle = 0; + if (threadHandle) + { + Thread::waitForCompletion(threadHandle); + threadHandle = 0; + } #endif } @@ -160,6 +174,7 @@ private: TA threadArg; ThreadRoutine* threadRoutine; int threadPriority; + bool closing; static THREAD_ENTRY_DECLARE internalRun(THREAD_ENTRY_PARAM arg) { @@ -188,6 +203,7 @@ private: threadArg->exceptionHandler(ex, threadRoutine); } #endif + closing = true; } }; diff --git a/src/jrd/CryptoManager.cpp b/src/jrd/CryptoManager.cpp index 7ca5ed5415..09d93531ba 100644 --- a/src/jrd/CryptoManager.cpp +++ b/src/jrd/CryptoManager.cpp @@ -1346,7 +1346,7 @@ namespace Jrd { bool CryptoManager::down() const { - return flDown || (dbb.dbb_flags & DBB_closing); + return flDown; } void CryptoManager::addClumplet(string& signature, ClumpletReader& block, UCHAR tag) diff --git a/src/jrd/Database.cpp b/src/jrd/Database.cpp index f4424e0959..9200f41add 100644 --- a/src/jrd/Database.cpp +++ b/src/jrd/Database.cpp @@ -141,6 +141,7 @@ namespace Jrd { SPTHR_DEBUG(fprintf(stderr, "blocking_ast_sweep true %p\n", dbb)); dbb->dbb_thread_mutex.leave(); + LCK_release(tdbb, dbb->dbb_sweep_lock); break; } } @@ -178,13 +179,6 @@ namespace Jrd return false; } - if (dbb_flags & DBB_closing) - { - SPTHR_DEBUG(fprintf(stderr, "allowSweepThread false, dbb closing\n")); - dbb_thread_mutex.leave(); - return false; - } - while (true) { AtomicCounter::counter_type old = dbb_flags; @@ -239,7 +233,7 @@ namespace Jrd { SPTHR_DEBUG(fprintf(stderr, "allowSweepRun %p\n", this)); - if (readOnly() || (dbb_flags & DBB_closing)) + if (readOnly()) return false; Jrd::Attachment* const attachment = tdbb->getAttachment(); diff --git a/src/jrd/Database.h b/src/jrd/Database.h index 1c287a443a..d7f3ff1a35 100644 --- a/src/jrd/Database.h +++ b/src/jrd/Database.h @@ -224,7 +224,7 @@ const ULONG DBB_no_fs_cache = 0x40000L; // Not using file system cache const ULONG DBB_sweep_starting = 0x80000L; // Auto-sweep is starting const ULONG DBB_creating = 0x100000L; // Database creation is in progress const ULONG DBB_shared = 0x200000L; // Database object is shared among connections -const ULONG DBB_closing = 0x400000L; // Database closing, special backgroud threads should exit +//const ULONG DBB_closing = 0x400000L; // Database closing, special backgroud threads should exit // // dbb_ast_flags @@ -474,7 +474,6 @@ public: CryptoManager* dbb_crypto_manager; Firebird::RefPtr dbb_init_fini; Firebird::XThreadMutex dbb_thread_mutex; // special threads start/stop mutex - Thread::Handle dbb_sweep_thread; Firebird::RefPtr dbb_linger_timer; unsigned dbb_linger_seconds; time_t dbb_linger_end; @@ -530,7 +529,6 @@ private: dbb_external_file_directory_list(NULL), dbb_shared_counter(shared), dbb_init_fini(FB_NEW_POOL(*getDefaultMemoryPool()) ExistenceRefMutex()), - dbb_sweep_thread(0), dbb_linger_seconds(0), dbb_linger_end(0), dbb_plugin_config(pConf) diff --git a/src/jrd/jrd.cpp b/src/jrd/jrd.cpp index eadf332167..a5acef1c54 100644 --- a/src/jrd/jrd.cpp +++ b/src/jrd/jrd.cpp @@ -6396,6 +6396,54 @@ static void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment) if (dbb->dbb_crypto_manager) dbb->dbb_crypto_manager->detach(tdbb, attachment); + Sync sync(&dbb->dbb_sync, "jrd.cpp: release_attachment"); + + // avoid races with special threads + XThreadEnsureUnlock threadGuard(dbb->dbb_thread_mutex, FB_FUNCTION); + threadGuard.enter(); + + sync.lock(SYNC_EXCLUSIVE); + + // stop special threads if and only if we release last regular attachment + bool other = false; + { // checkout scope + EngineCheckout checkout(tdbb, FB_FUNCTION); + + SPTHR_DEBUG(fprintf(stderr, "\nrelease attachment=%p\n", attachment)); + for (Jrd::Attachment* att = dbb->dbb_attachments; att; att = att->att_next) + { + SPTHR_DEBUG(fprintf(stderr, "att=%p FromThr=%c ", att, att->att_flags & ATT_from_thread ? '1' : '0')); + if (att == attachment) + { + SPTHR_DEBUG(fprintf(stderr, "self\n")); + continue; + } + if (att->att_flags & ATT_from_thread) + { + SPTHR_DEBUG(fprintf(stderr, "found special att=%p\n", att)); + continue; + } + + // Found attachment that is not current (to be released) and is not special + other = true; + SPTHR_DEBUG(fprintf(stderr, "other\n")); + break; + } + + // Notify special threads + threadGuard.leave(); + + // Sync with special threads + sync.unlock(); + if (!other) + { + // crypt thread + if (dbb->dbb_crypto_manager) + dbb->dbb_crypto_manager->terminateCryptThread(tdbb, true); + } + + } // EngineCheckout scope + Monitoring::cleanupAttachment(tdbb); dbb->dbb_extManager.closeAttachment(tdbb, attachment); @@ -6451,69 +6499,8 @@ static void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment) attachment->mergeStats(); - Sync sync(&dbb->dbb_sync, "jrd.cpp: release_attachment"); - - // avoid races with special threads - XThreadEnsureUnlock threadGuard(dbb->dbb_thread_mutex, FB_FUNCTION); - threadGuard.enter(); - - sync.lock(SYNC_EXCLUSIVE); - - // stop special threads if and only if we release last regular attachment - bool other = false; - { // checkout scope - EngineCheckout checkout(tdbb, FB_FUNCTION); - - SPTHR_DEBUG(fprintf(stderr, "\nrelease attachment=%p\n", attachment)); - for (Jrd::Attachment* att = dbb->dbb_attachments; att; att = att->att_next) - { - SPTHR_DEBUG(fprintf(stderr, "att=%p FromThr=%c ", att, att->att_flags & ATT_from_thread ? '1' : '0')); - if (att == attachment) - { - SPTHR_DEBUG(fprintf(stderr, "self\n")); - continue; - } - if (att->att_flags & ATT_from_thread) - { - SPTHR_DEBUG(fprintf(stderr, "found special att=%p\n", att)); - continue; - } - - // Found attachment that is not current (to be released) and is not special - other = true; - SPTHR_DEBUG(fprintf(stderr, "other\n")); - break; - } - - // Notify special threads - if (!other) - dbb->dbb_flags |= DBB_closing; - threadGuard.leave(); - - // Sync with special threads - if (!other) - { - sync.unlock(); - - // crypt thread - if (dbb->dbb_crypto_manager) - dbb->dbb_crypto_manager->terminateCryptThread(tdbb, true); - - // sweep thread - if (dbb->dbb_sweep_thread) - { - Thread::waitForCompletion(dbb->dbb_sweep_thread); - dbb->dbb_sweep_thread = 0; - } - } - - } // EngineCheckout scope - - // restore database lock if needed - if (!other) - sync.lock(SYNC_EXCLUSIVE); - // remove the attachment block from the dbb linked list + sync.lock(SYNC_EXCLUSIVE); for (Jrd::Attachment** ptr = &dbb->dbb_attachments; *ptr; ptr = &(*ptr)->att_next) { if (*ptr == attachment) @@ -7552,6 +7539,7 @@ static THREAD_ENTRY_DECLARE shutdown_thread(THREAD_ENTRY_PARAM arg) // Extra shutdown operations Service::shutdownServices(); + TRA_shutdown_sweep(); } catch (const Exception& ex) { diff --git a/src/jrd/tra.cpp b/src/jrd/tra.cpp index dddb89e6da..2defab69f6 100644 --- a/src/jrd/tra.cpp +++ b/src/jrd/tra.cpp @@ -102,7 +102,7 @@ static void release_temp_tables(thread_db*, jrd_tra*); static void retain_temp_tables(thread_db*, jrd_tra*, TraNumber); static void restart_requests(thread_db*, jrd_tra*); static void start_sweeper(thread_db*); -static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM); +//static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM); static void transaction_flush(thread_db* tdbb, USHORT flush_flag, TraNumber tra_number); static void transaction_options(thread_db*, jrd_tra*, const UCHAR*, USHORT); static void transaction_start(thread_db* tdbb, jrd_tra* temp); @@ -1789,13 +1789,6 @@ void TRA_sweep(thread_db* tdbb) try { - // Avoid races with release_attachment() - - XThreadEnsureUnlock releaseAttGuard(dbb->dbb_thread_mutex, FB_FUNCTION); - releaseAttGuard.enter(); - if (dbb->dbb_flags & DBB_closing) - return; - // Identify ourselves as a sweeper thread. This accomplishes two goals: // 1) Sweep transaction is started "precommitted" and // 2) Execution is throttled in JRD_reschedule() by @@ -1824,11 +1817,6 @@ void TRA_sweep(thread_db* tdbb) attachment->att_flags &= ~ATT_notify_gc; - // Mark our attachment as special one - - attachment->att_flags |= ATT_from_thread; - releaseAttGuard.leave(); - if (VIO_sweep(tdbb, transaction, &traceSweep)) { // At this point, we know that no record versions belonging to dead @@ -2669,6 +2657,90 @@ static void retain_context(thread_db* tdbb, jrd_tra* transaction, bool commit, i } +namespace { + class SweepParameter : public GlobalStorage + { + public: + SweepParameter(Database* d) + : dbb(d) + { } + + void waitForStartup() + { + sem.enter(); + } + + static void runSweep(SweepParameter* par) + { + FbLocalStatus status; + PathName dbName(par->dbb->dbb_database_name); + + // reference is needed to guarantee that provider exists + // between semaphore release and attach database + AutoPlugin prov(JProvider::getInstance()); + par->sem.release(); + + AutoDispose dpb(UtilInterfacePtr()->getXpbBuilder(&status, IXpbBuilder::DPB, NULL, 0)); + status.check(); + dpb->insertString(&status, isc_dpb_user_name, "sweeper"); + status.check(); + UCHAR byte = isc_dpb_records; + dpb->insertBytes(&status, isc_dpb_sweep, &byte, 1); + status.check(); + const UCHAR* dpbBytes = dpb->getBuffer(&status); + status.check(); + unsigned dpbLen = dpb->getBufferLength(&status); + status.check(); + + AutoRelease att(prov->attachDatabase(&status, dbName.c_str(), dpbLen, dpbBytes)); + status.check(); + } + + void exceptionHandler(const Exception& ex, ThreadFinishSync::ThreadRoutine*) + { + FbLocalStatus st; + ex.stuffException(&st); + if (st->getErrors()[1] != isc_att_shutdown) + iscLogException("Automatic sweep error", ex); + } + + private: + Semaphore sem; + Database* dbb; + }; + + typedef ThreadFinishSync SweepSync; + typedef HalfStaticArray SweepThreads; + InitInstance sweepThreads; + GlobalPtr swThrMutex; + bool sweepDown = false; +} + + +void TRA_shutdown_sweep() +{ +/************************************** + * + * T R A _ s h u t d o w n _ s w e e p + * + ************************************** + * + * Functional description + * Wait for sweep threads exit. + * + **************************************/ + MutexLockGuard g(swThrMutex, FB_FUNCTION); + if (sweepDown) + return; + sweepDown = true; + + SweepThreads& swThr(sweepThreads()); + for (unsigned n = 0; n < swThr.getCount(); ++n) + swThr[n]->waitForCompletion(); + swThr.clear(); +} + + static void start_sweeper(thread_db* tdbb) { /************************************** @@ -2683,65 +2755,45 @@ static void start_sweeper(thread_db* tdbb) **************************************/ SET_TDBB(tdbb); Database* const dbb = tdbb->getDatabase(); + bool started = false; if (!dbb->allowSweepThread(tdbb)) return; TRA_update_counters(tdbb, dbb); - // pass dbb to sweep thread - if allowSweepThread() returned TRUE that is safe try { - Thread::start(sweep_database, dbb, THREAD_medium, &dbb->dbb_sweep_thread); - return; - } - catch (const Firebird::Exception& ex) - { - iscLogException("cannot start sweep thread", ex); - dbb->clearSweepFlags(tdbb); - } -} + MutexLockGuard g(swThrMutex, FB_FUNCTION); + if (sweepDown) + return; + // perform housekeeping + SweepThreads& swThr(sweepThreads()); + for (unsigned n = 0; n < swThr.getCount(); ) + { + if (swThr[n]->tryWait()) + { + delete swThr[n]; + swThr.remove(n); + } + else + ++n; + } -static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM d) -{ -/************************************** - * - * s w e e p _ d a t a b a s e - * - ************************************** - * - * Functional description - * Sweep database. - * - **************************************/ - // determine database name - // taking into an account that thread is started successfully - // we should take care about parameters reference counter and DBB flags - Database* dbb = (Database*) d; - try - { - ISC_STATUS_ARRAY status_vector = {0}; - isc_db_handle db_handle = 0; - - Firebird::ClumpletWriter dpb(Firebird::ClumpletReader::dpbList, MAX_DPB_SIZE); - dpb.insertByte(isc_dpb_sweep, isc_dpb_records); - // use embedded authentication to attach database - const char* szAuthenticator = "sweeper"; - dpb.insertString(isc_dpb_user_name, szAuthenticator, fb_strlen(szAuthenticator)); - - isc_attach_database(status_vector, 0, dbb->dbb_database_name.c_str(), - &db_handle, dpb.getBufferLength(), - reinterpret_cast(dpb.getBuffer())); - if (db_handle) - isc_detach_database(status_vector, &db_handle); + AutoPtr sweepSync(FB_NEW SweepSync(*getDefaultMemoryPool(), SweepParameter::runSweep)); + SweepParameter swPar(dbb); + sweepSync->run(&swPar); + started = true; + swPar.waitForStartup(); + sweepThreads().add(sweepSync.release()); } catch (const Exception&) - { } - - dbb->clearSweepStarting(); // actually needed here only for classic, - // but do danger calling for super - return 0; + { + if (!started) + dbb->clearSweepStarting(); + throw; + } } @@ -3513,6 +3565,9 @@ static void transaction_start(thread_db* tdbb, jrd_tra* trans) // If the transaction block is getting out of hand, force a sweep + SPTHR_DEBUG(fprintf(stderr, "dbb_sweep_interval=%d tra_oldest_active=%d oldest=%d oldest_state=%d\n", + dbb->dbb_sweep_interval, trans->tra_oldest_active, oldest, oldest_state)); + if (dbb->dbb_sweep_interval && (trans->tra_oldest_active > oldest) && (trans->tra_oldest_active - oldest > dbb->dbb_sweep_interval) && diff --git a/src/jrd/tra_proto.h b/src/jrd/tra_proto.h index 4715fbc2f5..d9955434f1 100644 --- a/src/jrd/tra_proto.h +++ b/src/jrd/tra_proto.h @@ -64,5 +64,6 @@ void TRA_update_counters(Jrd::thread_db*, Jrd::Database*); int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait); void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request); void TRA_detach_request(Jrd::jrd_req* request); +void TRA_shutdown_sweep(); #endif // JRD_TRA_PROTO_H diff --git a/src/jrd/vio.cpp b/src/jrd/vio.cpp index 5f2e87440a..1158968067 100644 --- a/src/jrd/vio.cpp +++ b/src/jrd/vio.cpp @@ -3705,13 +3705,6 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee for (FB_SIZE_T i = 1; (vector = attachment->att_relations) && i < vector->count(); i++) { - if (dbb->dbb_flags & DBB_closing) - { - SPTHR_DEBUG(fprintf(stderr, "VIO_sweep exits\n")); - ret = false; - break; - } - relation = (*vector)[i]; if (relation) relation = MET_lookup_relation_id(tdbb, i, false); @@ -3742,12 +3735,6 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee { CCH_RELEASE(tdbb, &rpb.getWindow(tdbb)); - if (dbb->dbb_flags & DBB_closing) - { - SPTHR_DEBUG(fprintf(stderr, "VIO_sweep exits after VIO_next_record\n")); - break; - } - if (relation->rel_flags & REL_deleting) break;