8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 21:23:04 +01:00

Frontported fix for #7673: Make async replication reliable on Linux CS (replica side)
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Dmitry Yemanov 2023-07-13 10:01:52 +03:00
parent 631b039c1e
commit 9aaeab2d4b
6 changed files with 132 additions and 63 deletions

View File

@ -333,7 +333,7 @@ Config* Config::get(const PathName& lookupName)
// This routine is used to retrieve the list of replica databases.
// Therefore it checks only the necessary settings.
void Config::enumerate(Firebird::Array<Config*>& replicas)
void Config::enumerate(ReplicaList& replicas)
{
PathName dbName;

View File

@ -33,11 +33,13 @@ namespace Replication
{
struct Config : public Firebird::GlobalStorage
{
typedef Firebird::HalfStaticArray<Config*, 4> ReplicaList;
Config();
Config(const Config& other);
static Config* get(const Firebird::PathName& dbName);
static void enumerate(Firebird::Array<Config*>& replicas);
static void enumerate(ReplicaList& replicas);
Firebird::PathName dbName;
ULONG bufferSize;

View File

@ -81,11 +81,8 @@ namespace
int shutdownHandler(const int, const int, void*)
{
if (activeThreads.value())
if (!shutdownFlag && activeThreads.value())
{
gds__log("Shutting down the replication server with %d replicated database(s)",
(int) activeThreads.value());
shutdownFlag = true;
shutdownSemaphore.release(activeThreads.value() + 1);
@ -579,15 +576,6 @@ namespace
return value;
}
void readConfig(TargetList& targets)
{
Array<Replication::Config*> replicas;
Replication::Config::enumerate(replicas);
for (auto replica : replicas)
targets.add(FB_NEW Target(replica));
}
bool validateHeader(const SegmentHeader* header)
{
if (strcmp(header->hdr_signature, CHANGELOG_SIGNATURE))
@ -988,14 +976,15 @@ namespace
{
AutoPtr<Target> target(static_cast<Target*>(arg));
const auto config = target->getConfig();
const auto dbName = config->dbName.c_str();
target->verbose("Started replication thread");
AutoMemoryPool workingPool(MemoryPool::createPool());
ContextPoolHolder threadContext(workingPool);
target->verbose("Started replication for database %s", dbName);
while (!shutdownFlag)
{
AutoMemoryPool workingPool(MemoryPool::createPool());
ContextPoolHolder threadContext(workingPool);
const ProcessStatus ret = process_archive(*workingPool, target);
if (ret == PROCESS_CONTINUE)
@ -1003,10 +992,7 @@ namespace
target->shutdown();
if (ret == PROCESS_SHUTDOWN)
break;
if (!shutdownFlag)
if (ret != PROCESS_SHUTDOWN)
{
const ULONG timeout =
(ret == PROCESS_SUSPEND) ? config->applyIdleTimeout : config->applyErrorTimeout;
@ -1015,7 +1001,7 @@ namespace
}
}
target->verbose("Finished replication thread");
target->verbose("Finished replication for database %s", dbName);
--activeThreads;
return 0;
@ -1023,17 +1009,15 @@ namespace
}
bool REPL_server(CheckStatusWrapper* status, bool wait)
bool REPL_server(CheckStatusWrapper* status, const ReplicaList& replicas, bool wait)
{
try
{
fb_shutdown_callback(0, shutdownHandler, fb_shut_finish, 0);
fb_shutdown_callback(0, shutdownHandler, fb_shut_preproviders, 0);
TargetList targets;
readConfig(targets);
for (auto target : targets)
for (const auto replica : replicas)
{
const auto target = FB_NEW Target(replica);
Thread::start(process_thread, target, THREAD_medium, NULL);
++activeThreads;
}

View File

@ -23,6 +23,6 @@
#ifndef UTIL_REPL_SERVER_H
#define UTIL_REPL_SERVER_H
bool REPL_server(Firebird::CheckStatusWrapper*, bool);
bool REPL_server(Firebird::CheckStatusWrapper*, const ReplicaList&, bool);
#endif // UTIL_REPL_SERVER_H

View File

@ -80,6 +80,7 @@
#include "../remote/remote.h"
#include "../jrd/license.h"
#include "../jrd/replication/Config.h"
#include "../common/file_params.h"
#include "../remote/inet_proto.h"
#include "../remote/server/serve_proto.h"
@ -87,6 +88,7 @@
#include "../yvalve/gds_proto.h"
#include "../common/utils_proto.h"
#include "../common/classes/fb_string.h"
#include "../common/classes/semaphore.h"
#include "firebird/Interface.h"
#include "../common/classes/ImplementHelper.h"
@ -109,8 +111,6 @@
#endif
#include "../common/classes/semaphore.h"
const char* TEMP_DIR = "/tmp";
static void set_signal(int, void (*)(int));
@ -124,6 +124,9 @@ static int INET_SERVER_start = 0;
static void raiseLimit(int resource);
#endif
using namespace Firebird;
static void logSecurityDatabaseError(const char* path, ISC_STATUS* status)
{
// If I/O error happened then rather likely we just miss standard security DB
@ -131,7 +134,7 @@ static void logSecurityDatabaseError(const char* path, ISC_STATUS* status)
if (fb_utils::containsErrorCode(status, isc_io_error))
return;
Firebird::Syslog::Record(Firebird::Syslog::Error, "Security database error");
Syslog::Record(Syslog::Error, "Security database error");
gds__log_status(path, status);
if (isatty(2))
{
@ -185,6 +188,8 @@ int CLIB_ROUTINE main( int argc, char** argv)
bool standaloneClassic = false;
bool super = false;
int replPid = 0;
// It's very easy to detect that we are spawned - just check fd 0 to be a socket.
const int channel = 0;
struct STAT stat0;
@ -279,7 +284,7 @@ int CLIB_ROUTINE main( int argc, char** argv)
}
}
if (Firebird::Config::getServerMode() == Firebird::MODE_CLASSIC)
if (Config::getServerMode() == MODE_CLASSIC)
{
if (!classic)
standaloneClassic = true;
@ -289,14 +294,15 @@ int CLIB_ROUTINE main( int argc, char** argv)
if (classic)
{
gds__log("Server misconfigured - to start it from (x)inetd add ServerMode=Classic to firebird.conf");
Firebird::Syslog::Record(Firebird::Syslog::Error, "Server misconfigured - add ServerMode=Classic to firebird.conf");
Syslog::Record(Syslog::Error, "Server misconfigured - add ServerMode=Classic to firebird.conf");
exit(STARTUP_ERROR);
}
INET_SERVER_flag |= SRVR_multi_client;
super = true;
}
{ // scope
Firebird::MasterInterfacePtr master;
MasterInterfacePtr master;
master->serverMode(super ? 1 : 0);
}
@ -327,7 +333,7 @@ int CLIB_ROUTINE main( int argc, char** argv)
#endif
#if !(defined(DEV_BUILD))
if (Firebird::Config::getBugcheckAbort())
if (Config::getBugcheckAbort())
#endif
{
// try to force core files creation
@ -371,15 +377,15 @@ int CLIB_ROUTINE main( int argc, char** argv)
}
// check firebird.conf presence - must be for server
if (Firebird::Config::missFirebirdConf())
if (Config::missFirebirdConf())
{
Firebird::Syslog::Record(Firebird::Syslog::Error, "Missing master config file firebird.conf");
Syslog::Record(Syslog::Error, "Missing master config file firebird.conf");
exit(STARTUP_ERROR);
}
if (!debug)
{
const char* redirection_file = Firebird::Config::getOutputRedirectionFile();
const char* redirection_file = Config::getOutputRedirectionFile();
int stdout_no = fileno(stdout);
int stderr_no = fileno(stderr);
@ -417,20 +423,88 @@ int CLIB_ROUTINE main( int argc, char** argv)
}
}
Replication::Config::ReplicaList replicas;
Replication::Config::enumerate(replicas);
if (super || standaloneClassic)
{
if (standaloneClassic && replicas.hasData())
{
// Start the replication server now (in the forked process),
// because INET_connect() never returns for the standalone Classic
if ((replPid = fork()) <= 0)
{
try
{
if (replPid) // failed fork attempt
system_error::raise("fork", replPid);
// We've been forked successfully
FbLocalStatus localStatus;
if (!REPL_server(&localStatus, replicas, true))
localStatus.check();
}
catch (const Exception& ex)
{
const char* const errorMsg = "Replication server startup error";
iscLogException(errorMsg, ex);
Syslog::Record(Syslog::Error, errorMsg);
}
if (!replPid)
{
fb_shutdown(10000, fb_shutrsn_exit_called);
return FINI_OK;
}
}
}
// Start the network listener
try
{
port = INET_connect(protocol, 0, INET_SERVER_flag, 0, NULL);
}
catch (const Firebird::Exception& ex)
catch (const Exception& ex)
{
iscLogException("startup:INET_connect:", ex);
Firebird::StaticStatusVector st;
ex.stuffException(st);
gds__print_status(st.begin());
iscLogException("INET server startup error", ex);
exit(STARTUP_ERROR);
}
// If INET_connect() returns NULL for the standalone classic, then game is over.
// Signal the forked replication server process to terminate and then exit.
if (!port && replPid > 0) // this implies standaloneClassic being true
{
if (!kill(replPid, SIGTERM))
{
int status = 0;
// Wait up to one second for the replicator process to finish gracefully
for (unsigned n = 0; n < 10; n++)
{
Thread::sleep(100); // milliseconds
const auto res = waitpid(replPid, &status, WNOHANG);
if (res == replPid) // process is terminated
break;
if (res < 0 && !SYSCALL_INTERRUPTED(errno)) // error
break;
// continue waiting otherwise
}
// Force terminating the replicator process if it's still alive
if (!WIFEXITED(status))
kill(replPid, SIGKILL);
}
fb_shutdown(10000, fb_shutrsn_exit_called);
return FINI_OK;
}
}
if (classic)
@ -439,13 +513,13 @@ int CLIB_ROUTINE main( int argc, char** argv)
if (!port)
{
gds__log("Unable to start INET_server");
Firebird::Syslog::Record(Firebird::Syslog::Error, "Unable to start INET_server");
Syslog::Record(Syslog::Error, "Unable to start INET_server");
exit(STARTUP_ERROR);
}
}
{ // scope for interface ptr
Firebird::PluginManagerInterfacePtr pi;
PluginManagerInterfacePtr pi;
Auth::registerSrpServer(pi);
}
@ -459,7 +533,7 @@ int CLIB_ROUTINE main( int argc, char** argv)
ISC_STATUS_ARRAY status;
isc_db_handle db_handle = 0L;
const Firebird::RefPtr<const Firebird::Config> defConf(Firebird::Config::getDefaultConfig());
const RefPtr<const Config> defConf(Config::getDefaultConfig());
const char* path = defConf->getSecurityDatabase();
const char dpb[] = {isc_dpb_version1, isc_dpb_sec_attach, 1, 1, isc_dpb_address_path, 0};
@ -478,16 +552,18 @@ int CLIB_ROUTINE main( int argc, char** argv)
}
} // end scope
fb_shutdown_callback(NULL, closePort, fb_shut_exit, port);
// Start replication server
Firebird::FbLocalStatus localStatus;
if (!REPL_server(&localStatus, false))
FbLocalStatus localStatus;
if (!REPL_server(&localStatus, replicas, false))
{
const char* errorMsg = "Replication server initialization error";
const char* const errorMsg = "Replication server startup error";
iscLogStatus(errorMsg, localStatus->getErrors());
Firebird::Syslog::Record(Firebird::Syslog::Error, errorMsg);
Syslog::Record(Syslog::Error, errorMsg);
}
fb_shutdown_callback(NULL, closePort, fb_shut_exit, port);
SRVR_multi_thread(port, INET_SERVER_flag);
// perform atexit shutdown here when all globals in embedded library are active
@ -496,9 +572,9 @@ int CLIB_ROUTINE main( int argc, char** argv)
return FINI_OK;
}
catch (const Firebird::Exception& ex)
catch (const Exception& ex)
{
Firebird::StaticStatusVector st;
StaticStatusVector st;
ex.stuffException(st);
char s[100];
@ -506,8 +582,8 @@ int CLIB_ROUTINE main( int argc, char** argv)
fb_interpret(s, sizeof(s), &status);
iscLogException("Firebird startup error:", ex);
Firebird::Syslog::Record(Firebird::Syslog::Error, "Firebird startup error");
Firebird::Syslog::Record(Firebird::Syslog::Error, s);
Syslog::Record(Syslog::Error, "Firebird startup error");
Syslog::Record(Syslog::Error, s);
exit(STARTUP_ERROR);
}

View File

@ -96,6 +96,7 @@
#include "../common/classes/semaphore.h"
#include "../common/classes/FpeControl.h"
#include "../jrd/license.h"
#include "../jrd/replication/Config.h"
#include "../utilities/install/install_nt.h"
#include "../remote/remote.h"
#include "../remote/server/os/win32/cntl_proto.h"
@ -530,12 +531,18 @@ static THREAD_ENTRY_DECLARE start_connections_thread(THREAD_ENTRY_PARAM)
}
}
FbLocalStatus localStatus;
if (!REPL_server(&localStatus, false))
Replication::Config::ReplicaList replicas;
Replication::Config::enumerate(replicas);
if (replicas.hasData())
{
const char* errorMsg = "Replication server initialization error";
iscLogStatus(errorMsg, localStatus->getErrors());
Syslog::Record(Syslog::Error, errorMsg);
FbLocalStatus localStatus;
if (!REPL_server(&localStatus, replicas, false))
{
const char* errorMsg = "Replication server initialization error";
iscLogStatus(errorMsg, localStatus->getErrors());
Syslog::Record(Syslog::Error, errorMsg);
}
}
return 0;