8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 20:03:03 +01:00

Avoid long blocking of att_mutex and make sure it is always released when attachment deleted.

Move active requests cancellation at engine shutdown from Y-valve into engine and let them stop execution before deletion of attachments.
Make sweep run in separate thread despite of engine architecture.
This commit is contained in:
hvlad 2008-05-04 12:49:29 +00:00
parent 6ae7d3fde7
commit 9280a572a0
8 changed files with 79 additions and 64 deletions

View File

@ -152,6 +152,7 @@ namespace
#ifdef WIN_NT
ModuleLoader::Module* ibUtilModule = NULL;
#endif
bool engineShuttingDown = false;
class EngineStartup
{
@ -242,11 +243,14 @@ namespace
Jrd::ContextPoolHolder(arg, arg->getDatabase()->dbb_permanent),
tdbb(arg)
{
if (lockAtt && tdbb->getAttachment())
Attachment *attachment = tdbb->getAttachment();
if (lockAtt && attachment)
{
attLocked = tdbb->getAttachment()->att_mutex.tryEnter();
if (!attLocked)
Firebird::MutexLockGuard attGuard(attachment->att_mutex);
if ((attachment->att_flags & ATT_busy) || engineShuttingDown)
Firebird::status_exception::raise(isc_att_handle_busy, 0);
attachment->att_flags |= ATT_busy;
attLocked = true;
}
else
attLocked = false;
@ -264,9 +268,11 @@ namespace
}
Attachment* attachment = tdbb->getAttachment();
if (attLocked && attachment)
attachment->att_mutex.leave();
{
Firebird::MutexLockGuard attGuard(attachment->att_mutex);
attachment->att_flags &= ~ATT_busy;
}
}
private:
@ -451,6 +457,7 @@ public:
void get(const UCHAR*, USHORT, bool&);
};
static void cancel_attachments();
static void check_database(thread_db* tdbb);
static void check_transaction(thread_db*, jrd_tra*);
static void commit(thread_db*, jrd_tra*, const bool);
@ -488,6 +495,39 @@ static bool shutdown_dbb(thread_db*, Database*);
static THREAD_ENTRY_DECLARE shutdown_thread(THREAD_ENTRY_PARAM);
static void cancel_attachments()
{
engineShuttingDown = true;
Firebird::MutexLockGuard guard(databases_mutex);
for (Database *dbb = databases; dbb; dbb = dbb->dbb_next)
if ( !(dbb->dbb_flags & (DBB_bugcheck | DBB_not_in_use)) )
{
ISC_STATUS_ARRAY status;
for (Attachment *att = dbb->dbb_attachments; att; att = att->att_next)
{
while (true)
{
{
Firebird::MutexLockGuard attGuard(att->att_mutex);
if (!(att->att_flags & ATT_busy))
{
att->att_flags |= ATT_busy;
break;
}
}
if (!(att->att_flags & fb_cancel_disable))
{
jrd8_cancel_operation(status, &att, fb_cancel_enable);
jrd8_cancel_operation(status, &att, fb_cancel_raise);
}
THREAD_YIELD();
}
}
}
}
//____________________________________________________________
//
// check whether we need to perform an autocommit;
@ -753,6 +793,7 @@ ISC_STATUS GDS_ATTACH_DATABASE(ISC_STATUS* user_status,
attachment->att_remote_address = options.dpb_remote_address;
attachment->att_remote_pid = options.dpb_remote_pid;
attachment->att_remote_process = options.dpb_remote_process;
attachment->att_flags |= ATT_busy;
attachment->att_next = dbb->dbb_attachments;
dbb->dbb_attachments = attachment;
@ -1185,10 +1226,11 @@ ISC_STATUS GDS_ATTACH_DATABASE(ISC_STATUS* user_status,
// if there was an error, the status vector is all set
databases_mutex->leave();
if (options.dpb_sweep & isc_dpb_records)
{
if (!(TRA_sweep(tdbb, 0)))
{
if (!(TRA_sweep(tdbb, 0))) {
ERR_punt();
}
}
@ -1230,9 +1272,10 @@ ISC_STATUS GDS_ATTACH_DATABASE(ISC_STATUS* user_status,
}
}
databases_mutex->leave();
// databases_mutex->leave();
*handle = attachment;
attachment->att_flags &= ~ATT_busy;
} // try
catch (const DelayFailedLogin& ex)
@ -3101,6 +3144,8 @@ int GDS_SHUTDOWN(unsigned int timeout)
attach_count, database_count);
}
cancel_attachments();
if (timeout)
{
Firebird::Semaphore shutdown_semaphore;
@ -4851,7 +4896,6 @@ static void release_attachment(thread_db* tdbb, Attachment* attachment)
}
}
attachment->att_mutex.leave();
tdbb->setAttachment(NULL);
}
@ -5767,7 +5811,6 @@ static ISC_STATUS unwindAttach(const Firebird::Exception& ex,
if (attachment)
{
attachment->att_mutex.enter(); // will be unlocked in release_attachment
release_attachment(tdbb, attachment);
}

View File

@ -328,6 +328,7 @@ const ULONG ATT_cancel_disable = 16384; // Disable cancel operations
const ULONG ATT_gfix_attachment = 32768; // Indicate a GFIX attachment
const ULONG ATT_gstat_attachment = 65536; // Indicate a GSTAT attachment
const ULONG ATT_no_db_triggers = 131072; // Don't execute database triggers
const ULONG ATT_busy = 262144; // Attachment is busy handling user request
inline bool Attachment::locksmith() const

View File

@ -105,10 +105,8 @@ static SLONG inventory_page(thread_db*, SLONG);
static SSHORT limbo_transaction(thread_db*, SLONG);
static void link_transaction(thread_db*, jrd_tra*);
static void restart_requests(thread_db*, jrd_tra*);
#ifdef SWEEP_THREAD
static void start_sweeper(thread_db*, Database*);
static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM);
#endif
static void transaction_options(thread_db*, jrd_tra*, const UCHAR*, USHORT);
static jrd_tra* transaction_start(thread_db* tdbb, jrd_tra* temp);
@ -1642,7 +1640,7 @@ bool TRA_sweep(thread_db* tdbb, jrd_tra* trans)
/* No point trying to sweep a ReadOnly database */
if (dbb->dbb_flags & DBB_read_only)
return false;
return true;
if (dbb->dbb_flags & DBB_sweep_in_progress)
return true;
@ -1658,7 +1656,7 @@ bool TRA_sweep(thread_db* tdbb, jrd_tra* trans)
temp_lock.lck_parent = dbb->dbb_lock;
temp_lock.lck_length = sizeof(SLONG);
if (!LCK_lock(tdbb, &temp_lock, LCK_EX, (trans) ? LCK_NO_WAIT : LCK_WAIT))
if (!LCK_lock(tdbb, &temp_lock, LCK_EX, LCK_NO_WAIT))
{
// clear lock error from status vector
tdbb->tdbb_status_vector[0] = isc_arg_gds;
@ -2604,7 +2602,6 @@ static void retain_context(thread_db* tdbb, jrd_tra* transaction,
}
#ifdef SWEEP_THREAD
static void start_sweeper(thread_db* tdbb, Database* dbb)
{
/**************************************
@ -2636,6 +2633,11 @@ static void start_sweeper(thread_db* tdbb, Database* dbb)
if (!LCK_lock(tdbb, &temp_lock, LCK_EX, LCK_NO_WAIT))
{
// clear lock error from status vector
tdbb->tdbb_status_vector[0] = isc_arg_gds;
tdbb->tdbb_status_vector[1] = 0;
tdbb->tdbb_status_vector[2] = isc_arg_end;
return; // false;
}
@ -2700,7 +2702,6 @@ static THREAD_ENTRY_DECLARE sweep_database(THREAD_ENTRY_PARAM database)
gds__free(database);
return 0;
}
#endif
static void transaction_options(thread_db* tdbb,
@ -3410,13 +3411,8 @@ static jrd_tra* transaction_start(thread_db* tdbb, jrd_tra* temp)
(trans->tra_oldest_active - trans->tra_oldest >
dbb->dbb_sweep_interval) && oldest_state != tra_limbo)
{
#ifdef SWEEP_THREAD
// Why nobody checks the result? Changed the function to return nothing.
start_sweeper(tdbb, dbb);
#else
// force a sweep
TRA_sweep(tdbb, trans);
#endif
}
/* Check in with external file system */

View File

@ -2922,10 +2922,10 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction)
if (relation->rel_flags & REL_deleting) {
break;
}
#ifdef SUPERSERVER
if (--tdbb->tdbb_quantum < 0) {
JRD_reschedule(tdbb, SWEEP_QUANTUM, true);
}
#ifdef SUPERSERVER
transaction->tra_oldest_active = dbb->dbb_oldest_snapshot;
#endif
}

View File

@ -6043,37 +6043,6 @@ int API_ROUTINE fb_shutdown(unsigned int timeout)
// shutdown yValve
shutdownStarted = true; // since this moment no new thread will be able to enter yValve
const FB_UINT64 timeLimit = getMilliTime() + timeout;
for (;;)
{
{ // scope - cancel running requests
Firebird::MutexLockGuard guard(attachmentsMutex);
for (unsigned int i = 0; i < attachments().getCount(); ++i)
{
Attachment* att = attachments()[i];
CALL(PROC_CANCEL_OPERATION, att->implementation) (status, &att->handle, fb_cancel_enable);
CALL(PROC_CANCEL_OPERATION, att->implementation) (status, &att->handle, fb_cancel_raise);
}
}
if (isc_enter_count.value() < 2)
{
break;
}
if (timeout)
{
if (getMilliTime() > timeLimit)
{
Firebird::status_exception::raise(isc_shutdown_timeout, isc_arg_end);
}
}
THD_sleep(1);
}
// Shutdown providers
for (int n = 0; n < SUBSYSTEMS; ++n)
{

View File

@ -319,7 +319,7 @@ static void unhook_port(rem_port*, rem_port*);
static int xdrinet_create(XDR *, rem_port*, UCHAR *, USHORT, enum xdr_op);
static bool setNoNagleOption(rem_port*);
static FPTR_VOID tryStopMainThread = 0;
static int shut_preproviders();
static int shut_postproviders();
@ -880,8 +880,11 @@ rem_port* INET_connect(const TEXT* name,
(struct sockaddr *) &address, &l);
const int inetErrNo = INET_ERRNO;
if (s == INVALID_SOCKET) {
inet_error(port, "accept", isc_net_connect_err, inetErrNo);
disconnect(port);
if (!INET_shutting_down)
{
inet_error(port, "accept", isc_net_connect_err, inetErrNo);
disconnect(port);
}
return NULL;
}
#ifdef WIN_NT
@ -1215,7 +1218,7 @@ static rem_port* alloc_port( rem_port* parent)
gds__log(" Info: Remote Buffer Size set to %ld", INET_remote_buffer);
#endif
fb_shutdown_callback(0, shut_preproviders, fb_shut_preproviders);
fb_shutdown_callback(0, shut_postproviders, fb_shut_postproviders);
INET_initialized = true;
@ -3451,7 +3454,7 @@ void setStopMainThread(FPTR_VOID func)
tryStopMainThread = func;
}
static int shut_preproviders()
static int shut_postproviders()
{
INET_shutting_down = true;
return 0;

View File

@ -224,7 +224,6 @@ int WINAPI WinMain(HINSTANCE hThisInst,
if (port)
{
SRVR_multi_thread(port, server_flag);
SRVR_shutdown();
port = NULL;
}
}
@ -236,6 +235,8 @@ int WINAPI WinMain(HINSTANCE hThisInst,
if (port) {
service_connection(port);
}
fb_shutdown(0);
}
else if (!(server_flag & SRVR_non_service))
{
@ -321,7 +322,9 @@ static THREAD_ENTRY_DECLARE inet_connect_wait_thread(THREAD_ENTRY_PARAM)
rem_port* port = INET_connect(protocol_inet, NULL, status_vector, server_flag, 0);
if (!port) {
gds__log_status(0, status_vector);
if (status_vector[1]) {
gds__log_status(0, status_vector);
}
break;
}
if (server_flag & SRVR_multi_client) {
@ -424,8 +427,6 @@ static THREAD_ENTRY_DECLARE start_connections_thread(THREAD_ENTRY_PARAM)
*
**************************************/
fb_shutdown_callback(0, SRVR_shutdown, fb_shut_postproviders);
if (server_flag & SRVR_inet) {
gds__thread_start(inet_connect_wait_thread, 0, THREAD_medium, 0, 0);
}

View File

@ -572,7 +572,7 @@ void SRVR_multi_thread( rem_port* main_port, USHORT flags)
port->port_requests_queued);
fflush(stdout);
#endif
if (!Worker::wakeUp()) {
if (!shutting_down && !Worker::wakeUp()) {
gds__thread_start(loopThread, (void*)(IPTR) flags,
THREAD_medium, THREAD_ast, 0);
}
@ -4749,8 +4749,10 @@ void set_server( rem_port* port, USHORT flags)
}
}
if (!server) {
if (!server)
{
servers = server = new srvr(servers, port, flags);
fb_shutdown_callback(0, SRVR_shutdown, fb_shut_postproviders);
}
port->port_server = server;