8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-02-02 09:20:39 +01:00

Fixed some errors, changed the logic slightly

This commit is contained in:
Dmitry Yemanov 2020-09-22 15:41:08 +03:00
parent f910c4bc1f
commit a2e08294bc
9 changed files with 158 additions and 146 deletions

View File

@ -4,10 +4,10 @@
database
{
### ORIGIN SIDE SETTINGS
### PRIMARY SIDE SETTINGS
# Plugin used to perform replication.
# Leave it empty to use internal replication.
# Leave it empty to use built-in replication.
#
# plugin =
@ -23,15 +23,11 @@ database
# Boolean parameters describing how replication errors must be handled.
#
# log_on_error = true # Error message shall be written into firebird.log
# disable_on_error = true # Replication shall be disabled till the end of transaction.
# throw_on_error = false # Error shall be returned to user.
# log_errors = true # All errors (and warnings) are written to replication.log
# report_errors = false # Errors are returned to the client application
# disable_on_error = true # Replication is disabled after error
### Parameters specific for internal replication
# Directory to store replication log files.
#
# log_directory =
### Parameters specific for built-in replication
# Size of the local buffer used to accumulate changes that can be
# deferred until the transaction commit/rollback. The bigger this value
@ -44,11 +40,15 @@ database
#
# buffer_size = 1048576 # 1MB
# Directory to store replication log files.
#
# log_directory =
# Prefix for replication log file names. It will be automatically suffixed
# with an ordinal sequential number. If not specified, database filename
# (without path) is used as a prefix.
#
# log_file_prefix
# log_file_prefix =
# Maximum allowed size for a single replication segment.
#
@ -159,24 +159,24 @@ database
#
# Database-specific settings belong here, e.g.
#
# (for the origin side)
# (for the primary side)
#
# database = /your/db.fdb
# {
# (for synchronous replication)
#
# sync_replica = sysdba:masterkey@otherhost:/your/replica.fdb
#
# (for asynchronous replication)
#
# log_directory = /your/db/chlog
# log_archive_directory = /your/db/archlog
# log_archive_timeout = 10
# (for synchronous replication)
#
# sync_replica = sysdba:masterkey@otherhost:/your/replica.fdb
#
# (for asynchronous replication)
#
# log_directory = /your/db/chlog
# log_archive_directory = /your/db/archlog
# log_archive_timeout = 10
# }
#
# (for the replica side)
#
# database = /your/db.fdb
# {
# log_source_directory = /your/db/incominglog
# log_source_directory = /your/db/incominglog
# }

View File

@ -322,6 +322,9 @@ namespace Jrd
bool Database::isReplicating(thread_db* tdbb)
{
if (!replConfig())
return false;
Sync sync(&dbb_repl_sync, FB_FUNCTION);
sync.lock(SYNC_SHARED);

View File

@ -304,6 +304,7 @@ class Database : public pool_alloc<type_dbb>
LockManager* getLockManager();
EventManager* getEventManager();
Replication::Manager* getReplManager(bool create);
const Replication::Config* getReplConfig()
{
return m_replConfig.get();

View File

@ -59,7 +59,7 @@ namespace
void parseLong(const string& input, ULONG& output)
{
char* tail = NULL;
char* tail = nullptr;
auto number = strtol(input.c_str(), &tail, 10);
if (tail && *tail == 0 && number > 0)
output = (ULONG) number;
@ -101,9 +101,9 @@ Config::Config()
applyIdleTimeout(DEFAULT_APPLY_IDLE_TIMEOUT),
applyErrorTimeout(DEFAULT_APPLY_ERROR_TIMEOUT),
pluginName(getPool()),
log_on_error(true),
disable_on_error(true),
throw_on_error(false)
logErrors(true),
reportErrors(false),
disableOnError(true)
{
sourceGuid.alignment = 0;
}
@ -126,10 +126,10 @@ Config::Config(const Config& other)
verboseLogging(other.verboseLogging),
applyIdleTimeout(other.applyIdleTimeout),
applyErrorTimeout(other.applyErrorTimeout),
pluginName(other.pluginName),
log_on_error(other.log_on_error),
disable_on_error(other.disable_on_error),
throw_on_error(other.throw_on_error)
pluginName(getPool(), other.pluginName),
logErrors(other.logErrors),
reportErrors(other.reportErrors),
disableOnError(other.disableOnError)
{
sourceGuid.alignment = 0;
}
@ -246,31 +246,28 @@ Config* Config::get(const PathName& lookupName)
{
config->pluginName = value;
}
else if (key == "log_on_error")
else if (key == "log_errors")
{
parseBoolean(value, config->log_on_error);
parseBoolean(value, config->logErrors);
}
else if (key == "report_errors")
{
parseBoolean(value, config->reportErrors);
}
else if (key == "disable_on_error")
{
parseBoolean(value, config->disable_on_error);
}
else if (key == "throw_on_error")
{
parseBoolean(value, config->throw_on_error);
parseBoolean(value, config->disableOnError);
}
}
}
if (exactMatch)
break;
}
// TODO: As soon as plugin name is moved into RDB$PUBLICATIONS delay config parse until real replication start
if (config->pluginName.hasData())
{
return config.release();
}
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
{
@ -286,7 +283,7 @@ Config* Config::get(const PathName& lookupName)
return config.release();
}
return NULL;
return nullptr;
}
// This routine is used to retrieve the list of replica databases.

View File

@ -58,9 +58,9 @@ namespace Replication
ULONG applyIdleTimeout;
ULONG applyErrorTimeout;
Firebird::string pluginName;
bool log_on_error;
bool disable_on_error;
bool throw_on_error;
bool logErrors;
bool reportErrors;
bool disableOnError;
};
};

View File

@ -119,6 +119,8 @@ Manager::Manager(const string& dbId,
m_changeLog = FB_NEW_POOL(getPool())
ChangeLog(getPool(), dbId, guid, m_sequence, config);
}
else
fb_assert(config->syncReplicas.hasData());
// Attach to synchronous replicas (if any)

View File

@ -48,48 +48,60 @@ namespace
// should be replicated similar to user-defined ones
const int BACKUP_HISTORY_GENERATOR = 9;
const char* LOG_ERROR_MSG = "Replication error";
const char* LOG_WARNING_MSG = "Replication warning";
const char* NO_PLUGIN_ERROR = "Replication plugin %s is not found";
const char* STOP_ERROR = "Replication is stopped due to critical error(s)";
void handleError(thread_db* tdbb, jrd_tra* transaction = NULL, bool canThrow = true)
void logStatus(const Database* dbb, const ISC_STATUS* status, LogMsgType type)
{
string message;
char temp[BUFFER_LARGE];
while (fb_interpret(temp, sizeof(temp), &status))
{
if (!message.isEmpty())
message += "\n\t";
message += temp;
}
logOriginMessage(dbb->dbb_filename.c_str(), message, type);
}
void handleError(thread_db* tdbb, jrd_tra* transaction = nullptr, bool canThrow = true)
{
const auto dbb = tdbb->getDatabase();
const auto attachment = tdbb->getAttachment();
const auto config = dbb->replConfig();
fb_assert(attachment->att_replicator.hasData());
if (transaction && transaction->tra_replicator && config->disable_on_error && canThrow)
const auto config = dbb->replConfig();
fb_assert(config);
if (transaction && transaction->tra_replicator && config->disableOnError)
{
// If we cannot throw then calling routine will take care of the replicator
transaction->tra_replicator->dispose();
transaction->tra_replicator = NULL;
transaction->tra_flags &= ~TRA_replicating;
transaction->tra_replicator = nullptr;
}
const auto status = attachment->att_replicator->getStatus();
const auto state = status->getState();
if (state & IStatus::STATE_WARNINGS)
{
if (config->log_on_error)
{
string msg;
msg.printf("Database: %s\n\t%s", dbb->dbb_filename.c_str(), LOG_WARNING_MSG);
iscLogStatus(msg.c_str(), status);
}
if (config->logErrors)
logStatus(dbb, status->getWarnings(), WARNING_MSG);
}
if (state & IStatus::STATE_ERRORS)
{
if (config->log_on_error)
{
string msg;
msg.printf("Database: %s\n\t%s", dbb->dbb_filename.c_str(), LOG_ERROR_MSG);
iscLogStatus(msg.c_str(), status);
}
if (config->logErrors)
logStatus(dbb, status->getErrors(), ERROR_MSG);
if (config->throw_on_error && canThrow)
{
if (config->disableOnError)
logOriginMessage(dbb->dbb_filename, STOP_ERROR, ERROR_MSG);
if (config->reportErrors && canThrow)
status_exception::raise(status);
}
}
}
@ -100,7 +112,7 @@ namespace
// Disable replication for system attachments
if (attachment->isSystem())
return NULL;
return nullptr;
// Check whether replication is configured and enabled for this database
@ -108,53 +120,61 @@ namespace
if (!dbb->isReplicating(tdbb))
{
attachment->att_replicator = nullptr;
return NULL;
return nullptr;
}
const auto config = dbb->replConfig();
fb_assert(config);
// Create a replicator object, unless it already exists
if (!attachment->att_replicator)
if (attachment->att_replicator.hasData())
{
const auto config = dbb->replConfig();
// If replication must be disabled after errors and the status is dirty,
// then fake the replication being inactive
const auto status = attachment->att_replicator->getStatus();
const auto state = status->getState();
if (config->disableOnError && (state & IStatus::STATE_ERRORS))
return nullptr;
status->init(); // reset the status
}
else
{
if (config->pluginName.empty())
{
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
{
auto& pool = *attachment->att_pool;
const auto manager = dbb->replManager(true);
const auto& guid = dbb->dbb_guid;
const auto& userName = attachment->att_user->getUserName();
attachment->att_replicator = FB_NEW
Replicator(pool, manager, guid, userName);
}
auto& pool = *attachment->att_pool;
const auto manager = dbb->replManager(true);
const auto& guid = dbb->dbb_guid;
const auto& userName = attachment->att_user->getUserName();
attachment->att_replicator = FB_NEW Replicator(pool, manager, guid, userName);
}
else
{
GetPlugins<IReplicatedSession> plugins(IPluginManager::TYPE_REPLICATOR, config->pluginName.c_str());
GetPlugins<IReplicatedSession> plugins(IPluginManager::TYPE_REPLICATOR,
config->pluginName.c_str());
if (!plugins.hasData())
{
string msg;
msg.printf("Replication plugin %s not found\n\t%s", config->pluginName.c_str(), LOG_ERROR_MSG);
msg.printf(NO_PLUGIN_ERROR, config->pluginName.c_str());
logOriginMessage(dbb->dbb_filename, msg, ERROR_MSG);
return nullptr;
}
attachment->att_replicator = plugins.plugin();
}
if (attachment->att_replicator.hasData())
{
attachment->att_replicator->setAttachment(attachment->getInterface());
if (cleanupTransactions)
attachment->att_replicator->cleanupTransaction(0);
}
attachment->att_replicator->setAttachment(attachment->getInterface());
if (cleanupTransactions)
attachment->att_replicator->cleanupTransaction(0);
}
fb_assert(attachment->att_replicator.hasData());
// No need to check for errors here: fresh replicator cannot have any, used one can have dirty status.
return attachment->att_replicator;
}
@ -163,7 +183,7 @@ namespace
// Disable replication for system and read-only transactions
if (transaction->tra_flags & (TRA_system | TRA_readonly))
return NULL;
return nullptr;
// Check parent replicator presense
// (this includes checking for the database-wise replication state)
@ -174,18 +194,19 @@ namespace
if (transaction->tra_replicator)
{
transaction->tra_replicator->dispose();
transaction->tra_replicator = NULL;
transaction->tra_replicator = nullptr;
}
return NULL;
return nullptr;
}
// Create a replicator object, unless it already exists
if (!transaction->tra_replicator &&
(transaction->tra_flags & TRA_replicating))
if (!transaction->tra_replicator && (transaction->tra_flags & TRA_replicating))
{
transaction->tra_replicator = replicator->startTransaction(transaction->getInterface(true), transaction->tra_number);
transaction->tra_replicator =
replicator->startTransaction(transaction->getInterface(true),
transaction->tra_number);
if (!transaction->tra_replicator)
handleError(tdbb, transaction);
@ -271,9 +292,7 @@ namespace
{
public:
ReplicatedRecordImpl(thread_db* tdbb, const jrd_rel* relation, const Record* record)
: //m_tdbb(tdbb),
m_record(record),
m_format(record->getFormat()),
: m_record(record),
m_relation(relation)
{
}
@ -284,17 +303,14 @@ namespace
unsigned getCount() override
{
return m_format->fmt_count;
const auto format = m_record->getFormat();
return format->fmt_count;
}
const char* getName() override
{
jrd_fld* field = MET_get_field(m_relation, m_fieldIndex);
if (field == nullptr)
return nullptr;
return field->fld_name.c_str();
const auto field = MET_get_field(m_relation, m_fieldIndex);
return field ? field->fld_name.c_str() : nullptr;
}
unsigned getType() override
@ -304,12 +320,12 @@ namespace
unsigned getSubType() override
{
return m_format->fmt_desc[m_fieldIndex].getSubType();
return m_desc->getSubType();
}
unsigned getScale() override
{
return m_format->fmt_desc[m_fieldIndex].dsc_scale;
return m_desc->dsc_scale;
}
unsigned getLength() override
@ -319,7 +335,7 @@ namespace
unsigned getCharSet() override
{
return m_format->fmt_desc[m_fieldIndex].getCharSet();
return m_desc->getCharSet();
}
const void* getData() override
@ -327,17 +343,25 @@ namespace
if (m_record->isNull(m_fieldIndex))
return nullptr;
return m_record->getData() + (IPTR)(m_format->fmt_desc[m_fieldIndex].dsc_address);
return m_record->getData() + (IPTR) m_desc->dsc_address;
}
IReplicatedField* getField(unsigned index) override
{
if (index >= m_format->fmt_count)
const auto format = m_record->getFormat();
if (index >= format->fmt_count)
return nullptr;
const auto desc = &format->fmt_desc[index];
if (desc->isUnknown() || !desc->dsc_address)
return nullptr;
m_desc = desc;
m_fieldIndex = index;
SLONG dummySubtype, dummyScale;
m_format->fmt_desc[m_fieldIndex].getSqlInfo(&m_fieldLength, &dummySubtype, &dummyScale, &m_fieldType);
desc->getSqlInfo(&m_fieldLength, &dummySubtype, &dummyScale, &m_fieldType);
return this;
}
@ -353,10 +377,9 @@ namespace
}
private:
//thread_db* const m_tdbb;
const Record* const m_record;
const Format* m_format; // Optimization
const jrd_rel* const m_relation;
const dsc* m_desc = nullptr; // optimization
unsigned m_fieldIndex = 0;
SLONG m_fieldLength = 0;
SLONG m_fieldType = 0;
@ -376,7 +399,7 @@ void REPL_attach(thread_db* tdbb, bool cleanupTransactions)
fb_assert(!attachment->att_repl_matcher);
auto& pool = *attachment->att_pool;
attachment->att_repl_matcher = FB_NEW_POOL(pool)
TableMatcher(pool, replConfig->includeFilter, replConfig->excludeFilter);
TableMatcher(pool, replConfig->includeFilter, replConfig->excludeFilter);
fb_assert(!attachment->att_replicator);
if (cleanupTransactions)
@ -404,11 +427,15 @@ void REPL_trans_commit(thread_db* tdbb, jrd_tra* transaction)
if (!replicator->commit())
{
// Commit is a terminal routine, we cannot throw here
handleError(tdbb, transaction, false);
}
replicator->dispose();
transaction->tra_replicator = nullptr;
if (transaction->tra_replicator)
{
transaction->tra_replicator->dispose();
transaction->tra_replicator = nullptr;
}
}
void REPL_trans_rollback(thread_db* tdbb, jrd_tra* transaction)
@ -423,8 +450,11 @@ void REPL_trans_rollback(thread_db* tdbb, jrd_tra* transaction)
handleError(tdbb, transaction, false);
}
replicator->dispose();
transaction->tra_replicator = nullptr;
if (transaction->tra_replicator)
{
transaction->tra_replicator->dispose();
transaction->tra_replicator = nullptr;
}
}
void REPL_trans_cleanup(Jrd::thread_db* tdbb, TraNumber number)
@ -490,7 +520,7 @@ void REPL_store(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
fb_assert(record);
// This temporary auto-pointer is just to delete a temporary record
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : NULL);
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : nullptr);
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!ensureSavepoints(tdbb, transaction))
@ -534,8 +564,8 @@ void REPL_modify(thread_db* tdbb, const record_param* orgRpb,
fb_assert(orgRecord);
// These temporary auto-pointers are just to delete temporary records
AutoPtr<Record> cleanupOrgRecord(orgRecord != orgRpb->rpb_record ? orgRecord : NULL);
AutoPtr<Record> cleanupNewRecord(newRecord != newRpb->rpb_record ? newRecord : NULL);
AutoPtr<Record> cleanupOrgRecord(orgRecord != orgRpb->rpb_record ? orgRecord : nullptr);
AutoPtr<Record> cleanupNewRecord(newRecord != newRpb->rpb_record ? newRecord : nullptr);
const auto orgLength = orgRecord->getLength();
const auto newLength = newRecord->getLength();
@ -588,7 +618,7 @@ void REPL_erase(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
fb_assert(record);
// This temporary auto-pointer is just to delete a temporary record
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : NULL);
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : nullptr);
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!ensureSavepoints(tdbb, transaction))
@ -627,7 +657,7 @@ void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value)
MetaName genName;
if (!attachment->att_generators.lookup(genId, genName))
{
MET_lookup_generator_id(tdbb, genId, genName, NULL);
MET_lookup_generator_id(tdbb, genId, genName, nullptr);
attachment->att_generators.store(genId, genName);
}

View File

@ -85,31 +85,11 @@ void Replicator::flush(BatchBlock& block, FlushReason reason, ULONG flags)
block.flushes++;
}
void Replicator::logError(const IStatus* status)
{
string message;
auto statusPtr = status->getErrors();
char temp[BUFFER_LARGE];
while (fb_interpret(temp, sizeof(temp), &statusPtr))
{
if (!message.isEmpty())
message += "\n\t";
message += temp;
}
logOriginMessage(m_config->dbName, message, ERROR_MSG);
}
void Replicator::postError(const Exception& ex)
{
FbLocalStatus tempStatus;
ex.stuffException(&tempStatus);
logError(&tempStatus);
Arg::StatusVector newErrors;
newErrors << Arg::Gds(isc_random) << Arg::Str("Replication error");
newErrors << Arg::StatusVector(tempStatus->getErrors());

View File

@ -264,7 +264,6 @@ namespace Replication
void initialize();
void flush(BatchBlock& txnData, FlushReason reason, ULONG flags = 0);
void logError(const Firebird::IStatus* status);
void postError(const Firebird::Exception& ex);
bool prepareTransaction(Transaction* transaction);