8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 21:23:03 +01:00

Backported last fix for CORE-5197

This commit is contained in:
AlexPeshkoff 2020-12-27 16:30:56 +03:00
parent b653313621
commit c42c34c370
8 changed files with 190 additions and 151 deletions

View File

@ -118,7 +118,7 @@ class ThreadFinishSync
public:
typedef void ThreadRoutine(TA);
ThreadFinishSync(Firebird::MemoryPool& pool, ThreadRoutine* routine, int priority_arg)
ThreadFinishSync(Firebird::MemoryPool& pool, ThreadRoutine* routine, int priority_arg = THREAD_medium)
:
#ifdef USE_FINI_SEM
fini(pool),
@ -126,7 +126,8 @@ public:
threadHandle(0),
#endif
threadRoutine(routine),
threadPriority(priority_arg)
threadPriority(priority_arg),
closing(false)
{ }
void run(TA arg)
@ -140,13 +141,26 @@ public:
);
}
bool tryWait()
{
if (closing)
{
waitForCompletion();
return true;
}
return false;
}
void waitForCompletion()
{
#ifdef USE_FINI_SEM
fini.enter();
#else
Thread::waitForCompletion(threadHandle);
threadHandle = 0;
if (threadHandle)
{
Thread::waitForCompletion(threadHandle);
threadHandle = 0;
}
#endif
}
@ -160,6 +174,7 @@ private:
TA threadArg;
ThreadRoutine* threadRoutine;
int threadPriority;
bool closing;
static THREAD_ENTRY_DECLARE internalRun(THREAD_ENTRY_PARAM arg)
{
@ -188,6 +203,7 @@ private:
threadArg->exceptionHandler(ex, threadRoutine);
}
#endif
closing = true;
}
};

View File

@ -1346,7 +1346,7 @@ namespace Jrd {
bool CryptoManager::down() const
{
return flDown || (dbb.dbb_flags & DBB_closing);
return flDown;
}
void CryptoManager::addClumplet(string& signature, ClumpletReader& block, UCHAR tag)

View File

@ -141,6 +141,7 @@ namespace Jrd
{
SPTHR_DEBUG(fprintf(stderr, "blocking_ast_sweep true %p\n", dbb));
dbb->dbb_thread_mutex.leave();
LCK_release(tdbb, dbb->dbb_sweep_lock);
break;
}
}
@ -178,13 +179,6 @@ namespace Jrd
return false;
}
if (dbb_flags & DBB_closing)
{
SPTHR_DEBUG(fprintf(stderr, "allowSweepThread false, dbb closing\n"));
dbb_thread_mutex.leave();
return false;
}
while (true)
{
AtomicCounter::counter_type old = dbb_flags;
@ -239,7 +233,7 @@ namespace Jrd
{
SPTHR_DEBUG(fprintf(stderr, "allowSweepRun %p\n", this));
if (readOnly() || (dbb_flags & DBB_closing))
if (readOnly())
return false;
Jrd::Attachment* const attachment = tdbb->getAttachment();

View File

@ -224,7 +224,7 @@ const ULONG DBB_no_fs_cache = 0x40000L; // Not using file system cache
const ULONG DBB_sweep_starting = 0x80000L; // Auto-sweep is starting
const ULONG DBB_creating = 0x100000L; // Database creation is in progress
const ULONG DBB_shared = 0x200000L; // Database object is shared among connections
const ULONG DBB_closing = 0x400000L; // Database closing, special backgroud threads should exit
//const ULONG DBB_closing = 0x400000L; // Database closing, special backgroud threads should exit
//
// dbb_ast_flags
@ -474,7 +474,6 @@ public:
CryptoManager* dbb_crypto_manager;
Firebird::RefPtr<ExistenceRefMutex> dbb_init_fini;
Firebird::XThreadMutex dbb_thread_mutex; // special threads start/stop mutex
Thread::Handle dbb_sweep_thread;
Firebird::RefPtr<Linger> dbb_linger_timer;
unsigned dbb_linger_seconds;
time_t dbb_linger_end;
@ -530,7 +529,6 @@ private:
dbb_external_file_directory_list(NULL),
dbb_shared_counter(shared),
dbb_init_fini(FB_NEW_POOL(*getDefaultMemoryPool()) ExistenceRefMutex()),
dbb_sweep_thread(0),
dbb_linger_seconds(0),
dbb_linger_end(0),
dbb_plugin_config(pConf)

View File

@ -6396,6 +6396,54 @@ static void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment)
if (dbb->dbb_crypto_manager)
dbb->dbb_crypto_manager->detach(tdbb, attachment);
Sync sync(&dbb->dbb_sync, "jrd.cpp: release_attachment");
// avoid races with special threads
XThreadEnsureUnlock threadGuard(dbb->dbb_thread_mutex, FB_FUNCTION);
threadGuard.enter();
sync.lock(SYNC_EXCLUSIVE);
// stop special threads if and only if we release last regular attachment
bool other = false;
{ // checkout scope
EngineCheckout checkout(tdbb, FB_FUNCTION);
SPTHR_DEBUG(fprintf(stderr, "\nrelease attachment=%p\n", attachment));
for (Jrd::Attachment* att = dbb->dbb_attachments; att; att = att->att_next)
{
SPTHR_DEBUG(fprintf(stderr, "att=%p FromThr=%c ", att, att->att_flags & ATT_from_thread ? '1' : '0'));
if (att == attachment)
{
SPTHR_DEBUG(fprintf(stderr, "self\n"));
continue;
}
if (att->att_flags & ATT_from_thread)
{
SPTHR_DEBUG(fprintf(stderr, "found special att=%p\n", att));
continue;
}
// Found attachment that is not current (to be released) and is not special
other = true;
SPTHR_DEBUG(fprintf(stderr, "other\n"));
break;
}
// Notify special threads
threadGuard.leave();
// Sync with special threads
sync.unlock();
if (!other)
{
// crypt thread
if (dbb->dbb_crypto_manager)
dbb->dbb_crypto_manager->terminateCryptThread(tdbb, true);
}
} // EngineCheckout scope
Monitoring::cleanupAttachment(tdbb);
dbb->dbb_extManager.closeAttachment(tdbb, attachment);
@ -6451,69 +6499,8 @@ static void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment)
attachment->mergeStats();
Sync sync(&dbb->dbb_sync, "jrd.cpp: release_attachment");
// avoid races with special threads
XThreadEnsureUnlock threadGuard(dbb->dbb_thread_mutex, FB_FUNCTION);
threadGuard.enter();
sync.lock(SYNC_EXCLUSIVE);
// stop special threads if and only if we release last regular attachment
bool other = false;
{ // checkout scope
EngineCheckout checkout(tdbb, FB_FUNCTION);
SPTHR_DEBUG(fprintf(stderr, "\nrelease attachment=%p\n", attachment));
for (Jrd::Attachment* att = dbb->dbb_attachments; att; att = att->att_next)
{
SPTHR_DEBUG(fprintf(stderr, "att=%p FromThr=%c ", att, att->att_flags & ATT_from_thread ? '1' : '0'));
if (att == attachment)
{
SPTHR_DEBUG(fprintf(stderr, "self\n"));
continue;
}
if (att->att_flags & ATT_from_thread)
{
SPTHR_DEBUG(fprintf(stderr, "found special att=%p\n", att));
continue;
}
// Found attachment that is not current (to be released) and is not special
other = true;
SPTHR_DEBUG(fprintf(stderr, "other\n"));
break;
}
// Notify special threads
if (!other)
dbb->dbb_flags |= DBB_closing;
threadGuard.leave();
// Sync with special threads
if (!other)
{
sync.unlock();
// crypt thread
if (dbb->dbb_crypto_manager)
dbb->dbb_crypto_manager->terminateCryptThread(tdbb, true);
// sweep thread
if (dbb->dbb_sweep_thread)
{
Thread::waitForCompletion(dbb->dbb_sweep_thread);
dbb->dbb_sweep_thread = 0;
}
}
} // EngineCheckout scope
// restore database lock if needed
if (!other)
sync.lock(SYNC_EXCLUSIVE);
// remove the attachment block from the dbb linked list
sync.lock(SYNC_EXCLUSIVE);
for (Jrd::Attachment** ptr = &dbb->dbb_attachments; *ptr; ptr = &(*ptr)->att_next)
{
if (*ptr == attachment)
@ -7552,6 +7539,7 @@ static THREAD_ENTRY_DECLARE shutdown_thread(THREAD_ENTRY_PARAM arg)
// Extra shutdown operations
Service::shutdownServices();
TRA_shutdown_sweep();
}
catch (const Exception& ex)
{

View File

@ -102,7 +102,7 @@ static void release_temp_tables(thread_db*, jrd_tra*);
static void retain_temp_tables(thread_db*, jrd_tra*, TraNumber);
static void restart_requests(thread_db*, jrd_tra*);
static void start_sweeper(thread_db*);
static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM);
//static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM);
static void transaction_flush(thread_db* tdbb, USHORT flush_flag, TraNumber tra_number);
static void transaction_options(thread_db*, jrd_tra*, const UCHAR*, USHORT);
static void transaction_start(thread_db* tdbb, jrd_tra* temp);
@ -1789,13 +1789,6 @@ void TRA_sweep(thread_db* tdbb)
try {
// Avoid races with release_attachment()
XThreadEnsureUnlock releaseAttGuard(dbb->dbb_thread_mutex, FB_FUNCTION);
releaseAttGuard.enter();
if (dbb->dbb_flags & DBB_closing)
return;
// Identify ourselves as a sweeper thread. This accomplishes two goals:
// 1) Sweep transaction is started "precommitted" and
// 2) Execution is throttled in JRD_reschedule() by
@ -1824,11 +1817,6 @@ void TRA_sweep(thread_db* tdbb)
attachment->att_flags &= ~ATT_notify_gc;
// Mark our attachment as special one
attachment->att_flags |= ATT_from_thread;
releaseAttGuard.leave();
if (VIO_sweep(tdbb, transaction, &traceSweep))
{
// At this point, we know that no record versions belonging to dead
@ -2669,6 +2657,90 @@ static void retain_context(thread_db* tdbb, jrd_tra* transaction, bool commit, i
}
namespace {
class SweepParameter : public GlobalStorage
{
public:
SweepParameter(Database* d)
: dbb(d)
{ }
void waitForStartup()
{
sem.enter();
}
static void runSweep(SweepParameter* par)
{
FbLocalStatus status;
PathName dbName(par->dbb->dbb_database_name);
// reference is needed to guarantee that provider exists
// between semaphore release and attach database
AutoPlugin<JProvider> prov(JProvider::getInstance());
par->sem.release();
AutoDispose<IXpbBuilder> dpb(UtilInterfacePtr()->getXpbBuilder(&status, IXpbBuilder::DPB, NULL, 0));
status.check();
dpb->insertString(&status, isc_dpb_user_name, "sweeper");
status.check();
UCHAR byte = isc_dpb_records;
dpb->insertBytes(&status, isc_dpb_sweep, &byte, 1);
status.check();
const UCHAR* dpbBytes = dpb->getBuffer(&status);
status.check();
unsigned dpbLen = dpb->getBufferLength(&status);
status.check();
AutoRelease<IAttachment> att(prov->attachDatabase(&status, dbName.c_str(), dpbLen, dpbBytes));
status.check();
}
void exceptionHandler(const Exception& ex, ThreadFinishSync<SweepParameter*>::ThreadRoutine*)
{
FbLocalStatus st;
ex.stuffException(&st);
if (st->getErrors()[1] != isc_att_shutdown)
iscLogException("Automatic sweep error", ex);
}
private:
Semaphore sem;
Database* dbb;
};
typedef ThreadFinishSync<SweepParameter*> SweepSync;
typedef HalfStaticArray<SweepSync*, 16> SweepThreads;
InitInstance<SweepThreads> sweepThreads;
GlobalPtr<Mutex> swThrMutex;
bool sweepDown = false;
}
void TRA_shutdown_sweep()
{
/**************************************
*
* T R A _ s h u t d o w n _ s w e e p
*
**************************************
*
* Functional description
* Wait for sweep threads exit.
*
**************************************/
MutexLockGuard g(swThrMutex, FB_FUNCTION);
if (sweepDown)
return;
sweepDown = true;
SweepThreads& swThr(sweepThreads());
for (unsigned n = 0; n < swThr.getCount(); ++n)
swThr[n]->waitForCompletion();
swThr.clear();
}
static void start_sweeper(thread_db* tdbb)
{
/**************************************
@ -2683,65 +2755,45 @@ static void start_sweeper(thread_db* tdbb)
**************************************/
SET_TDBB(tdbb);
Database* const dbb = tdbb->getDatabase();
bool started = false;
if (!dbb->allowSweepThread(tdbb))
return;
TRA_update_counters(tdbb, dbb);
// pass dbb to sweep thread - if allowSweepThread() returned TRUE that is safe
try
{
Thread::start(sweep_database, dbb, THREAD_medium, &dbb->dbb_sweep_thread);
return;
}
catch (const Firebird::Exception& ex)
{
iscLogException("cannot start sweep thread", ex);
dbb->clearSweepFlags(tdbb);
}
}
MutexLockGuard g(swThrMutex, FB_FUNCTION);
if (sweepDown)
return;
// perform housekeeping
SweepThreads& swThr(sweepThreads());
for (unsigned n = 0; n < swThr.getCount(); )
{
if (swThr[n]->tryWait())
{
delete swThr[n];
swThr.remove(n);
}
else
++n;
}
static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM d)
{
/**************************************
*
* s w e e p _ d a t a b a s e
*
**************************************
*
* Functional description
* Sweep database.
*
**************************************/
// determine database name
// taking into an account that thread is started successfully
// we should take care about parameters reference counter and DBB flags
Database* dbb = (Database*) d;
try
{
ISC_STATUS_ARRAY status_vector = {0};
isc_db_handle db_handle = 0;
Firebird::ClumpletWriter dpb(Firebird::ClumpletReader::dpbList, MAX_DPB_SIZE);
dpb.insertByte(isc_dpb_sweep, isc_dpb_records);
// use embedded authentication to attach database
const char* szAuthenticator = "sweeper";
dpb.insertString(isc_dpb_user_name, szAuthenticator, fb_strlen(szAuthenticator));
isc_attach_database(status_vector, 0, dbb->dbb_database_name.c_str(),
&db_handle, dpb.getBufferLength(),
reinterpret_cast<const char*>(dpb.getBuffer()));
if (db_handle)
isc_detach_database(status_vector, &db_handle);
AutoPtr<SweepSync> sweepSync(FB_NEW SweepSync(*getDefaultMemoryPool(), SweepParameter::runSweep));
SweepParameter swPar(dbb);
sweepSync->run(&swPar);
started = true;
swPar.waitForStartup();
sweepThreads().add(sweepSync.release());
}
catch (const Exception&)
{ }
dbb->clearSweepStarting(); // actually needed here only for classic,
// but do danger calling for super
return 0;
{
if (!started)
dbb->clearSweepStarting();
throw;
}
}
@ -3513,6 +3565,9 @@ static void transaction_start(thread_db* tdbb, jrd_tra* trans)
// If the transaction block is getting out of hand, force a sweep
SPTHR_DEBUG(fprintf(stderr, "dbb_sweep_interval=%d tra_oldest_active=%d oldest=%d oldest_state=%d\n",
dbb->dbb_sweep_interval, trans->tra_oldest_active, oldest, oldest_state));
if (dbb->dbb_sweep_interval &&
(trans->tra_oldest_active > oldest) &&
(trans->tra_oldest_active - oldest > dbb->dbb_sweep_interval) &&

View File

@ -64,5 +64,6 @@ void TRA_update_counters(Jrd::thread_db*, Jrd::Database*);
int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait);
void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request);
void TRA_detach_request(Jrd::jrd_req* request);
void TRA_shutdown_sweep();
#endif // JRD_TRA_PROTO_H

View File

@ -3705,13 +3705,6 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
for (FB_SIZE_T i = 1; (vector = attachment->att_relations) && i < vector->count(); i++)
{
if (dbb->dbb_flags & DBB_closing)
{
SPTHR_DEBUG(fprintf(stderr, "VIO_sweep exits\n"));
ret = false;
break;
}
relation = (*vector)[i];
if (relation)
relation = MET_lookup_relation_id(tdbb, i, false);
@ -3742,12 +3735,6 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
{
CCH_RELEASE(tdbb, &rpb.getWindow(tdbb));
if (dbb->dbb_flags & DBB_closing)
{
SPTHR_DEBUG(fprintf(stderr, "VIO_sweep exits after VIO_next_record\n"));
break;
}
if (relation->rel_flags & REL_deleting)
break;