8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 16:43:03 +01:00

Replication plug (#280)

* Support for replication plugins + sample plugin
* Renamed TDBB flag
* Configurable replication errors handling
* Commit in two phases
* Move call to dispose() out of commit/rollback for code simplification
* A comment and manual fix for constant wrongly generated by CLOOP
* Log replication warnings as well
This commit is contained in:
Dimitry Sibiryakov 2020-09-18 10:45:51 +02:00 committed by GitHub
parent acf8062125
commit 1677359553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1585 additions and 521 deletions

View File

@ -6,16 +6,10 @@ database
{
### ORIGIN SIDE SETTINGS
# Size of the local buffer used to accumulate changes that can be
# deferred until the transaction commit/rollback. The bigger this value
# the less disk access concurrency (related to log IOPS) happens.
# Plugin used to perform replication.
# Leave it empty to use internal replication.
#
# For synchronous replication, it also affects number of network round-trips
# between primary and replica hosts.
# However, a larger buffer costs a longer replication "checkpoints"
# (delay to synchronize the original database with its replica at commit).
#
# buffer_size = 1048576 # 1MB
# plugin =
# Pattern (regular expression) that defines what tables must be included into
# replication. By default, all tables are replicated.
@ -27,10 +21,29 @@ database
#
# exclude_filter =
# Boolean parameters describing how replication errors must be handled.
#
# log_on_error = true # Error message shall be written into firebird.log
# disable_on_error = true # Replication shall be disabled till the end of transaction.
# throw_on_error = false # Error shall be returned to user.
### Parameters specific for internal replication
# Directory to store replication log files.
#
# log_directory =
# Size of the local buffer used to accumulate changes that can be
# deferred until the transaction commit/rollback. The bigger this value
# the less disk access concurrency (related to log IOPS) happens.
#
# For synchronous replication, it also affects number of network round-trips
# between primary and replica hosts.
# However, a larger buffer costs a longer replication "checkpoints"
# (delay to synchronize the original database with its replica at commit).
#
# buffer_size = 1048576 # 1MB
# Prefix for replication log file names. It will be automatically suffixed
# with an ordinal sequential number. If not specified, database filename
# (without path) is used as a prefix.

View File

@ -181,3 +181,4 @@ dbcrypt - a sample of XOR database encryption (do not use in production!!!)
extauth - authentication for cross-server connections based on having same secret key on all servers
replication - a sample or replication. Doesn't do anything useful just write log of calls from the engine.

View File

@ -0,0 +1,572 @@
/*
*
* Sample replication plugin
*
*/
#include <algorithm>
#include <atomic>
#define __USE_MINGW_ANSI_STDIO 1
#include <stdio.h>
#include <time.h>
#include "firebird/Interface.h"
using namespace Firebird;
#define WriteLog(file, ...) fprintf(file, __VA_ARGS__), fflush(file)
class ReplPlugin : public IReplicatedSessionImpl<ReplPlugin, CheckStatusWrapper>
{
public:
ReplPlugin(IPluginConfig* config);
virtual ~ReplPlugin();
// IReferenceCounted implementation
void addRef() override;
int release() override;
// IPluginBase implementation
void setOwner(IReferenceCounted* r) override;
IReferenceCounted* getOwner() override;
// IReplicatedSession implementation
IStatus* getStatus() override;
void setAttachment(IAttachment* attachment) override;
IReplicatedTransaction* startTransaction(ITransaction* transaction, ISC_INT64 number) override;
FB_BOOLEAN cleanupTransaction(ISC_INT64 number) override;
FB_BOOLEAN setSequence(const char* name, ISC_INT64 value) override;
private:
friend class ReplTransaction;
IAttachment* att = nullptr;
FILE* log = nullptr;
IStatus* status = nullptr;
std::atomic_int refCounter;
IReferenceCounted* owner;
void dumpInfo(const unsigned char* buffer, size_t length);
};
class ReplTransaction: public IReplicatedTransactionImpl<ReplTransaction, CheckStatusWrapper>
{
public:
ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number);
~ReplTransaction();
// IDisposable implementation
void dispose() override;
// IReplicatedTransaction implementation
FB_BOOLEAN prepare() override;
FB_BOOLEAN commit() override;
FB_BOOLEAN rollback() override;
FB_BOOLEAN startSavepoint() override;
FB_BOOLEAN releaseSavepoint() override;
FB_BOOLEAN rollbackSavepoint() override;
FB_BOOLEAN insertRecord(const char* name, IReplicatedRecord* record) override;
FB_BOOLEAN updateRecord(const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) override;
FB_BOOLEAN deleteRecord(const char* name, IReplicatedRecord* record) override;
FB_BOOLEAN executeSql(const char* sql) override;
FB_BOOLEAN executeSqlIntl(unsigned charset, const char* sql) override;
private:
ReplPlugin* parent;
ITransaction* trans;
bool dumpData(IReplicatedRecord* record);
};
IMaster* master = nullptr;
class PluginModule : public IPluginModuleImpl<PluginModule, CheckStatusWrapper>
{
public:
void doClean() override {}
void threadDetach() override {}
} module;
class Factory : public IPluginFactoryImpl<Factory, CheckStatusWrapper>
{
public:
IPluginBase* createPlugin(CheckStatusWrapper* status, IPluginConfig* factoryParameter)
{
IPluginBase* p = new ReplPlugin(factoryParameter);
p->addRef();
return p;
}
} factory;
extern "C"
{
#if defined(__WIN32__)
void __declspec(dllexport) FB_PLUGIN_ENTRY_POINT(IMaster* m);
#else
void FB_PLUGIN_ENTRY_POINT(IMaster* m)
__attribute__((visibility("default")));
#endif // __WIN32__
void FB_PLUGIN_ENTRY_POINT(IMaster* m)
{
master = m;
IPluginManager* pm = m->getPluginManager();
pm->registerModule(&module);
pm->registerPluginFactory(IPluginManager::TYPE_REPLICATOR, "fbSampleReplicator", &factory);
}
}
static std::atomic_int logCounter;
static const ISC_STATUS err[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Intolerable integer value", isc_arg_end };
static const ISC_STATUS wrn[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Just a warning", isc_arg_end };
ReplPlugin::ReplPlugin(IPluginConfig* conf)
{
char fn[100];
sprintf(fn, "session_%08x_%d.log", (unsigned)time(nullptr), logCounter++);
log = fopen(fn, "w");
WriteLog(log, "%p\tReplicatedSession constructed\n", this);
status = master->getStatus();
}
ReplPlugin::~ReplPlugin()
{
if (log != nullptr)
{
WriteLog(log, "%p\tReplicatedSession destructed\n", this);
fclose(log);
}
if (att != nullptr)
att->release();
if (status != nullptr)
status->dispose();
}
void ReplPlugin::addRef()
{
WriteLog(log, "%p\taddRef() to %d\n", this, ++refCounter);
}
int ReplPlugin::release()
{
WriteLog(log, "%p\trelease at %d\n", this, refCounter.load());
if (--refCounter == 0)
{
delete this;
return 0;
}
return 1;
}
void ReplPlugin::setOwner(IReferenceCounted* r)
{
WriteLog(log, "%p\tsetOwner(%p)\n", this, r);
owner = r;
}
IReferenceCounted* ReplPlugin::getOwner()
{
WriteLog(log, "%p\tgetOwner()\n", this);
return owner;
}
IStatus* ReplPlugin::getStatus()
{
WriteLog(log, "%p\tgetStatus()\n", this);
return status;
}
void ReplPlugin::dumpInfo(const unsigned char* buffer, size_t length)
{
const unsigned char* p = buffer;
while (p < buffer + length)
{
unsigned char item = *p++;
// Handle terminating items fist
if (item == isc_info_end)
{
return;
}
if (item == isc_info_truncated)
{
WriteLog(log, "\t\tDatabase info truncated\n");
return;
}
// Now data items
const unsigned len = p[0] | p[1] << 8;
p += 2;
switch (item)
{
case fb_info_db_guid:
{
WriteLog(log, "\t\tDatabase GUID = %.*s\n", len, p);
break;
}
case isc_info_error:
{
unsigned err = p[1];
for (unsigned i = 1; i < std::min(len, 4U); i++)
{
err |= p[i + 1] << (8 * i);
}
WriteLog(log, "\t\tDatabase info error %u for item %d\n", err, p[0]);
return;
}
default:
WriteLog(log, "\t\tUnexpected info item %d\n", item);
break;
}
p += len;
}
WriteLog(log, "\t\tSuspicious exit from info parse loop\n");
}
void ReplPlugin::setAttachment(IAttachment* attachment)
{
WriteLog(log, "%p\tAssigned attachment %p\n", this, attachment);
att = attachment;
att->addRef();
CheckStatusWrapper ExtStatus(status);
const unsigned char items[] = { fb_info_db_guid };
unsigned char response[80];
att->getInfo(&ExtStatus, sizeof(items), items, sizeof(response), response);
if (status->getState() == 0)
{
dumpInfo(response, sizeof(response));
}
}
IReplicatedTransaction* ReplPlugin::startTransaction(ITransaction* transaction, ISC_INT64 number)
{
WriteLog(log, "%p\tstartTransaction(%p, %lld)\n", this, transaction, number);
return new ReplTransaction(this, transaction, number);
}
FB_BOOLEAN ReplPlugin::cleanupTransaction(ISC_INT64 number)
{
WriteLog(log, "%p\tcleanupTransaction(%lld)\n", this, number);
return FB_TRUE;
}
FB_BOOLEAN ReplPlugin::setSequence(const char* name, ISC_INT64 value)
{
WriteLog(log, "%p\tsetSequence(%s, %lld)\n", this, name, value);
return FB_TRUE;
}
ReplTransaction::ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number):
parent(session), trans(transaction)
{
parent->addRef(); // Lock parent from disappearing
trans->addRef();
WriteLog(parent->log, "%p\tTransaction started\n", this);
}
ReplTransaction::~ReplTransaction()
{
WriteLog(parent->log, "%p\tTransaction destructed\n", this);
trans->release();
parent->release();
}
void ReplTransaction::dispose()
{
WriteLog(parent->log, "%p\tdispose()\n", this);
delete this;
}
FB_BOOLEAN ReplTransaction::prepare()
{
WriteLog(parent->log, "%p\tprepare()\n", this);
return FB_TRUE;
}
FB_BOOLEAN ReplTransaction::commit()
{
WriteLog(parent->log, "%p\tcommit()\n", this);
return FB_TRUE;
}
FB_BOOLEAN ReplTransaction::rollback()
{
WriteLog(parent->log, "%p\trollback()\n", this);
parent->status->setWarnings(wrn);
return FB_FALSE;
}
FB_BOOLEAN ReplTransaction::startSavepoint()
{
WriteLog(parent->log, "%p\tstartSavepoint()\n", this);
return FB_TRUE;
}
FB_BOOLEAN ReplTransaction::releaseSavepoint()
{
WriteLog(parent->log, "%p\treleaseSavepoint()\n", this);
return FB_TRUE;
}
FB_BOOLEAN ReplTransaction::rollbackSavepoint()
{
WriteLog(parent->log, "%p\trollbackSavepoint()\n", this);
return FB_TRUE;
}
bool ReplTransaction::dumpData(IReplicatedRecord* record)
{
for (unsigned i = 0; i < record->getCount(); i++)
{
IReplicatedField* field = record->getField(i);
if (field == nullptr)
{
WriteLog(parent->log, "\t\tNO FIELD %u FOUND\n", i);
continue;
}
unsigned fieldType = field->getType();
WriteLog(parent->log, "\tfield %u (%s), type %u:\n", i, field->getName(), fieldType);
const void* fieldData = field->getData();
if (fieldData == nullptr)
{
WriteLog(parent->log, "\t\tNULL\n");
}
else
{
switch (fieldType)
{
case SQL_TEXT:
{
unsigned length = field->getLength();
unsigned charSet = field->getCharSet();
if (charSet == 1) // OCTETS
{
WriteLog(parent->log, "\t\tBINARY data length %u: ", length);
const unsigned char* data = reinterpret_cast<const unsigned char*>(fieldData);
for (unsigned j = 0; j < length; j++)
WriteLog(parent->log, "%02u", *data++);
WriteLog(parent->log, "\n");
}
else
WriteLog(parent->log, "\t\tTEXT with charset %u, length %u: \"%.*s\"\n", charSet, length, length, reinterpret_cast<const char*>(fieldData));
break;
}
case SQL_VARYING:
{
unsigned charSet = field->getCharSet();
const paramvary* data = static_cast<const paramvary*>(fieldData);
if (charSet == 1) // OCTETS
{
fprintf(parent->log, "\t\tVARBINARY data length %u: ", data->vary_length);
for (unsigned j = 0; j < data->vary_length; j++)
fprintf(parent->log, "%02u", data->vary_string[j]);
WriteLog(parent->log, "\n");
}
else
WriteLog(parent->log, "\t\tVARCHAR with charset %u, length %u: \"%.*s\"\n", charSet, data->vary_length, data->vary_length, data->vary_string);
break;
}
case SQL_SHORT:
{
WriteLog(parent->log, "\t\tSMALLINT with scale %u: %d\n", field->getScale(), *reinterpret_cast<const int16_t*>(fieldData));
break;
}
case SQL_LONG:
{
int value = *reinterpret_cast<const int32_t*>(fieldData);
WriteLog(parent->log, "\t\tINTEGER with scale %u: %d\n", field->getScale(), value);
if (value == 666)
throw value;
break;
}
case SQL_ARRAY:
{
WriteLog(parent->log, "\t\tARRAY\n");
break;
}
case SQL_BLOB:
{
unsigned subType = field->getSubType();
WriteLog(parent->log, "\t\tBLOB subtype %u:\n", subType);
unsigned charSet = 1;
if (subType == 1)
{
charSet = field->getCharSet();
}
CheckStatusWrapper ExtStatus(parent->status);
ISC_QUAD blobId = *reinterpret_cast<const ISC_QUAD*>(fieldData);
IBlob* blob(parent->att->openBlob(&ExtStatus, trans, &blobId, 0, nullptr));
if (ExtStatus.getState() & IStatus::STATE_ERRORS)
return false;
char buffer[USHRT_MAX];
do
{
unsigned length;
int ret = blob->getSegment(&ExtStatus, sizeof(buffer), buffer, &length);
if (ret == IStatus::RESULT_ERROR)
{
blob->release();
return false;
}
if (ret == IStatus::RESULT_NO_DATA)
break;
if (length > 0)
{
fprintf(parent->log, "\t\t - segment of length %u: ", length);
if (subType != 1 || charSet == 1)
{
for (unsigned j = 0; j < length; j++)
fprintf(parent->log, "%02u", buffer[j]);
WriteLog(parent->log, "\n");
}
else
WriteLog(parent->log, "(charset %u) \"%.*s\"\n", charSet, length, buffer);
}
} while (true);
blob->close(&ExtStatus);
blob->release();
if (ExtStatus.getState() & IStatus::STATE_ERRORS)
return false;
break;
}
case SQL_FLOAT:
{
WriteLog(parent->log, "\t\tFLOAT: %f\n", *reinterpret_cast<const float*>(fieldData));
break;
}
case SQL_DOUBLE:
{
WriteLog(parent->log, "\t\tDOUBLE: %f\n", *reinterpret_cast<const double*>(fieldData));
break;
}
case SQL_TYPE_DATE:
{
ISC_DATE value = *reinterpret_cast<const ISC_DATE*>(fieldData);
IUtil* utl = master->getUtilInterface();
unsigned year, month, day;
utl->decodeDate(value, &year, &month, &day);
WriteLog(parent->log, "\t\tDATE: %04u-%02u-%02u\n", year, month, day);
break;
}
case SQL_TYPE_TIME:
{
ISC_TIME value = *reinterpret_cast<const ISC_TIME*>(fieldData);
IUtil* utl = master->getUtilInterface();
unsigned hours, minutes, seconds, fractions;
utl->decodeTime(value, &hours, &minutes, &seconds, &fractions);
WriteLog(parent->log, "\t\tTIME: %02u:%02u:%02u.%04u\n", hours, minutes, seconds, fractions);
break;
}
case SQL_TIMESTAMP:
{
ISC_TIMESTAMP value = *reinterpret_cast<const ISC_TIMESTAMP*>(fieldData);
IUtil* utl = master->getUtilInterface();
unsigned year, month, day, hours, minutes, seconds, fractions;
utl->decodeDate(value.timestamp_date, &year, &month, &day);
utl->decodeTime(value.timestamp_time, &hours, &minutes, &seconds, &fractions);
WriteLog(parent->log, "\t\tTIMESTAMP: %04u-%02u-%02u %02u:%02u:%02u.%04u\n", year, month, day, hours, minutes, seconds, fractions);
break;
}
case SQL_INT64:
{
WriteLog(parent->log, "\t\tBIGINT with scale %u: %lld\n", field->getScale(), *reinterpret_cast<const int64_t*>(fieldData));
break;
}
case SQL_BOOLEAN:
{
WriteLog(parent->log, "\t\tBOOLEAN: %s\n",
*reinterpret_cast<const FB_BOOLEAN*>(fieldData) == FB_TRUE ?
"true" : "false");
break;
}
case SQL_INT128:
{
char buffer[50];
unsigned scale = field->getScale();
CheckStatusWrapper ExtStatus(parent->status);
master->getUtilInterface()->getInt128(&ExtStatus)->toString(&ExtStatus,
reinterpret_cast<const FB_I128*>(fieldData), scale,
sizeof(buffer), buffer);
WriteLog(parent->log, "\t\tINT128 with scale %u: %s\n", scale, buffer);
break;
}
default:
{
WriteLog(parent->log, "\t\twhatever\n");
}
}
}
}
return true;
}
FB_BOOLEAN ReplTransaction::insertRecord(const char* name, IReplicatedRecord* record)
{
WriteLog(parent->log, "%p\tInsert record into %s\n", this, name);
try
{
return dumpData(record) ? FB_TRUE : FB_FALSE;
}
catch (const int)
{
parent->status->setErrors(err);
return FB_FALSE;
}
}
FB_BOOLEAN ReplTransaction::updateRecord(const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord)
{
WriteLog(parent->log, "%p\tUpdate %s\nOldData:\n", this, name);
try
{
if (!dumpData(orgRecord))
return FB_FALSE;
WriteLog(parent->log, "NewData:\n");
return dumpData(newRecord) ? FB_TRUE : FB_FALSE;
}
catch (const int)
{
parent->status->setErrors(err);
return FB_FALSE;
}
}
FB_BOOLEAN ReplTransaction::deleteRecord(const char* name, IReplicatedRecord* record)
{
WriteLog(parent->log, "%p\tDelete from %s\n", this, name);
try
{
return dumpData(record) ? FB_TRUE : FB_FALSE;
}
catch (const int)
{
parent->status->setErrors(err);
return FB_FALSE;
}
}
FB_BOOLEAN ReplTransaction::executeSql(const char* sql)
{
WriteLog(parent->log, "%p\tExecuteSql(%s)\n", this, sql);
return FB_TRUE;
}
FB_BOOLEAN ReplTransaction::executeSqlIntl(unsigned charset, const char* sql)
{
WriteLog(parent->log, "%p\tExecuteSqlIntl(%u, %s)\n", this, charset, sql);
return FB_TRUE;
}

View File

@ -983,7 +983,7 @@ void DsqlDdlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
try
{
AutoSetRestoreFlag<ULONG> execDdl(&tdbb->tdbb_flags, TDBB_repl_sql, true);
AutoSetRestoreFlag<ULONG> execDdl(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
node->executeDdl(tdbb, internalScratch, req_transaction);

View File

@ -246,7 +246,8 @@ interface PluginManager : Versioned
const uint TYPE_WIRE_CRYPT = 8;
const uint TYPE_DB_CRYPT = 9;
const uint TYPE_KEY_HOLDER = 10;
const uint TYPE_COUNT = 11; // keep in sync
const uint TYPE_REPLICATOR = 11;
const uint TYPE_COUNT = 12; // keep in sync
//// TODO: TYPE_COUNT is not count. And these constants starts from 1, different than DIR_* ones.
// Main function called by plugin modules in firebird_plugin()
@ -1555,22 +1556,44 @@ interface Int128 : Versioned
// Replication interfaces
interface ReplicatedRecord : Versioned
interface ReplicatedField : Versioned
{
uint getRawLength();
const uchar* getRawData();
// Return name of field. NULL is returned on error
const string getName();
// Return field type or zero if field not exists
uint getType();
// Return subtype if applicable
uint getSubType();
// Return scale fo the field. Behavior is undefined for non-numeric fields
uint getScale();
// Return maximum length of field data in bytes
uint getLength();
// Return character set without collation part for text or CLOB fields
uint getCharSet();
// Return pointer to data. nullptr is returned if value is NULL
const void* getData();
}
interface ReplicatedBlob : Versioned
interface ReplicatedRecord : Versioned
{
uint getLength();
boolean isEof();
uint getSegment(uint length, uchar* buffer);
// Return count of fields
uint getCount();
// Return ReplicatedField with given index. nullptr is returned if index is invalid.
// Returned refeence is valid until next call of getField() or end of the record life.
ReplicatedField getField(uint index);
// Return size of raw data
uint getRawLength();
// Dirty method to return record buffer in internal format
const uchar* getRawData();
}
interface ReplicatedTransaction : Disposable
{
// Last chance to return an error and make commit to be aborted.
// The transaction still can be rolled back on error happened after this call.
boolean prepare();
// An error returned from these methods may be logged but do not abort process
boolean commit();
boolean rollback();
@ -1578,6 +1601,7 @@ interface ReplicatedTransaction : Disposable
boolean releaseSavepoint();
boolean rollbackSavepoint();
// ReplicatedRecords parameters point to local objects, do not ever store the pointer.
boolean insertRecord(const string name,
ReplicatedRecord record);
boolean updateRecord(const string name,
@ -1586,17 +1610,18 @@ interface ReplicatedTransaction : Disposable
boolean deleteRecord(const string name,
ReplicatedRecord record);
boolean storeBlob(ISC_QUAD blobId, ReplicatedBlob blob);
boolean executeSql(const string sql);
boolean executeSqlIntl(uint charset, const string sql);
}
interface ReplicatedSession : Disposable
interface ReplicatedSession : PluginBase
{
// Returned value is guaranteed to be valid only after other method has returned an error.
Status getStatus();
ReplicatedTransaction startTransaction(int64 number);
// Parameter don't need to be released if not used.
void setAttachment(Attachment attachment);
ReplicatedTransaction startTransaction(Transaction transaction, int64 number);
boolean cleanupTransaction(int64 number);
boolean setSequence(const string name, int64 value);

View File

@ -117,8 +117,8 @@ namespace Firebird
class IDecFloat16;
class IDecFloat34;
class IInt128;
class IReplicatedField;
class IReplicatedRecord;
class IReplicatedBlob;
class IReplicatedTransaction;
class IReplicatedSession;
@ -824,7 +824,8 @@ namespace Firebird
static const unsigned TYPE_WIRE_CRYPT = 8;
static const unsigned TYPE_DB_CRYPT = 9;
static const unsigned TYPE_KEY_HOLDER = 10;
static const unsigned TYPE_COUNT = 11;
static const unsigned TYPE_REPLICATOR = 11;
static const unsigned TYPE_COUNT = 12;
void registerPluginFactory(unsigned pluginType, const char* defaultName, IPluginFactory* factory)
{
@ -6031,11 +6032,83 @@ namespace Firebird
}
};
class IReplicatedField : public IVersioned
{
public:
struct VTable : public IVersioned::VTable
{
const char* (CLOOP_CARG *getName)(IReplicatedField* self) throw();
unsigned (CLOOP_CARG *getType)(IReplicatedField* self) throw();
unsigned (CLOOP_CARG *getSubType)(IReplicatedField* self) throw();
unsigned (CLOOP_CARG *getScale)(IReplicatedField* self) throw();
unsigned (CLOOP_CARG *getLength)(IReplicatedField* self) throw();
unsigned (CLOOP_CARG *getCharSet)(IReplicatedField* self) throw();
const void* (CLOOP_CARG *getData)(IReplicatedField* self) throw();
};
protected:
IReplicatedField(DoNotInherit)
: IVersioned(DoNotInherit())
{
}
~IReplicatedField()
{
}
public:
static const unsigned VERSION = 2;
const char* getName()
{
const char* ret = static_cast<VTable*>(this->cloopVTable)->getName(this);
return ret;
}
unsigned getType()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getType(this);
return ret;
}
unsigned getSubType()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getSubType(this);
return ret;
}
unsigned getScale()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getScale(this);
return ret;
}
unsigned getLength()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getLength(this);
return ret;
}
unsigned getCharSet()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getCharSet(this);
return ret;
}
const void* getData()
{
const void* ret = static_cast<VTable*>(this->cloopVTable)->getData(this);
return ret;
}
};
class IReplicatedRecord : public IVersioned
{
public:
struct VTable : public IVersioned::VTable
{
unsigned (CLOOP_CARG *getCount)(IReplicatedRecord* self) throw();
IReplicatedField* (CLOOP_CARG *getField)(IReplicatedRecord* self, unsigned index) throw();
unsigned (CLOOP_CARG *getRawLength)(IReplicatedRecord* self) throw();
const unsigned char* (CLOOP_CARG *getRawData)(IReplicatedRecord* self) throw();
};
@ -6053,6 +6126,18 @@ namespace Firebird
public:
static const unsigned VERSION = 2;
unsigned getCount()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getCount(this);
return ret;
}
IReplicatedField* getField(unsigned index)
{
IReplicatedField* ret = static_cast<VTable*>(this->cloopVTable)->getField(this, index);
return ret;
}
unsigned getRawLength()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getRawLength(this);
@ -6066,48 +6151,6 @@ namespace Firebird
}
};
class IReplicatedBlob : public IVersioned
{
public:
struct VTable : public IVersioned::VTable
{
unsigned (CLOOP_CARG *getLength)(IReplicatedBlob* self) throw();
FB_BOOLEAN (CLOOP_CARG *isEof)(IReplicatedBlob* self) throw();
unsigned (CLOOP_CARG *getSegment)(IReplicatedBlob* self, unsigned length, unsigned char* buffer) throw();
};
protected:
IReplicatedBlob(DoNotInherit)
: IVersioned(DoNotInherit())
{
}
~IReplicatedBlob()
{
}
public:
static const unsigned VERSION = 2;
unsigned getLength()
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getLength(this);
return ret;
}
FB_BOOLEAN isEof()
{
FB_BOOLEAN ret = static_cast<VTable*>(this->cloopVTable)->isEof(this);
return ret;
}
unsigned getSegment(unsigned length, unsigned char* buffer)
{
unsigned ret = static_cast<VTable*>(this->cloopVTable)->getSegment(this, length, buffer);
return ret;
}
};
class IReplicatedTransaction : public IDisposable
{
public:
@ -6122,7 +6165,6 @@ namespace Firebird
FB_BOOLEAN (CLOOP_CARG *insertRecord)(IReplicatedTransaction* self, const char* name, IReplicatedRecord* record) throw();
FB_BOOLEAN (CLOOP_CARG *updateRecord)(IReplicatedTransaction* self, const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) throw();
FB_BOOLEAN (CLOOP_CARG *deleteRecord)(IReplicatedTransaction* self, const char* name, IReplicatedRecord* record) throw();
FB_BOOLEAN (CLOOP_CARG *storeBlob)(IReplicatedTransaction* self, ISC_QUAD blobId, IReplicatedBlob* blob) throw();
FB_BOOLEAN (CLOOP_CARG *executeSql)(IReplicatedTransaction* self, const char* sql) throw();
FB_BOOLEAN (CLOOP_CARG *executeSqlIntl)(IReplicatedTransaction* self, unsigned charset, const char* sql) throw();
};
@ -6194,12 +6236,6 @@ namespace Firebird
return ret;
}
FB_BOOLEAN storeBlob(ISC_QUAD blobId, IReplicatedBlob* blob)
{
FB_BOOLEAN ret = static_cast<VTable*>(this->cloopVTable)->storeBlob(this, blobId, blob);
return ret;
}
FB_BOOLEAN executeSql(const char* sql)
{
FB_BOOLEAN ret = static_cast<VTable*>(this->cloopVTable)->executeSql(this, sql);
@ -6213,20 +6249,21 @@ namespace Firebird
}
};
class IReplicatedSession : public IDisposable
class IReplicatedSession : public IPluginBase
{
public:
struct VTable : public IDisposable::VTable
struct VTable : public IPluginBase::VTable
{
IStatus* (CLOOP_CARG *getStatus)(IReplicatedSession* self) throw();
IReplicatedTransaction* (CLOOP_CARG *startTransaction)(IReplicatedSession* self, ISC_INT64 number) throw();
void (CLOOP_CARG *setAttachment)(IReplicatedSession* self, IAttachment* attachment) throw();
IReplicatedTransaction* (CLOOP_CARG *startTransaction)(IReplicatedSession* self, ITransaction* transaction, ISC_INT64 number) throw();
FB_BOOLEAN (CLOOP_CARG *cleanupTransaction)(IReplicatedSession* self, ISC_INT64 number) throw();
FB_BOOLEAN (CLOOP_CARG *setSequence)(IReplicatedSession* self, const char* name, ISC_INT64 value) throw();
};
protected:
IReplicatedSession(DoNotInherit)
: IDisposable(DoNotInherit())
: IPluginBase(DoNotInherit())
{
}
@ -6235,7 +6272,7 @@ namespace Firebird
}
public:
static const unsigned VERSION = 3;
static const unsigned VERSION = 4;
IStatus* getStatus()
{
@ -6243,9 +6280,14 @@ namespace Firebird
return ret;
}
IReplicatedTransaction* startTransaction(ISC_INT64 number)
void setAttachment(IAttachment* attachment)
{
IReplicatedTransaction* ret = static_cast<VTable*>(this->cloopVTable)->startTransaction(this, number);
static_cast<VTable*>(this->cloopVTable)->setAttachment(this, attachment);
}
IReplicatedTransaction* startTransaction(ITransaction* transaction, ISC_INT64 number)
{
IReplicatedTransaction* ret = static_cast<VTable*>(this->cloopVTable)->startTransaction(this, transaction, number);
return ret;
}
@ -18487,6 +18529,146 @@ namespace Firebird
virtual void fromString(StatusType* status, int scale, const char* from, FB_I128* to) = 0;
};
template <typename Name, typename StatusType, typename Base>
class IReplicatedFieldBaseImpl : public Base
{
public:
typedef IReplicatedField Declaration;
IReplicatedFieldBaseImpl(DoNotInherit = DoNotInherit())
{
static struct VTableImpl : Base::VTable
{
VTableImpl()
{
this->version = Base::VERSION;
this->getName = &Name::cloopgetNameDispatcher;
this->getType = &Name::cloopgetTypeDispatcher;
this->getSubType = &Name::cloopgetSubTypeDispatcher;
this->getScale = &Name::cloopgetScaleDispatcher;
this->getLength = &Name::cloopgetLengthDispatcher;
this->getCharSet = &Name::cloopgetCharSetDispatcher;
this->getData = &Name::cloopgetDataDispatcher;
}
} vTable;
this->cloopVTable = &vTable;
}
static const char* CLOOP_CARG cloopgetNameDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getName();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<const char*>(0);
}
}
static unsigned CLOOP_CARG cloopgetTypeDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getType();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static unsigned CLOOP_CARG cloopgetSubTypeDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getSubType();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static unsigned CLOOP_CARG cloopgetScaleDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getScale();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static unsigned CLOOP_CARG cloopgetLengthDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getLength();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static unsigned CLOOP_CARG cloopgetCharSetDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getCharSet();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static const void* CLOOP_CARG cloopgetDataDispatcher(IReplicatedField* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getData();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<const void*>(0);
}
}
};
template <typename Name, typename StatusType, typename Base = IVersionedImpl<Name, StatusType, Inherit<IReplicatedField> > >
class IReplicatedFieldImpl : public IReplicatedFieldBaseImpl<Name, StatusType, Base>
{
protected:
IReplicatedFieldImpl(DoNotInherit = DoNotInherit())
{
}
public:
virtual ~IReplicatedFieldImpl()
{
}
virtual const char* getName() = 0;
virtual unsigned getType() = 0;
virtual unsigned getSubType() = 0;
virtual unsigned getScale() = 0;
virtual unsigned getLength() = 0;
virtual unsigned getCharSet() = 0;
virtual const void* getData() = 0;
};
template <typename Name, typename StatusType, typename Base>
class IReplicatedRecordBaseImpl : public Base
{
@ -18500,6 +18682,8 @@ namespace Firebird
VTableImpl()
{
this->version = Base::VERSION;
this->getCount = &Name::cloopgetCountDispatcher;
this->getField = &Name::cloopgetFieldDispatcher;
this->getRawLength = &Name::cloopgetRawLengthDispatcher;
this->getRawData = &Name::cloopgetRawDataDispatcher;
}
@ -18508,6 +18692,32 @@ namespace Firebird
this->cloopVTable = &vTable;
}
static unsigned CLOOP_CARG cloopgetCountDispatcher(IReplicatedRecord* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getCount();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static IReplicatedField* CLOOP_CARG cloopgetFieldDispatcher(IReplicatedRecord* self, unsigned index) throw()
{
try
{
return static_cast<Name*>(self)->Name::getField(index);
}
catch (...)
{
StatusType::catchException(0);
return static_cast<IReplicatedField*>(0);
}
}
static unsigned CLOOP_CARG cloopgetRawLengthDispatcher(IReplicatedRecord* self) throw()
{
try
@ -18548,90 +18758,12 @@ namespace Firebird
{
}
virtual unsigned getCount() = 0;
virtual IReplicatedField* getField(unsigned index) = 0;
virtual unsigned getRawLength() = 0;
virtual const unsigned char* getRawData() = 0;
};
template <typename Name, typename StatusType, typename Base>
class IReplicatedBlobBaseImpl : public Base
{
public:
typedef IReplicatedBlob Declaration;
IReplicatedBlobBaseImpl(DoNotInherit = DoNotInherit())
{
static struct VTableImpl : Base::VTable
{
VTableImpl()
{
this->version = Base::VERSION;
this->getLength = &Name::cloopgetLengthDispatcher;
this->isEof = &Name::cloopisEofDispatcher;
this->getSegment = &Name::cloopgetSegmentDispatcher;
}
} vTable;
this->cloopVTable = &vTable;
}
static unsigned CLOOP_CARG cloopgetLengthDispatcher(IReplicatedBlob* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getLength();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
static FB_BOOLEAN CLOOP_CARG cloopisEofDispatcher(IReplicatedBlob* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::isEof();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<FB_BOOLEAN>(0);
}
}
static unsigned CLOOP_CARG cloopgetSegmentDispatcher(IReplicatedBlob* self, unsigned length, unsigned char* buffer) throw()
{
try
{
return static_cast<Name*>(self)->Name::getSegment(length, buffer);
}
catch (...)
{
StatusType::catchException(0);
return static_cast<unsigned>(0);
}
}
};
template <typename Name, typename StatusType, typename Base = IVersionedImpl<Name, StatusType, Inherit<IReplicatedBlob> > >
class IReplicatedBlobImpl : public IReplicatedBlobBaseImpl<Name, StatusType, Base>
{
protected:
IReplicatedBlobImpl(DoNotInherit = DoNotInherit())
{
}
public:
virtual ~IReplicatedBlobImpl()
{
}
virtual unsigned getLength() = 0;
virtual FB_BOOLEAN isEof() = 0;
virtual unsigned getSegment(unsigned length, unsigned char* buffer) = 0;
};
template <typename Name, typename StatusType, typename Base>
class IReplicatedTransactionBaseImpl : public Base
{
@ -18655,7 +18787,6 @@ namespace Firebird
this->insertRecord = &Name::cloopinsertRecordDispatcher;
this->updateRecord = &Name::cloopupdateRecordDispatcher;
this->deleteRecord = &Name::cloopdeleteRecordDispatcher;
this->storeBlob = &Name::cloopstoreBlobDispatcher;
this->executeSql = &Name::cloopexecuteSqlDispatcher;
this->executeSqlIntl = &Name::cloopexecuteSqlIntlDispatcher;
}
@ -18781,19 +18912,6 @@ namespace Firebird
}
}
static FB_BOOLEAN CLOOP_CARG cloopstoreBlobDispatcher(IReplicatedTransaction* self, ISC_QUAD blobId, IReplicatedBlob* blob) throw()
{
try
{
return static_cast<Name*>(self)->Name::storeBlob(blobId, blob);
}
catch (...)
{
StatusType::catchException(0);
return static_cast<FB_BOOLEAN>(0);
}
}
static FB_BOOLEAN CLOOP_CARG cloopexecuteSqlDispatcher(IReplicatedTransaction* self, const char* sql) throw()
{
try
@ -18855,7 +18973,6 @@ namespace Firebird
virtual FB_BOOLEAN insertRecord(const char* name, IReplicatedRecord* record) = 0;
virtual FB_BOOLEAN updateRecord(const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) = 0;
virtual FB_BOOLEAN deleteRecord(const char* name, IReplicatedRecord* record) = 0;
virtual FB_BOOLEAN storeBlob(ISC_QUAD blobId, IReplicatedBlob* blob) = 0;
virtual FB_BOOLEAN executeSql(const char* sql) = 0;
virtual FB_BOOLEAN executeSqlIntl(unsigned charset, const char* sql) = 0;
};
@ -18873,8 +18990,12 @@ namespace Firebird
VTableImpl()
{
this->version = Base::VERSION;
this->dispose = &Name::cloopdisposeDispatcher;
this->addRef = &Name::cloopaddRefDispatcher;
this->release = &Name::cloopreleaseDispatcher;
this->setOwner = &Name::cloopsetOwnerDispatcher;
this->getOwner = &Name::cloopgetOwnerDispatcher;
this->getStatus = &Name::cloopgetStatusDispatcher;
this->setAttachment = &Name::cloopsetAttachmentDispatcher;
this->startTransaction = &Name::cloopstartTransactionDispatcher;
this->cleanupTransaction = &Name::cloopcleanupTransactionDispatcher;
this->setSequence = &Name::cloopsetSequenceDispatcher;
@ -18897,11 +19018,23 @@ namespace Firebird
}
}
static IReplicatedTransaction* CLOOP_CARG cloopstartTransactionDispatcher(IReplicatedSession* self, ISC_INT64 number) throw()
static void CLOOP_CARG cloopsetAttachmentDispatcher(IReplicatedSession* self, IAttachment* attachment) throw()
{
try
{
return static_cast<Name*>(self)->Name::startTransaction(number);
static_cast<Name*>(self)->Name::setAttachment(attachment);
}
catch (...)
{
StatusType::catchException(0);
}
}
static IReplicatedTransaction* CLOOP_CARG cloopstartTransactionDispatcher(IReplicatedSession* self, ITransaction* transaction, ISC_INT64 number) throw()
{
try
{
return static_cast<Name*>(self)->Name::startTransaction(transaction, number);
}
catch (...)
{
@ -18936,20 +19069,58 @@ namespace Firebird
}
}
static void CLOOP_CARG cloopdisposeDispatcher(IDisposable* self) throw()
static void CLOOP_CARG cloopsetOwnerDispatcher(IPluginBase* self, IReferenceCounted* r) throw()
{
try
{
static_cast<Name*>(self)->Name::dispose();
static_cast<Name*>(self)->Name::setOwner(r);
}
catch (...)
{
StatusType::catchException(0);
}
}
static IReferenceCounted* CLOOP_CARG cloopgetOwnerDispatcher(IPluginBase* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::getOwner();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<IReferenceCounted*>(0);
}
}
static void CLOOP_CARG cloopaddRefDispatcher(IReferenceCounted* self) throw()
{
try
{
static_cast<Name*>(self)->Name::addRef();
}
catch (...)
{
StatusType::catchException(0);
}
}
static int CLOOP_CARG cloopreleaseDispatcher(IReferenceCounted* self) throw()
{
try
{
return static_cast<Name*>(self)->Name::release();
}
catch (...)
{
StatusType::catchException(0);
return static_cast<int>(0);
}
}
};
template <typename Name, typename StatusType, typename Base = IDisposableImpl<Name, StatusType, Inherit<IVersionedImpl<Name, StatusType, Inherit<IReplicatedSession> > > > >
template <typename Name, typename StatusType, typename Base = IPluginBaseImpl<Name, StatusType, Inherit<IReferenceCountedImpl<Name, StatusType, Inherit<IVersionedImpl<Name, StatusType, Inherit<IReplicatedSession> > > > > > >
class IReplicatedSessionImpl : public IReplicatedSessionBaseImpl<Name, StatusType, Base>
{
protected:
@ -18963,7 +19134,8 @@ namespace Firebird
}
virtual IStatus* getStatus() = 0;
virtual IReplicatedTransaction* startTransaction(ISC_INT64 number) = 0;
virtual void setAttachment(IAttachment* attachment) = 0;
virtual IReplicatedTransaction* startTransaction(ITransaction* transaction, ISC_INT64 number) = 0;
virtual FB_BOOLEAN cleanupTransaction(ISC_INT64 number) = 0;
virtual FB_BOOLEAN setSequence(const char* name, ISC_INT64 value) = 0;
};

View File

@ -107,8 +107,8 @@ type
IDecFloat16 = class;
IDecFloat34 = class;
IInt128 = class;
IReplicatedField = class;
IReplicatedRecord = class;
IReplicatedBlob = class;
IReplicatedTransaction = class;
IReplicatedSession = class;
@ -666,11 +666,17 @@ type
IDecFloat34_fromStringPtr = procedure(this: IDecFloat34; status: IStatus; from: PAnsiChar; to_: FB_DEC34Ptr); cdecl;
IInt128_toStringPtr = procedure(this: IInt128; status: IStatus; from: FB_I128Ptr; scale: Integer; bufferLength: Cardinal; buffer: PAnsiChar); cdecl;
IInt128_fromStringPtr = procedure(this: IInt128; status: IStatus; scale: Integer; from: PAnsiChar; to_: FB_I128Ptr); cdecl;
IReplicatedField_getNamePtr = function(this: IReplicatedField): PAnsiChar; cdecl;
IReplicatedField_getTypePtr = function(this: IReplicatedField): Cardinal; cdecl;
IReplicatedField_getSubTypePtr = function(this: IReplicatedField): Cardinal; cdecl;
IReplicatedField_getScalePtr = function(this: IReplicatedField): Cardinal; cdecl;
IReplicatedField_getLengthPtr = function(this: IReplicatedField): Cardinal; cdecl;
IReplicatedField_getCharSetPtr = function(this: IReplicatedField): Cardinal; cdecl;
IReplicatedField_getDataPtr = function(this: IReplicatedField): Pointer; cdecl;
IReplicatedRecord_getCountPtr = function(this: IReplicatedRecord): Cardinal; cdecl;
IReplicatedRecord_getFieldPtr = function(this: IReplicatedRecord; index: Cardinal): IReplicatedField; cdecl;
IReplicatedRecord_getRawLengthPtr = function(this: IReplicatedRecord): Cardinal; cdecl;
IReplicatedRecord_getRawDataPtr = function(this: IReplicatedRecord): BytePtr; cdecl;
IReplicatedBlob_getLengthPtr = function(this: IReplicatedBlob): Cardinal; cdecl;
IReplicatedBlob_isEofPtr = function(this: IReplicatedBlob): Boolean; cdecl;
IReplicatedBlob_getSegmentPtr = function(this: IReplicatedBlob; length: Cardinal; buffer: BytePtr): Cardinal; cdecl;
IReplicatedTransaction_preparePtr = function(this: IReplicatedTransaction): Boolean; cdecl;
IReplicatedTransaction_commitPtr = function(this: IReplicatedTransaction): Boolean; cdecl;
IReplicatedTransaction_rollbackPtr = function(this: IReplicatedTransaction): Boolean; cdecl;
@ -680,11 +686,11 @@ type
IReplicatedTransaction_insertRecordPtr = function(this: IReplicatedTransaction; name: PAnsiChar; record_: IReplicatedRecord): Boolean; cdecl;
IReplicatedTransaction_updateRecordPtr = function(this: IReplicatedTransaction; name: PAnsiChar; orgRecord: IReplicatedRecord; newRecord: IReplicatedRecord): Boolean; cdecl;
IReplicatedTransaction_deleteRecordPtr = function(this: IReplicatedTransaction; name: PAnsiChar; record_: IReplicatedRecord): Boolean; cdecl;
IReplicatedTransaction_storeBlobPtr = function(this: IReplicatedTransaction; blobId: ISC_QUAD; blob: IReplicatedBlob): Boolean; cdecl;
IReplicatedTransaction_executeSqlPtr = function(this: IReplicatedTransaction; sql: PAnsiChar): Boolean; cdecl;
IReplicatedTransaction_executeSqlIntlPtr = function(this: IReplicatedTransaction; charset: Cardinal; sql: PAnsiChar): Boolean; cdecl;
IReplicatedSession_getStatusPtr = function(this: IReplicatedSession): IStatus; cdecl;
IReplicatedSession_startTransactionPtr = function(this: IReplicatedSession; number: Int64): IReplicatedTransaction; cdecl;
IReplicatedSession_setAttachmentPtr = procedure(this: IReplicatedSession; attachment: IAttachment); cdecl;
IReplicatedSession_startTransactionPtr = function(this: IReplicatedSession; transaction: ITransaction; number: Int64): IReplicatedTransaction; cdecl;
IReplicatedSession_cleanupTransactionPtr = function(this: IReplicatedSession; number: Int64): Boolean; cdecl;
IReplicatedSession_setSequencePtr = function(this: IReplicatedSession; name: PAnsiChar; value: Int64): Boolean; cdecl;
@ -1053,7 +1059,8 @@ type
const TYPE_WIRE_CRYPT = Cardinal(8);
const TYPE_DB_CRYPT = Cardinal(9);
const TYPE_KEY_HOLDER = Cardinal(10);
const TYPE_COUNT = Cardinal(11);
const TYPE_REPLICATOR = Cardinal(11);
const TYPE_COUNT = Cardinal(12);
procedure registerPluginFactory(pluginType: Cardinal; defaultName: PAnsiChar; factory: IPluginFactory);
procedure registerModule(cleanup: IPluginModule);
@ -3524,7 +3531,43 @@ type
procedure fromString(status: IStatus; scale: Integer; from: PAnsiChar; to_: FB_I128Ptr); virtual; abstract;
end;
ReplicatedFieldVTable = class(VersionedVTable)
getName: IReplicatedField_getNamePtr;
getType: IReplicatedField_getTypePtr;
getSubType: IReplicatedField_getSubTypePtr;
getScale: IReplicatedField_getScalePtr;
getLength: IReplicatedField_getLengthPtr;
getCharSet: IReplicatedField_getCharSetPtr;
getData: IReplicatedField_getDataPtr;
end;
IReplicatedField = class(IVersioned)
const VERSION = 2;
function getName(): PAnsiChar;
function getType(): Cardinal;
function getSubType(): Cardinal;
function getScale(): Cardinal;
function getLength(): Cardinal;
function getCharSet(): Cardinal;
function getData(): Pointer;
end;
IReplicatedFieldImpl = class(IReplicatedField)
constructor create;
function getName(): PAnsiChar; virtual; abstract;
function getType(): Cardinal; virtual; abstract;
function getSubType(): Cardinal; virtual; abstract;
function getScale(): Cardinal; virtual; abstract;
function getLength(): Cardinal; virtual; abstract;
function getCharSet(): Cardinal; virtual; abstract;
function getData(): Pointer; virtual; abstract;
end;
ReplicatedRecordVTable = class(VersionedVTable)
getCount: IReplicatedRecord_getCountPtr;
getField: IReplicatedRecord_getFieldPtr;
getRawLength: IReplicatedRecord_getRawLengthPtr;
getRawData: IReplicatedRecord_getRawDataPtr;
end;
@ -3532,6 +3575,8 @@ type
IReplicatedRecord = class(IVersioned)
const VERSION = 2;
function getCount(): Cardinal;
function getField(index: Cardinal): IReplicatedField;
function getRawLength(): Cardinal;
function getRawData(): BytePtr;
end;
@ -3539,32 +3584,12 @@ type
IReplicatedRecordImpl = class(IReplicatedRecord)
constructor create;
function getCount(): Cardinal; virtual; abstract;
function getField(index: Cardinal): IReplicatedField; virtual; abstract;
function getRawLength(): Cardinal; virtual; abstract;
function getRawData(): BytePtr; virtual; abstract;
end;
ReplicatedBlobVTable = class(VersionedVTable)
getLength: IReplicatedBlob_getLengthPtr;
isEof: IReplicatedBlob_isEofPtr;
getSegment: IReplicatedBlob_getSegmentPtr;
end;
IReplicatedBlob = class(IVersioned)
const VERSION = 2;
function getLength(): Cardinal;
function isEof(): Boolean;
function getSegment(length: Cardinal; buffer: BytePtr): Cardinal;
end;
IReplicatedBlobImpl = class(IReplicatedBlob)
constructor create;
function getLength(): Cardinal; virtual; abstract;
function isEof(): Boolean; virtual; abstract;
function getSegment(length: Cardinal; buffer: BytePtr): Cardinal; virtual; abstract;
end;
ReplicatedTransactionVTable = class(DisposableVTable)
prepare: IReplicatedTransaction_preparePtr;
commit: IReplicatedTransaction_commitPtr;
@ -3575,7 +3600,6 @@ type
insertRecord: IReplicatedTransaction_insertRecordPtr;
updateRecord: IReplicatedTransaction_updateRecordPtr;
deleteRecord: IReplicatedTransaction_deleteRecordPtr;
storeBlob: IReplicatedTransaction_storeBlobPtr;
executeSql: IReplicatedTransaction_executeSqlPtr;
executeSqlIntl: IReplicatedTransaction_executeSqlIntlPtr;
end;
@ -3592,7 +3616,6 @@ type
function insertRecord(name: PAnsiChar; record_: IReplicatedRecord): Boolean;
function updateRecord(name: PAnsiChar; orgRecord: IReplicatedRecord; newRecord: IReplicatedRecord): Boolean;
function deleteRecord(name: PAnsiChar; record_: IReplicatedRecord): Boolean;
function storeBlob(blobId: ISC_QUAD; blob: IReplicatedBlob): Boolean;
function executeSql(sql: PAnsiChar): Boolean;
function executeSqlIntl(charset: Cardinal; sql: PAnsiChar): Boolean;
end;
@ -3610,23 +3633,24 @@ type
function insertRecord(name: PAnsiChar; record_: IReplicatedRecord): Boolean; virtual; abstract;
function updateRecord(name: PAnsiChar; orgRecord: IReplicatedRecord; newRecord: IReplicatedRecord): Boolean; virtual; abstract;
function deleteRecord(name: PAnsiChar; record_: IReplicatedRecord): Boolean; virtual; abstract;
function storeBlob(blobId: ISC_QUAD; blob: IReplicatedBlob): Boolean; virtual; abstract;
function executeSql(sql: PAnsiChar): Boolean; virtual; abstract;
function executeSqlIntl(charset: Cardinal; sql: PAnsiChar): Boolean; virtual; abstract;
end;
ReplicatedSessionVTable = class(DisposableVTable)
ReplicatedSessionVTable = class(PluginBaseVTable)
getStatus: IReplicatedSession_getStatusPtr;
setAttachment: IReplicatedSession_setAttachmentPtr;
startTransaction: IReplicatedSession_startTransactionPtr;
cleanupTransaction: IReplicatedSession_cleanupTransactionPtr;
setSequence: IReplicatedSession_setSequencePtr;
end;
IReplicatedSession = class(IDisposable)
const VERSION = 3;
IReplicatedSession = class(IPluginBase)
const VERSION = 4;
function getStatus(): IStatus;
function startTransaction(number: Int64): IReplicatedTransaction;
procedure setAttachment(attachment: IAttachment);
function startTransaction(transaction: ITransaction; number: Int64): IReplicatedTransaction;
function cleanupTransaction(number: Int64): Boolean;
function setSequence(name: PAnsiChar; value: Int64): Boolean;
end;
@ -3634,9 +3658,13 @@ type
IReplicatedSessionImpl = class(IReplicatedSession)
constructor create;
procedure dispose(); virtual; abstract;
procedure addRef(); virtual; abstract;
function release(): Integer; virtual; abstract;
procedure setOwner(r: IReferenceCounted); virtual; abstract;
function getOwner(): IReferenceCounted; virtual; abstract;
function getStatus(): IStatus; virtual; abstract;
function startTransaction(number: Int64): IReplicatedTransaction; virtual; abstract;
procedure setAttachment(attachment: IAttachment); virtual; abstract;
function startTransaction(transaction: ITransaction; number: Int64): IReplicatedTransaction; virtual; abstract;
function cleanupTransaction(number: Int64): Boolean; virtual; abstract;
function setSequence(name: PAnsiChar; value: Int64): Boolean; virtual; abstract;
end;
@ -8099,6 +8127,51 @@ begin
FbException.checkException(status);
end;
function IReplicatedField.getName(): PAnsiChar;
begin
Result := ReplicatedFieldVTable(vTable).getName(Self);
end;
function IReplicatedField.getType(): Cardinal;
begin
Result := ReplicatedFieldVTable(vTable).getType(Self);
end;
function IReplicatedField.getSubType(): Cardinal;
begin
Result := ReplicatedFieldVTable(vTable).getSubType(Self);
end;
function IReplicatedField.getScale(): Cardinal;
begin
Result := ReplicatedFieldVTable(vTable).getScale(Self);
end;
function IReplicatedField.getLength(): Cardinal;
begin
Result := ReplicatedFieldVTable(vTable).getLength(Self);
end;
function IReplicatedField.getCharSet(): Cardinal;
begin
Result := ReplicatedFieldVTable(vTable).getCharSet(Self);
end;
function IReplicatedField.getData(): Pointer;
begin
Result := ReplicatedFieldVTable(vTable).getData(Self);
end;
function IReplicatedRecord.getCount(): Cardinal;
begin
Result := ReplicatedRecordVTable(vTable).getCount(Self);
end;
function IReplicatedRecord.getField(index: Cardinal): IReplicatedField;
begin
Result := ReplicatedRecordVTable(vTable).getField(Self, index);
end;
function IReplicatedRecord.getRawLength(): Cardinal;
begin
Result := ReplicatedRecordVTable(vTable).getRawLength(Self);
@ -8109,21 +8182,6 @@ begin
Result := ReplicatedRecordVTable(vTable).getRawData(Self);
end;
function IReplicatedBlob.getLength(): Cardinal;
begin
Result := ReplicatedBlobVTable(vTable).getLength(Self);
end;
function IReplicatedBlob.isEof(): Boolean;
begin
Result := ReplicatedBlobVTable(vTable).isEof(Self);
end;
function IReplicatedBlob.getSegment(length: Cardinal; buffer: BytePtr): Cardinal;
begin
Result := ReplicatedBlobVTable(vTable).getSegment(Self, length, buffer);
end;
function IReplicatedTransaction.prepare(): Boolean;
begin
Result := ReplicatedTransactionVTable(vTable).prepare(Self);
@ -8169,11 +8227,6 @@ begin
Result := ReplicatedTransactionVTable(vTable).deleteRecord(Self, name, record_);
end;
function IReplicatedTransaction.storeBlob(blobId: ISC_QUAD; blob: IReplicatedBlob): Boolean;
begin
Result := ReplicatedTransactionVTable(vTable).storeBlob(Self, blobId, blob);
end;
function IReplicatedTransaction.executeSql(sql: PAnsiChar): Boolean;
begin
Result := ReplicatedTransactionVTable(vTable).executeSql(Self, sql);
@ -8189,9 +8242,14 @@ begin
Result := ReplicatedSessionVTable(vTable).getStatus(Self);
end;
function IReplicatedSession.startTransaction(number: Int64): IReplicatedTransaction;
procedure IReplicatedSession.setAttachment(attachment: IAttachment);
begin
Result := ReplicatedSessionVTable(vTable).startTransaction(Self, number);
ReplicatedSessionVTable(vTable).setAttachment(Self, attachment);
end;
function IReplicatedSession.startTransaction(transaction: ITransaction; number: Int64): IReplicatedTransaction;
begin
Result := ReplicatedSessionVTable(vTable).startTransaction(Self, transaction, number);
end;
function IReplicatedSession.cleanupTransaction(number: Int64): Boolean;
@ -14233,6 +14291,95 @@ begin
vTable := IInt128Impl_vTable;
end;
function IReplicatedFieldImpl_getNameDispatcher(this: IReplicatedField): PAnsiChar; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getName();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getTypeDispatcher(this: IReplicatedField): Cardinal; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getType();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getSubTypeDispatcher(this: IReplicatedField): Cardinal; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getSubType();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getScaleDispatcher(this: IReplicatedField): Cardinal; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getScale();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getLengthDispatcher(this: IReplicatedField): Cardinal; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getLength();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getCharSetDispatcher(this: IReplicatedField): Cardinal; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getCharSet();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedFieldImpl_getDataDispatcher(this: IReplicatedField): Pointer; cdecl;
begin
try
Result := IReplicatedFieldImpl(this).getData();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
var
IReplicatedFieldImpl_vTable: ReplicatedFieldVTable;
constructor IReplicatedFieldImpl.create;
begin
vTable := IReplicatedFieldImpl_vTable;
end;
function IReplicatedRecordImpl_getCountDispatcher(this: IReplicatedRecord): Cardinal; cdecl;
begin
try
Result := IReplicatedRecordImpl(this).getCount();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedRecordImpl_getFieldDispatcher(this: IReplicatedRecord; index: Cardinal): IReplicatedField; cdecl;
begin
try
Result := IReplicatedRecordImpl(this).getField(index);
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedRecordImpl_getRawLengthDispatcher(this: IReplicatedRecord): Cardinal; cdecl;
begin
try
@ -14259,41 +14406,6 @@ begin
vTable := IReplicatedRecordImpl_vTable;
end;
function IReplicatedBlobImpl_getLengthDispatcher(this: IReplicatedBlob): Cardinal; cdecl;
begin
try
Result := IReplicatedBlobImpl(this).getLength();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedBlobImpl_isEofDispatcher(this: IReplicatedBlob): Boolean; cdecl;
begin
try
Result := IReplicatedBlobImpl(this).isEof();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedBlobImpl_getSegmentDispatcher(this: IReplicatedBlob; length: Cardinal; buffer: BytePtr): Cardinal; cdecl;
begin
try
Result := IReplicatedBlobImpl(this).getSegment(length, buffer);
except
on e: Exception do FbException.catchException(nil, e);
end
end;
var
IReplicatedBlobImpl_vTable: ReplicatedBlobVTable;
constructor IReplicatedBlobImpl.create;
begin
vTable := IReplicatedBlobImpl_vTable;
end;
procedure IReplicatedTransactionImpl_disposeDispatcher(this: IReplicatedTransaction); cdecl;
begin
try
@ -14384,15 +14496,6 @@ begin
end
end;
function IReplicatedTransactionImpl_storeBlobDispatcher(this: IReplicatedTransaction; blobId: ISC_QUAD; blob: IReplicatedBlob): Boolean; cdecl;
begin
try
Result := IReplicatedTransactionImpl(this).storeBlob(blobId, blob);
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedTransactionImpl_executeSqlDispatcher(this: IReplicatedTransaction; sql: PAnsiChar): Boolean; cdecl;
begin
try
@ -14419,10 +14522,37 @@ begin
vTable := IReplicatedTransactionImpl_vTable;
end;
procedure IReplicatedSessionImpl_disposeDispatcher(this: IReplicatedSession); cdecl;
procedure IReplicatedSessionImpl_addRefDispatcher(this: IReplicatedSession); cdecl;
begin
try
IReplicatedSessionImpl(this).dispose();
IReplicatedSessionImpl(this).addRef();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedSessionImpl_releaseDispatcher(this: IReplicatedSession): Integer; cdecl;
begin
try
Result := IReplicatedSessionImpl(this).release();
except
on e: Exception do FbException.catchException(nil, e);
end
end;
procedure IReplicatedSessionImpl_setOwnerDispatcher(this: IReplicatedSession; r: IReferenceCounted); cdecl;
begin
try
IReplicatedSessionImpl(this).setOwner(r);
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedSessionImpl_getOwnerDispatcher(this: IReplicatedSession): IReferenceCounted; cdecl;
begin
try
Result := IReplicatedSessionImpl(this).getOwner();
except
on e: Exception do FbException.catchException(nil, e);
end
@ -14437,10 +14567,19 @@ begin
end
end;
function IReplicatedSessionImpl_startTransactionDispatcher(this: IReplicatedSession; number: Int64): IReplicatedTransaction; cdecl;
procedure IReplicatedSessionImpl_setAttachmentDispatcher(this: IReplicatedSession; attachment: IAttachment); cdecl;
begin
try
Result := IReplicatedSessionImpl(this).startTransaction(number);
IReplicatedSessionImpl(this).setAttachment(attachment);
except
on e: Exception do FbException.catchException(nil, e);
end
end;
function IReplicatedSessionImpl_startTransactionDispatcher(this: IReplicatedSession; transaction: ITransaction; number: Int64): IReplicatedTransaction; cdecl;
begin
try
Result := IReplicatedSessionImpl(this).startTransaction(transaction, number);
except
on e: Exception do FbException.catchException(nil, e);
end
@ -15378,17 +15517,23 @@ initialization
IInt128Impl_vTable.toString := @IInt128Impl_toStringDispatcher;
IInt128Impl_vTable.fromString := @IInt128Impl_fromStringDispatcher;
IReplicatedFieldImpl_vTable := ReplicatedFieldVTable.create;
IReplicatedFieldImpl_vTable.version := 2;
IReplicatedFieldImpl_vTable.getName := @IReplicatedFieldImpl_getNameDispatcher;
IReplicatedFieldImpl_vTable.getType := @IReplicatedFieldImpl_getTypeDispatcher;
IReplicatedFieldImpl_vTable.getSubType := @IReplicatedFieldImpl_getSubTypeDispatcher;
IReplicatedFieldImpl_vTable.getScale := @IReplicatedFieldImpl_getScaleDispatcher;
IReplicatedFieldImpl_vTable.getLength := @IReplicatedFieldImpl_getLengthDispatcher;
IReplicatedFieldImpl_vTable.getCharSet := @IReplicatedFieldImpl_getCharSetDispatcher;
IReplicatedFieldImpl_vTable.getData := @IReplicatedFieldImpl_getDataDispatcher;
IReplicatedRecordImpl_vTable := ReplicatedRecordVTable.create;
IReplicatedRecordImpl_vTable.version := 2;
IReplicatedRecordImpl_vTable.getCount := @IReplicatedRecordImpl_getCountDispatcher;
IReplicatedRecordImpl_vTable.getField := @IReplicatedRecordImpl_getFieldDispatcher;
IReplicatedRecordImpl_vTable.getRawLength := @IReplicatedRecordImpl_getRawLengthDispatcher;
IReplicatedRecordImpl_vTable.getRawData := @IReplicatedRecordImpl_getRawDataDispatcher;
IReplicatedBlobImpl_vTable := ReplicatedBlobVTable.create;
IReplicatedBlobImpl_vTable.version := 2;
IReplicatedBlobImpl_vTable.getLength := @IReplicatedBlobImpl_getLengthDispatcher;
IReplicatedBlobImpl_vTable.isEof := @IReplicatedBlobImpl_isEofDispatcher;
IReplicatedBlobImpl_vTable.getSegment := @IReplicatedBlobImpl_getSegmentDispatcher;
IReplicatedTransactionImpl_vTable := ReplicatedTransactionVTable.create;
IReplicatedTransactionImpl_vTable.version := 3;
IReplicatedTransactionImpl_vTable.dispose := @IReplicatedTransactionImpl_disposeDispatcher;
@ -15401,14 +15546,17 @@ initialization
IReplicatedTransactionImpl_vTable.insertRecord := @IReplicatedTransactionImpl_insertRecordDispatcher;
IReplicatedTransactionImpl_vTable.updateRecord := @IReplicatedTransactionImpl_updateRecordDispatcher;
IReplicatedTransactionImpl_vTable.deleteRecord := @IReplicatedTransactionImpl_deleteRecordDispatcher;
IReplicatedTransactionImpl_vTable.storeBlob := @IReplicatedTransactionImpl_storeBlobDispatcher;
IReplicatedTransactionImpl_vTable.executeSql := @IReplicatedTransactionImpl_executeSqlDispatcher;
IReplicatedTransactionImpl_vTable.executeSqlIntl := @IReplicatedTransactionImpl_executeSqlIntlDispatcher;
IReplicatedSessionImpl_vTable := ReplicatedSessionVTable.create;
IReplicatedSessionImpl_vTable.version := 3;
IReplicatedSessionImpl_vTable.dispose := @IReplicatedSessionImpl_disposeDispatcher;
IReplicatedSessionImpl_vTable.version := 4;
IReplicatedSessionImpl_vTable.addRef := @IReplicatedSessionImpl_addRefDispatcher;
IReplicatedSessionImpl_vTable.release := @IReplicatedSessionImpl_releaseDispatcher;
IReplicatedSessionImpl_vTable.setOwner := @IReplicatedSessionImpl_setOwnerDispatcher;
IReplicatedSessionImpl_vTable.getOwner := @IReplicatedSessionImpl_getOwnerDispatcher;
IReplicatedSessionImpl_vTable.getStatus := @IReplicatedSessionImpl_getStatusDispatcher;
IReplicatedSessionImpl_vTable.setAttachment := @IReplicatedSessionImpl_setAttachmentDispatcher;
IReplicatedSessionImpl_vTable.startTransaction := @IReplicatedSessionImpl_startTransactionDispatcher;
IReplicatedSessionImpl_vTable.cleanupTransaction := @IReplicatedSessionImpl_cleanupTransactionDispatcher;
IReplicatedSessionImpl_vTable.setSequence := @IReplicatedSessionImpl_setSequenceDispatcher;
@ -15505,8 +15653,8 @@ finalization
IDecFloat16Impl_vTable.destroy;
IDecFloat34Impl_vTable.destroy;
IInt128Impl_vTable.destroy;
IReplicatedFieldImpl_vTable.destroy;
IReplicatedRecordImpl_vTable.destroy;
IReplicatedBlobImpl_vTable.destroy;
IReplicatedTransactionImpl_vTable.destroy;
IReplicatedSessionImpl_vTable.destroy;

View File

@ -472,7 +472,7 @@ public:
USHORT att_original_timezone;
USHORT att_current_timezone;
Firebird::IReplicatedSession* att_replicator;
Firebird::RefPtr<Firebird::IReplicatedSession> att_replicator;
Firebird::AutoPtr<Replication::TableMatcher> att_repl_matcher;
Firebird::AutoPtr<Applier> att_repl_applier;

View File

@ -322,9 +322,6 @@ namespace Jrd
bool Database::isReplicating(thread_db* tdbb)
{
if (!replManager())
return false;
Sync sync(&dbb_repl_sync, FB_FUNCTION);
sync.lock(SYNC_SHARED);
@ -499,12 +496,12 @@ namespace Jrd
return m_eventMgr;
}
Replication::Manager* Database::GlobalObjectHolder::getReplManager()
Replication::Manager* Database::GlobalObjectHolder::getReplManager(bool create)
{
if (!m_replConfig)
return nullptr;
if (!m_replMgr)
if (!m_replMgr && create)
{
MutexLockGuard guard(m_mutex, FB_FUNCTION);

View File

@ -303,7 +303,11 @@ class Database : public pool_alloc<type_dbb>
LockManager* getLockManager();
EventManager* getEventManager();
Replication::Manager* getReplManager();
Replication::Manager* getReplManager(bool create);
const Replication::Config* getReplConfig()
{
return m_replConfig.get();
}
private:
const Firebird::string m_id;
@ -657,9 +661,14 @@ public:
return dbb_gblobj_holder->getEventManager();
}
Replication::Manager* replManager()
Replication::Manager* replManager(bool create = false)
{
return dbb_gblobj_holder->getReplManager();
return dbb_gblobj_holder->getReplManager(create);
}
const Replication::Config* replConfig()
{
return dbb_gblobj_holder->getReplConfig();
}
private:

View File

@ -7410,8 +7410,7 @@ void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment)
if (!attachment)
return;
if (attachment->att_replicator)
attachment->att_replicator->dispose();
attachment->att_replicator = nullptr;
if (attachment->att_repl_applier)
attachment->att_repl_applier->shutdown(tdbb);

View File

@ -489,7 +489,7 @@ const ULONG TDBB_wait_cancel_disable = 512; // don't cancel current waiting ope
const ULONG TDBB_cache_unwound = 1024; // page cache was unwound
const ULONG TDBB_reset_stack = 2048; // stack should be reset after stack overflow exception
const ULONG TDBB_dfw_cleanup = 4096; // DFW cleanup phase is active
const ULONG TDBB_repl_sql = 8192; // SQL statement is being replicated
const ULONG TDBB_repl_in_progress = 8192; // Prevent recursion in replication
const ULONG TDBB_replicator = 16384; // Replicator
class thread_db : public Firebird::ThreadData

View File

@ -99,7 +99,11 @@ Config::Config()
logSourceDirectory(getPool()),
verboseLogging(false),
applyIdleTimeout(DEFAULT_APPLY_IDLE_TIMEOUT),
applyErrorTimeout(DEFAULT_APPLY_ERROR_TIMEOUT)
applyErrorTimeout(DEFAULT_APPLY_ERROR_TIMEOUT),
pluginName(getPool()),
log_on_error(true),
disable_on_error(true),
throw_on_error(false)
{
sourceGuid.alignment = 0;
}
@ -121,7 +125,11 @@ Config::Config(const Config& other)
logSourceDirectory(getPool(), other.logSourceDirectory),
verboseLogging(other.verboseLogging),
applyIdleTimeout(other.applyIdleTimeout),
applyErrorTimeout(other.applyErrorTimeout)
applyErrorTimeout(other.applyErrorTimeout),
pluginName(other.pluginName),
log_on_error(other.log_on_error),
disable_on_error(other.disable_on_error),
throw_on_error(other.throw_on_error)
{
sourceGuid.alignment = 0;
}
@ -166,6 +174,8 @@ Config* Config::get(const PathName& lookupName)
if (dbName != lookupName)
continue;
config->dbName = dbName;
exactMatch = true;
}
@ -232,27 +242,48 @@ Config* Config::get(const PathName& lookupName)
{
parseLong(value, config->logArchiveTimeout);
}
else if (key == "plugin")
{
config->pluginName = value;
}
else if (key == "log_on_error")
{
parseBoolean(value, config->log_on_error);
}
else if (key == "disable_on_error")
{
parseBoolean(value, config->disable_on_error);
}
else if (key == "throw_on_error")
{
parseBoolean(value, config->throw_on_error);
}
}
}
if (!exactMatch)
continue;
if (exactMatch)
break;
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
}
// TODO: As soon as plugin name is moved into RDB$PUBLICATIONS delay config parse until real replication start
if (config->pluginName.hasData())
{
return config.release();
}
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
{
// If log_directory is specified, then replication is enabled
if (config->logFilePrefix.isEmpty())
{
// If log_directory is specified, then replication is enabled
if (config->logFilePrefix.isEmpty())
{
PathName db_directory, db_filename;
PathUtils::splitLastComponent(db_directory, db_filename, dbName);
config->logFilePrefix = db_filename;
}
config->dbName = dbName;
return config.release();
PathName db_directory, db_filename;
PathUtils::splitLastComponent(db_directory, db_filename, config->dbName);
config->logFilePrefix = db_filename;
}
return config.release();
}
return NULL;

View File

@ -57,6 +57,10 @@ namespace Replication
bool verboseLogging;
ULONG applyIdleTimeout;
ULONG applyErrorTimeout;
Firebird::string pluginName;
bool log_on_error;
bool disable_on_error;
bool throw_on_error;
};
};

View File

@ -71,12 +71,6 @@ namespace Replication
const Replication::Config* config);
~Manager();
TableMatcher* createTableMatcher(MemoryPool& pool)
{
return FB_NEW_POOL(pool)
TableMatcher(pool, m_config->includeFilter, m_config->excludeFilter);
}
Firebird::UCharBuffer* getBuffer();
void releaseBuffer(Firebird::UCharBuffer* buffer);

View File

@ -48,26 +48,48 @@ namespace
// should be replicated similar to user-defined ones
const int BACKUP_HISTORY_GENERATOR = 9;
const char* LOG_ERROR_MSG = "Replication is stopped due to critical error(s)";
const char* LOG_ERROR_MSG = "Replication error";
const char* LOG_WARNING_MSG = "Replication warning";
void handleError(thread_db* tdbb, jrd_tra* transaction = NULL)
void handleError(thread_db* tdbb, jrd_tra* transaction = NULL, bool canThrow = true)
{
const auto dbb = tdbb->getDatabase();
const auto attachment = tdbb->getAttachment();
fb_assert(attachment->att_replicator);
const auto config = dbb->replConfig();
fb_assert(attachment->att_replicator.hasData());
if (transaction && transaction->tra_replicator)
if (transaction && transaction->tra_replicator && config->disable_on_error && canThrow)
{
// If we cannot throw then calling routine will take care of the replicator
transaction->tra_replicator->dispose();
transaction->tra_replicator = NULL;
transaction->tra_flags &= ~TRA_replicating;
}
const auto status = attachment->att_replicator->getStatus();
if (status->getState() & IStatus::STATE_ERRORS)
const auto state = status->getState();
if (state & IStatus::STATE_WARNINGS)
{
string msg;
msg.printf("Database: %s\n\t%s", dbb->dbb_filename.c_str(), LOG_ERROR_MSG);
iscLogStatus(msg.c_str(), status);
if (config->log_on_error)
{
string msg;
msg.printf("Database: %s\n\t%s", dbb->dbb_filename.c_str(), LOG_WARNING_MSG);
iscLogStatus(msg.c_str(), status);
}
}
if (state & IStatus::STATE_ERRORS)
{
if (config->log_on_error)
{
string msg;
msg.printf("Database: %s\n\t%s", dbb->dbb_filename.c_str(), LOG_ERROR_MSG);
iscLogStatus(msg.c_str(), status);
}
if (config->throw_on_error && canThrow)
{
status_exception::raise(status);
}
}
}
@ -85,11 +107,7 @@ namespace
const auto dbb = tdbb->getDatabase();
if (!dbb->isReplicating(tdbb))
{
if (attachment->att_replicator)
{
attachment->att_replicator->dispose();
attachment->att_replicator = NULL;
}
attachment->att_replicator = nullptr;
return NULL;
}
@ -98,22 +116,44 @@ namespace
if (!attachment->att_replicator)
{
auto& pool = *attachment->att_pool;
const auto manager = dbb->replManager();
const auto& guid = dbb->dbb_guid;
const auto& userName = attachment->att_user->getUserName();
const auto config = dbb->replConfig();
attachment->att_replicator = (IReplicatedSession*) FB_NEW_POOL(pool)
Replicator(pool, manager, guid, userName, cleanupTransactions);
if (config->pluginName.empty())
{
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
{
auto& pool = *attachment->att_pool;
const auto manager = dbb->replManager(true);
const auto& guid = dbb->dbb_guid;
const auto& userName = attachment->att_user->getUserName();
attachment->att_replicator = FB_NEW
Replicator(pool, manager, guid, userName);
}
}
else
{
GetPlugins<IReplicatedSession> plugins(IPluginManager::TYPE_REPLICATOR, config->pluginName.c_str());
if (!plugins.hasData())
{
string msg;
msg.printf("Replication plugin %s not found\n\t%s", config->pluginName.c_str(), LOG_ERROR_MSG);
logOriginMessage(dbb->dbb_filename, msg, ERROR_MSG);
return nullptr;
}
attachment->att_replicator = plugins.plugin();
}
if (attachment->att_replicator.hasData())
{
attachment->att_replicator->setAttachment(attachment->getInterface());
if (cleanupTransactions)
attachment->att_replicator->cleanupTransaction(0);
}
}
fb_assert(attachment->att_replicator);
fb_assert(attachment->att_replicator.hasData());
// Disable replication after errors
const auto status = attachment->att_replicator->getStatus();
if (status->getState() & IStatus::STATE_ERRORS)
return NULL;
// No need to check for errors here: fresh replicator cannot have any, used one can have dirty status.
return attachment->att_replicator;
}
@ -145,7 +185,7 @@ namespace
if (!transaction->tra_replicator &&
(transaction->tra_flags & TRA_replicating))
{
transaction->tra_replicator = replicator->startTransaction(transaction->tra_number);
transaction->tra_replicator = replicator->startTransaction(transaction->getInterface(true), transaction->tra_number);
if (!transaction->tra_replicator)
handleError(tdbb, transaction);
@ -226,12 +266,15 @@ namespace
}
class ReplicatedRecordImpl :
public AutoIface<IReplicatedRecordImpl<ReplicatedRecordImpl, CheckStatusWrapper> >
public AutoIface<IReplicatedRecordImpl<ReplicatedRecordImpl, CheckStatusWrapper> >,
public IReplicatedFieldImpl<ReplicatedRecordImpl, CheckStatusWrapper>
{
public:
ReplicatedRecordImpl(thread_db* tdbb, const Record* record)
ReplicatedRecordImpl(thread_db* tdbb, const jrd_rel* relation, const Record* record)
: //m_tdbb(tdbb),
m_record(record)
m_record(record),
m_format(record->getFormat()),
m_relation(relation)
{
}
@ -239,12 +282,72 @@ namespace
{
}
unsigned getRawLength()
unsigned getCount() override
{
return m_format->fmt_count;
}
const char* getName() override
{
jrd_fld* field = MET_get_field(m_relation, m_fieldIndex);
if (field == nullptr)
return nullptr;
return field->fld_name.c_str();
}
unsigned getType() override
{
return m_fieldType;
}
unsigned getSubType() override
{
return m_format->fmt_desc[m_fieldIndex].getSubType();
}
unsigned getScale() override
{
return m_format->fmt_desc[m_fieldIndex].dsc_scale;
}
unsigned getLength() override
{
return m_fieldLength;
}
unsigned getCharSet() override
{
return m_format->fmt_desc[m_fieldIndex].getCharSet();
}
const void* getData() override
{
if (m_record->isNull(m_fieldIndex))
return nullptr;
return m_record->getData() + (IPTR)(m_format->fmt_desc[m_fieldIndex].dsc_address);
}
IReplicatedField* getField(unsigned index) override
{
if (index >= m_format->fmt_count)
return nullptr;
m_fieldIndex = index;
SLONG dummySubtype, dummyScale;
m_format->fmt_desc[m_fieldIndex].getSqlInfo(&m_fieldLength, &dummySubtype, &dummyScale, &m_fieldType);
return this;
}
unsigned getRawLength() override
{
return m_record->getLength();
}
const unsigned char* getRawData()
const unsigned char* getRawData() override
{
return m_record->getData();
}
@ -252,48 +355,11 @@ namespace
private:
//thread_db* const m_tdbb;
const Record* const m_record;
};
class ReplicatedBlobImpl :
public AutoIface<IReplicatedBlobImpl<ReplicatedBlobImpl, CheckStatusWrapper> >
{
public:
ReplicatedBlobImpl(thread_db* tdbb, jrd_tra* transaction, const bid* blobId) :
m_tdbb(tdbb), m_blob(blb::open(tdbb, transaction, blobId))
{
}
~ReplicatedBlobImpl()
{
m_blob->BLB_close(m_tdbb);
}
unsigned getLength()
{
return m_blob->blb_length;
}
FB_BOOLEAN isEof()
{
return (m_blob->blb_flags & BLB_eof);
}
unsigned getSegment(unsigned length, unsigned char* buffer)
{
auto p = buffer;
if (length && !(m_blob->blb_flags & BLB_eof))
{
const auto n = (USHORT) MIN(length, MAX_USHORT);
p += m_blob->BLB_get_segment(m_tdbb, p, n);
}
return (unsigned) (p - buffer);
}
private:
thread_db* const m_tdbb;
blb* const m_blob;
const Format* m_format; // Optimization
const jrd_rel* const m_relation;
unsigned m_fieldIndex = 0;
SLONG m_fieldLength = 0;
SLONG m_fieldType = 0;
};
}
@ -303,21 +369,26 @@ void REPL_attach(thread_db* tdbb, bool cleanupTransactions)
const auto dbb = tdbb->getDatabase();
const auto attachment = tdbb->getAttachment();
const auto replMgr = dbb->replManager();
if (!replMgr)
const auto replConfig = dbb->replConfig();
if (!replConfig)
return;
fb_assert(!attachment->att_repl_matcher);
auto& pool = *attachment->att_pool;
attachment->att_repl_matcher = replMgr->createTableMatcher(pool);
attachment->att_repl_matcher = FB_NEW_POOL(pool)
TableMatcher(pool, replConfig->includeFilter, replConfig->excludeFilter);
fb_assert(!attachment->att_replicator);
getReplicator(tdbb, cleanupTransactions);
if (cleanupTransactions)
getReplicator(tdbb, cleanupTransactions);
// else defer creation of replicator till really needed
}
void REPL_trans_prepare(thread_db* tdbb, jrd_tra* transaction)
{
const auto replicator = getReplicator(tdbb, transaction);
// There is no need to call getReplicator() and make it to create a new ReplicatedTransaction
// just to end it up at once.
const auto replicator = transaction->tra_replicator;
if (!replicator)
return;
@ -327,26 +398,33 @@ void REPL_trans_prepare(thread_db* tdbb, jrd_tra* transaction)
void REPL_trans_commit(thread_db* tdbb, jrd_tra* transaction)
{
const auto replicator = getReplicator(tdbb, transaction);
const auto replicator = transaction->tra_replicator;
if (!replicator)
return;
if (!replicator->commit())
handleError(tdbb, transaction);
{
handleError(tdbb, transaction, false);
}
transaction->tra_replicator = NULL;
replicator->dispose();
transaction->tra_replicator = nullptr;
}
void REPL_trans_rollback(thread_db* tdbb, jrd_tra* transaction)
{
const auto replicator = getReplicator(tdbb, transaction);
const auto replicator = transaction->tra_replicator;
if (!replicator)
return;
if (!replicator->rollback())
handleError(tdbb, transaction);
{
// Rollback is a terminal routine, we cannot throw here
handleError(tdbb, transaction, false);
}
transaction->tra_replicator = NULL;
replicator->dispose();
transaction->tra_replicator = nullptr;
}
void REPL_trans_cleanup(Jrd::thread_db* tdbb, TraNumber number)
@ -362,16 +440,16 @@ void REPL_trans_cleanup(Jrd::thread_db* tdbb, TraNumber number)
void REPL_save_cleanup(thread_db* tdbb, jrd_tra* transaction,
const Savepoint* savepoint, bool undo)
{
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_sql))
return;
const auto replicator = getReplicator(tdbb, transaction);
if (!replicator)
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress))
return;
if (!transaction->tra_save_point->isReplicated())
return;
const auto replicator = transaction->tra_replicator;
if (!replicator)
return;
if (undo)
{
if (!replicator->rollbackSavepoint())
@ -386,7 +464,7 @@ void REPL_save_cleanup(thread_db* tdbb, jrd_tra* transaction,
void REPL_store(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
{
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_sql))
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress))
return;
const auto relation = rpb->rpb_relation;
@ -413,37 +491,12 @@ void REPL_store(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
// This temporary auto-pointer is just to delete a temporary record
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : NULL);
const auto format = record->getFormat();
UCharBuffer buffer;
for (auto id = 0; id < format->fmt_count; id++)
{
dsc desc;
if (DTYPE_IS_BLOB(format->fmt_desc[id].dsc_dtype) &&
EVL_field(NULL, record, id, &desc))
{
const auto destination = (bid*) desc.dsc_address;
if (!destination->isEmpty())
{
const auto blobId = *(ISC_QUAD*) desc.dsc_address;
ReplicatedBlobImpl replBlob(tdbb, transaction, destination);
if (!replicator->storeBlob(blobId, &replBlob))
{
handleError(tdbb, transaction);
return;
}
}
}
}
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!ensureSavepoints(tdbb, transaction))
return;
ReplicatedRecordImpl replRecord(tdbb, record);
ReplicatedRecordImpl replRecord(tdbb, relation, record);
if (!replicator->insertRecord(relation->rel_name.c_str(), &replRecord))
handleError(tdbb, transaction);
@ -452,7 +505,7 @@ void REPL_store(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
void REPL_modify(thread_db* tdbb, const record_param* orgRpb,
const record_param* newRpb, jrd_tra* transaction)
{
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_sql))
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress))
return;
const auto relation = newRpb->rpb_relation;
@ -494,37 +547,13 @@ void REPL_modify(thread_db* tdbb, const record_param* orgRpb,
return;
}
const auto format = newRecord->getFormat();
UCharBuffer buffer;
for (auto id = 0; id < format->fmt_count; id++)
{
dsc desc;
if (DTYPE_IS_BLOB(format->fmt_desc[id].dsc_dtype) &&
EVL_field(NULL, newRecord, id, &desc))
{
const auto destination = (bid*) desc.dsc_address;
if (!destination->isEmpty())
{
const auto blobId = *(ISC_QUAD*) desc.dsc_address;
ReplicatedBlobImpl replBlob(tdbb, transaction, destination);
if (!replicator->storeBlob(blobId, &replBlob))
{
handleError(tdbb, transaction);
return;
}
}
}
}
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!ensureSavepoints(tdbb, transaction))
return;
ReplicatedRecordImpl replOrgRecord(tdbb, orgRecord);
ReplicatedRecordImpl replNewRecord(tdbb, newRecord);
ReplicatedRecordImpl replOrgRecord(tdbb, relation, orgRecord);
ReplicatedRecordImpl replNewRecord(tdbb, relation, newRecord);
if (!replicator->updateRecord(relation->rel_name.c_str(), &replOrgRecord, &replNewRecord))
handleError(tdbb, transaction);
@ -533,7 +562,7 @@ void REPL_modify(thread_db* tdbb, const record_param* orgRpb,
void REPL_erase(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
{
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_sql))
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress))
return;
const auto relation = rpb->rpb_relation;
@ -560,11 +589,12 @@ void REPL_erase(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
// This temporary auto-pointer is just to delete a temporary record
AutoPtr<Record> cleanupRecord(record != rpb->rpb_record ? record : NULL);
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!ensureSavepoints(tdbb, transaction))
return;
ReplicatedRecordImpl replRecord(tdbb, record);
ReplicatedRecordImpl replRecord(tdbb, relation, record);
if (!replicator->deleteRecord(relation->rel_name.c_str(), &replRecord))
handleError(tdbb, transaction);
@ -572,7 +602,7 @@ void REPL_erase(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction)
void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value)
{
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_sql))
if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress))
return;
if (genId == 0) // special case: ignore RDB$GENERATORS
@ -601,13 +631,15 @@ void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value)
attachment->att_generators.store(genId, genName);
}
AutoSetRestoreFlag<ULONG> noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true);
if (!replicator->setSequence(genName.c_str(), value))
handleError(tdbb);
}
void REPL_exec_sql(thread_db* tdbb, jrd_tra* transaction, const string& sql)
{
fb_assert(tdbb->tdbb_flags & TDBB_repl_sql);
fb_assert(tdbb->tdbb_flags & TDBB_repl_in_progress);
if (tdbb->tdbb_flags & TDBB_dont_post_dfw)
return;
@ -622,6 +654,8 @@ void REPL_exec_sql(thread_db* tdbb, jrd_tra* transaction, const string& sql)
const auto attachment = tdbb->getAttachment();
const auto charset = attachment->att_charset;
// This place is already protected from recursion in calling code
if (!replicator->executeSqlIntl(charset, sql.c_str()))
handleError(tdbb, transaction);
}

View File

@ -22,6 +22,7 @@
#include "firebird.h"
#include "../jrd/jrd.h"
#include "../../common/classes/BlobWrapper.h"
#include "Config.h"
#include "Replicator.h"
@ -35,10 +36,8 @@ using namespace Replication;
Replicator::Replicator(MemoryPool& pool,
Manager* manager,
const Guid& guid,
const MetaString& user,
bool cleanupTransactions)
: PermanentStorage(pool),
m_manager(manager),
const MetaString& user)
: m_manager(manager),
m_config(manager->getConfig()),
m_guid(guid),
m_user(user),
@ -46,8 +45,6 @@ Replicator::Replicator(MemoryPool& pool,
m_generators(pool),
m_status(pool)
{
if (cleanupTransactions)
cleanupTransaction(0);
}
void Replicator::flush(BatchBlock& block, FlushReason reason, ULONG flags)
@ -121,7 +118,7 @@ void Replicator::postError(const Exception& ex)
// IReplicatedSession implementation
IReplicatedTransaction* Replicator::startTransaction(SINT64 number)
IReplicatedTransaction* Replicator::startTransaction(ITransaction* trans, SINT64 number)
{
AutoPtr<Transaction> transaction;
@ -130,7 +127,7 @@ IReplicatedTransaction* Replicator::startTransaction(SINT64 number)
MutexLockGuard guard(m_mutex, FB_FUNCTION);
MemoryPool& pool = getPool();
transaction = FB_NEW_POOL(pool) Transaction(this);
transaction = FB_NEW_POOL(pool) Transaction(this, trans);
m_transactions.add(transaction);
auto& txnData = transaction->getData();
@ -213,7 +210,6 @@ bool Replicator::commitTransaction(Transaction* transaction)
return false;
}
transaction->dispose();
return true;
}
@ -241,7 +237,6 @@ bool Replicator::rollbackTransaction(Transaction* transaction)
return false;
}
transaction->dispose();
return true;
}
@ -258,7 +253,7 @@ void Replicator::releaseTransaction(Transaction* transaction)
if (m_transactions.find(transaction, pos))
m_transactions.remove(pos);
}
catch (const Exception& ex)
catch (const Exception&)
{} // no-op
}
@ -333,6 +328,27 @@ bool Replicator::insertRecord(Transaction* transaction,
{
try
{
for (unsigned id = 0; id < record->getCount(); id++)
{
IReplicatedField* field = record->getField(id);
if (field != nullptr)
{
auto type = field->getType();
if (type == SQL_ARRAY || type == SQL_BLOB)
{
const auto blobId = (ISC_QUAD*) field->getData();
if (blobId != nullptr)
{
if (!storeBlob(transaction, *blobId))
{
return false;
}
}
}
}
}
MutexLockGuard guard(m_mutex, FB_FUNCTION);
const auto length = record->getRawLength();
@ -363,6 +379,27 @@ bool Replicator::updateRecord(Transaction* transaction,
{
try
{
for (unsigned id = 0; id < newRecord->getCount(); id++)
{
IReplicatedField* field = newRecord->getField(id);
if (field != nullptr)
{
auto type = field->getType();
if (type == SQL_ARRAY || type == SQL_BLOB)
{
const auto blobId = (ISC_QUAD*) field->getData();
if (blobId != nullptr)
{
if (!storeBlob(transaction, *blobId))
{
return false;
}
}
}
}
}
MutexLockGuard guard(m_mutex, FB_FUNCTION);
const auto orgLength = orgRecord->getRawLength();
@ -420,13 +457,18 @@ bool Replicator::deleteRecord(Transaction* transaction,
}
bool Replicator::storeBlob(Transaction* transaction,
ISC_QUAD blobId,
IReplicatedBlob* blob)
ISC_QUAD blobId)
{
try
{
MutexLockGuard guard(m_mutex, FB_FUNCTION);
BlobWrapper blob(&m_status);
if (!blob.open(m_attachment, transaction->getInterface(), blobId))
{
return false;
}
UCharBuffer buffer;
const auto bufferLength = MAX_USHORT;
auto data = buffer.getBuffer(bufferLength);
@ -434,12 +476,11 @@ bool Replicator::storeBlob(Transaction* transaction,
auto& txnData = transaction->getData();
bool newOp = true;
while (!blob->isEof())
FB_SIZE_T segmentLength;
while (blob.getSegment(bufferLength, data, segmentLength))
{
const auto segmentLength = blob->getSegment(bufferLength, data);
if (!segmentLength)
break;
continue; // Zero-length segments are unusual but ok.
if (newOp)
{
@ -458,6 +499,11 @@ bool Replicator::storeBlob(Transaction* transaction,
}
}
if (m_status->getState() & IStatus::STATE_ERRORS)
return false;
blob.close();
if (newOp)
{
txnData.putTag(opStoreBlob);

View File

@ -36,8 +36,7 @@
namespace Replication
{
class Replicator :
public Firebird::AutoIface<Firebird::IReplicatedSessionImpl<Replicator, Firebird::CheckStatusWrapper> >,
private Firebird::PermanentStorage
public Firebird::StdPlugin<Firebird::IReplicatedSessionImpl<Replicator, Firebird::CheckStatusWrapper> >
{
typedef Firebird::Array<Firebird::MetaString> MetadataCache;
typedef Firebird::HalfStaticArray<SavNumber, 16> SavepointStack;
@ -120,8 +119,8 @@ namespace Replication
public Firebird::AutoIface<Firebird::IReplicatedTransactionImpl<Transaction, Firebird::CheckStatusWrapper> >
{
public:
explicit Transaction(Replicator* replicator)
: m_replicator(replicator), m_data(replicator->getPool())
explicit Transaction(Replicator* replicator, Firebird::ITransaction* trans)
: m_replicator(replicator), m_transaction(trans), m_data(replicator->getPool())
{}
BatchBlock& getData()
@ -129,6 +128,11 @@ namespace Replication
return m_data;
}
Firebird::ITransaction* getInterface()
{
return m_transaction.getPtr();
}
// IDisposable methods
void dispose()
@ -184,11 +188,6 @@ namespace Replication
return m_replicator->deleteRecord(this, name, record) ? FB_TRUE : FB_FALSE;
}
FB_BOOLEAN storeBlob(ISC_QUAD blobId, Firebird::IReplicatedBlob* blob)
{
return m_replicator->storeBlob(this, blobId, blob) ? FB_TRUE : FB_FALSE;
}
FB_BOOLEAN executeSql(const char* sql)
{
return m_replicator->executeSql(this, sql) ? FB_TRUE : FB_FALSE;
@ -201,6 +200,7 @@ namespace Replication
private:
Replicator* const m_replicator;
Firebird::RefPtr<Firebird::ITransaction> m_transaction;
BatchBlock m_data;
};
@ -223,14 +223,17 @@ namespace Replication
Replicator(Firebird::MemoryPool& pool,
Manager* manager,
const Firebird::Guid& dbGuid,
const Firebird::MetaString& userName,
bool cleanupTransactions);
const Firebird::MetaString& userName);
// IDisposable methods
void dispose()
// IReferenceCounted methods
int release()
{
delete this;
if (--refCounter == 0)
{
delete this;
return 0;
}
return 1;
}
// IReplicatedSession methods
@ -240,7 +243,11 @@ namespace Replication
return &m_status;
}
Firebird::IReplicatedTransaction* startTransaction(SINT64 number);
void setAttachment(Firebird::IAttachment* att) override
{
m_attachment = att;
}
Firebird::IReplicatedTransaction* startTransaction(Firebird::ITransaction* trans, SINT64 number) override;
FB_BOOLEAN cleanupTransaction(SINT64 number);
FB_BOOLEAN setSequence(const char* name, SINT64 value);
@ -253,6 +260,7 @@ namespace Replication
GeneratorCache m_generators;
Firebird::Mutex m_mutex;
Firebird::FbLocalStatus m_status;
Firebird::RefPtr<Firebird::IAttachment> m_attachment;
void initialize();
void flush(BatchBlock& txnData, FlushReason reason, ULONG flags = 0);
@ -279,9 +287,9 @@ namespace Replication
const char* name,
Firebird::IReplicatedRecord* record);
// Blob id is passed by value because BlobWrapper requires reference to non-const ISC_QUAD
bool storeBlob(Transaction* transaction,
ISC_QUAD blobId,
Firebird::IReplicatedBlob* blob);
ISC_QUAD blobId);
bool executeSql(Transaction* transaction,
const char* sql)

View File

@ -78,7 +78,7 @@ Lock* RLCK_reserve_relation(thread_db* tdbb, jrd_tra* transaction, jrd_rel* rela
if (dbb->isReplica(REPLICA_READ_ONLY) &&
!(tdbb->tdbb_flags & TDBB_replicator) &&
!(tdbb->tdbb_flags & TDBB_repl_sql))
!(tdbb->tdbb_flags & TDBB_repl_in_progress))
{
// This condition is a workaround for nbackup
if (relation->rel_id != rel_backup_history)

View File

@ -465,6 +465,14 @@ void TRA_commit(thread_db* tdbb, jrd_tra* transaction, const bool retaining_flag
Jrd::ContextPoolHolder context(tdbb, transaction->tra_pool);
// Get rid of all user savepoints
while (transaction->tra_save_point && !transaction->tra_save_point->isRoot())
transaction->rollforwardSavepoint(tdbb);
// Let replicator perform heavy and error-prone part of work
REPL_trans_prepare(tdbb, transaction);
// Perform any meta data work deferred
if (!(transaction->tra_flags & TRA_prepared))
@ -495,7 +503,7 @@ void TRA_commit(thread_db* tdbb, jrd_tra* transaction, const bool retaining_flag
if (transaction->tra_flags & TRA_write)
{
// Get rid of user savepoints to allow intermediate garbage collection
// Get rid of the rest of savepoints to allow intermediate garbage collection
// in indices and BLOBs after in-place updates
while (transaction->tra_save_point)
transaction->rollforwardSavepoint(tdbb);
@ -1295,7 +1303,10 @@ void TRA_release_transaction(thread_db* tdbb, jrd_tra* transaction, Jrd::TraceTr
// Destroy the replicated transaction reference
if (transaction->tra_replicator)
{
transaction->tra_replicator->dispose();
transaction->tra_replicator = nullptr;
}
// Release transaction's under-modification-rpb list