mirror of
https://github.com/FirebirdSQL/firebird.git
synced 2025-02-02 09:20:39 +01:00
Better shutdown approach for the replication server
Some checks reported errors
continuous-integration/drone/push Build encountered an error
Some checks reported errors
continuous-integration/drone/push Build encountered an error
This commit is contained in:
parent
b7c3ceeafc
commit
d7fedd3ea2
@ -75,8 +75,26 @@ namespace
|
||||
const USHORT CTL_VERSION1 = 1;
|
||||
const USHORT CTL_CURRENT_VERSION = CTL_VERSION1;
|
||||
|
||||
volatile bool* shutdownPtr = NULL;
|
||||
volatile bool shutdownFlag = false;
|
||||
AtomicCounter activeThreads;
|
||||
Semaphore shutdownSemaphore;
|
||||
|
||||
int shutdownHandler(const int, const int, void*)
|
||||
{
|
||||
if (activeThreads.value())
|
||||
{
|
||||
gds__log("Shutting down the replication server with %d replicated database(s)",
|
||||
(int) activeThreads.value());
|
||||
|
||||
shutdownFlag = true;
|
||||
shutdownSemaphore.release(activeThreads.value() + 1);
|
||||
|
||||
while (activeThreads.value())
|
||||
Thread::sleep(10);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct ActiveTransaction
|
||||
{
|
||||
@ -626,7 +644,7 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR };
|
||||
enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR, PROCESS_SHUTDOWN };
|
||||
|
||||
ProcessStatus process_archive(MemoryPool& pool, Target* target)
|
||||
{
|
||||
@ -645,6 +663,9 @@ namespace
|
||||
for (iter = PathUtils::newDirIterator(pool, config->sourceDirectory);
|
||||
*iter; ++(*iter))
|
||||
{
|
||||
if (shutdownFlag)
|
||||
return PROCESS_SHUTDOWN;
|
||||
|
||||
const auto filename = **iter;
|
||||
|
||||
#ifdef PRESERVE_LOG
|
||||
@ -755,6 +776,9 @@ namespace
|
||||
|
||||
for (Segment** iter = queue.begin(); iter != queue.end(); ++iter)
|
||||
{
|
||||
if (shutdownFlag)
|
||||
return PROCESS_SHUTDOWN;
|
||||
|
||||
Segment* const segment = *iter;
|
||||
const FB_UINT64 sequence = segment->header.hdr_sequence;
|
||||
const Guid& guid = segment->header.hdr_guid;
|
||||
@ -845,6 +869,9 @@ namespace
|
||||
ULONG totalLength = sizeof(SegmentHeader);
|
||||
while (totalLength < segment->header.hdr_length)
|
||||
{
|
||||
if (shutdownFlag)
|
||||
return PROCESS_SHUTDOWN;
|
||||
|
||||
Block header;
|
||||
if (read(file, &header, sizeof(Block)) != sizeof(Block))
|
||||
raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
|
||||
@ -959,14 +986,12 @@ namespace
|
||||
|
||||
THREAD_ENTRY_DECLARE process_thread(THREAD_ENTRY_PARAM arg)
|
||||
{
|
||||
fb_assert(shutdownPtr);
|
||||
|
||||
AutoPtr<Target> target(static_cast<Target*>(arg));
|
||||
const auto config = target->getConfig();
|
||||
|
||||
target->verbose("Started replication thread");
|
||||
|
||||
while (!*shutdownPtr)
|
||||
while (!shutdownFlag)
|
||||
{
|
||||
AutoMemoryPool workingPool(MemoryPool::createPool());
|
||||
ContextPoolHolder threadContext(workingPool);
|
||||
@ -978,42 +1003,47 @@ namespace
|
||||
|
||||
target->shutdown();
|
||||
|
||||
if (!*shutdownPtr)
|
||||
if (ret == PROCESS_SHUTDOWN)
|
||||
break;
|
||||
|
||||
if (!shutdownFlag)
|
||||
{
|
||||
const ULONG timeout =
|
||||
(ret == PROCESS_SUSPEND) ? config->applyIdleTimeout : config->applyErrorTimeout;
|
||||
|
||||
Thread::sleep(timeout * 1000);
|
||||
shutdownSemaphore.tryEnter(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
target->verbose("Finished replication thread");
|
||||
|
||||
--activeThreads;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool REPL_server(CheckStatusWrapper* status, bool wait, bool* aShutdownPtr)
|
||||
|
||||
bool REPL_server(CheckStatusWrapper* status, bool wait)
|
||||
{
|
||||
try
|
||||
{
|
||||
shutdownPtr = aShutdownPtr;
|
||||
fb_shutdown_callback(0, shutdownHandler, fb_shut_finish, 0);
|
||||
|
||||
TargetList targets;
|
||||
readConfig(targets);
|
||||
|
||||
for (auto target : targets)
|
||||
{
|
||||
Thread::start(process_thread, target, THREAD_medium, NULL);
|
||||
++activeThreads;
|
||||
Thread::start((ThreadEntryPoint*) process_thread, target, THREAD_medium, NULL);
|
||||
}
|
||||
|
||||
if (wait)
|
||||
{
|
||||
shutdownSemaphore.enter();
|
||||
|
||||
do {
|
||||
Thread::sleep(100);
|
||||
Thread::sleep(10);
|
||||
} while (activeThreads.value());
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,6 @@
|
||||
#ifndef UTIL_REPL_SERVER_H
|
||||
#define UTIL_REPL_SERVER_H
|
||||
|
||||
bool REPL_server(Firebird::CheckStatusWrapper*, bool, bool*);
|
||||
bool REPL_server(Firebird::CheckStatusWrapper*, bool);
|
||||
|
||||
#endif // UTIL_REPL_SERVER_H
|
||||
|
@ -119,8 +119,6 @@ static void signal_handler(int);
|
||||
static TEXT protocol[128];
|
||||
static int INET_SERVER_start = 0;
|
||||
|
||||
static bool serverClosing = false;
|
||||
|
||||
#if defined(HAVE_SETRLIMIT) && defined(HAVE_GETRLIMIT)
|
||||
#define FB_RAISE_LIMITS 1
|
||||
static void raiseLimit(int resource);
|
||||
@ -483,7 +481,7 @@ int CLIB_ROUTINE main( int argc, char** argv)
|
||||
fb_shutdown_callback(NULL, closePort, fb_shut_exit, port);
|
||||
|
||||
Firebird::FbLocalStatus localStatus;
|
||||
if (!REPL_server(&localStatus, false, &serverClosing))
|
||||
if (!REPL_server(&localStatus, false))
|
||||
{
|
||||
const char* errorMsg = "Replication server initialization error";
|
||||
iscLogStatus(errorMsg, localStatus->getErrors());
|
||||
|
@ -527,7 +527,7 @@ static THREAD_ENTRY_DECLARE start_connections_thread(THREAD_ENTRY_PARAM)
|
||||
}
|
||||
|
||||
FbLocalStatus localStatus;
|
||||
if (!REPL_server(&localStatus, false, &server_shutdown))
|
||||
if (!REPL_server(&localStatus, false))
|
||||
{
|
||||
const char* errorMsg = "Replication server initialization error";
|
||||
iscLogStatus(errorMsg, localStatus->getErrors());
|
||||
|
Loading…
Reference in New Issue
Block a user