8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 14:03:07 +01:00

Allow multiple appliers per attachment

This commit is contained in:
Dmitry Yemanov 2021-05-01 18:10:57 +03:00
parent ae643883dc
commit 5c0d3f82f1
6 changed files with 62 additions and 52 deletions

View File

@ -253,6 +253,7 @@ Jrd::Attachment::Attachment(MemoryPool* pool, Database* dbb, JProvider* provider
att_dest_bind(&att_bindings), att_dest_bind(&att_bindings),
att_original_timezone(TimeZoneUtil::getSystemTimeZone()), att_original_timezone(TimeZoneUtil::getSystemTimeZone()),
att_current_timezone(att_original_timezone), att_current_timezone(att_original_timezone),
att_repl_appliers(*pool),
att_utility(UTIL_NONE), att_utility(UTIL_NONE),
att_procedures(*pool), att_procedures(*pool),
att_functions(*pool), att_functions(*pool),

View File

@ -483,7 +483,7 @@ public:
Firebird::RefPtr<Firebird::IReplicatedSession> att_replicator; Firebird::RefPtr<Firebird::IReplicatedSession> att_replicator;
Firebird::AutoPtr<Replication::TableMatcher> att_repl_matcher; Firebird::AutoPtr<Replication::TableMatcher> att_repl_matcher;
Firebird::AutoPtr<Applier> att_repl_applier; Firebird::Array<Applier*> att_repl_appliers;
enum UtilType { UTIL_NONE, UTIL_GBAK, UTIL_GFIX, UTIL_GSTAT }; enum UtilType { UTIL_NONE, UTIL_GBAK, UTIL_GFIX, UTIL_GSTAT };

View File

@ -41,6 +41,7 @@ class StableAttachmentPart;
class Attachment; class Attachment;
class Service; class Service;
class UserId; class UserId;
class Applier;
// forward declarations // forward declarations
class JStatement; class JStatement;
@ -234,19 +235,25 @@ public:
void close(Firebird::CheckStatusWrapper* status) override; void close(Firebird::CheckStatusWrapper* status) override;
public: public:
JReplicator(StableAttachmentPart* sa); JReplicator(Applier* appl, StableAttachmentPart* sa);
StableAttachmentPart* getAttachment() StableAttachmentPart* getAttachment()
{ {
return sAtt; return sAtt;
} }
JReplicator* getHandle() throw() Applier* getHandle() throw()
{ {
return this; return applier;
}
void resetHandle()
{
applier = NULL;
} }
private: private:
Applier* applier;
Firebird::RefPtr<StableAttachmentPart> sAtt; Firebird::RefPtr<StableAttachmentPart> sAtt;
void freeEngineData(Firebird::CheckStatusWrapper* status); void freeEngineData(Firebird::CheckStatusWrapper* status);

View File

@ -702,12 +702,12 @@ namespace
validateHandle(tdbb, batch->getAttachment()); validateHandle(tdbb, batch->getAttachment());
} }
inline void validateHandle(thread_db* tdbb, JReplicator* const replicator) inline void validateHandle(thread_db* tdbb, Applier* const applier)
{ {
if (!replicator) if (!applier)
status_exception::raise(Arg::Gds(isc_bad_repl_handle)); status_exception::raise(Arg::Gds(isc_bad_repl_handle));
validateHandle(tdbb, replicator->getAttachment()->getHandle()); validateHandle(tdbb, applier->getAttachment());
} }
class AttachmentHolder class AttachmentHolder
@ -5089,13 +5089,11 @@ IReplicator* JAttachment::createReplicator(CheckStatusWrapper* user_status)
try try
{ {
const auto att = tdbb->getAttachment(); const auto applier = Applier::create(tdbb);
if (!att->att_repl_applier) jr = FB_NEW JReplicator(applier, getStable());
att->att_repl_applier = Applier::create(tdbb);
jr = FB_NEW JReplicator(getStable());
jr->addRef(); jr->addRef();
applier->setInterfacePtr(jr);
} }
catch (const Exception& ex) catch (const Exception& ex)
{ {
@ -6196,8 +6194,8 @@ void JBatch::cancel(CheckStatusWrapper* status)
} }
JReplicator::JReplicator(StableAttachmentPart* sa) JReplicator::JReplicator(Applier* appl, StableAttachmentPart* sa)
: sAtt(sa) : applier(appl), sAtt(sa)
{ } { }
@ -6207,10 +6205,13 @@ int JReplicator::release()
if (rc != 0) if (rc != 0)
return rc; return rc;
if (applier)
{
LocalStatus status; LocalStatus status;
CheckStatusWrapper statusWrapper(&status); CheckStatusWrapper statusWrapper(&status);
freeEngineData(&statusWrapper); freeEngineData(&statusWrapper);
}
delete this; delete this;
return 0; return 0;
@ -6226,9 +6227,9 @@ void JReplicator::freeEngineData(Firebird::CheckStatusWrapper* user_status)
try try
{ {
const auto att = sAtt->getHandle(); AutoPtr<Applier> cleanupApplier(applier);
if (att) cleanupApplier->shutdown(tdbb);
att->att_repl_applier.reset(); fb_assert(!applier);
} }
catch (const Exception& ex) catch (const Exception& ex)
{ {
@ -6255,8 +6256,7 @@ void JReplicator::process(CheckStatusWrapper* status, unsigned length, const UCH
try try
{ {
const auto att = sAtt->getHandle(); applier->process(tdbb, length, data);
att->att_repl_applier->process(tdbb, length, data);
} }
catch (const Exception& ex) catch (const Exception& ex)
{ {
@ -6276,34 +6276,9 @@ void JReplicator::process(CheckStatusWrapper* status, unsigned length, const UCH
} }
void JReplicator::close(CheckStatusWrapper* status) void JReplicator::close(CheckStatusWrapper* user_status)
{ {
try freeEngineData(user_status);
{
EngineContextHolder tdbb(status, this, FB_FUNCTION);
check_database(tdbb);
try
{
const auto att = sAtt->getHandle();
att->att_repl_applier->shutdown(tdbb);
att->att_repl_applier.reset();
}
catch (const Exception& ex)
{
transliterateException(tdbb, ex, status, "JReplicator::close");
return;
}
trace_warning(tdbb, status, "JReplicator::close");
}
catch (const Exception& ex)
{
ex.stuffException(status);
return;
}
successful_completion(status);
} }
@ -7380,8 +7355,11 @@ void release_attachment(thread_db* tdbb, Jrd::Attachment* attachment)
attachment->att_replicator = nullptr; attachment->att_replicator = nullptr;
if (attachment->att_repl_applier) while (attachment->att_repl_appliers.hasData())
attachment->att_repl_applier->shutdown(tdbb); {
AutoPtr<Applier> cleanupApplier(attachment->att_repl_appliers.pop());
cleanupApplier->shutdown(tdbb);
}
if (dbb->dbb_crypto_manager) if (dbb->dbb_crypto_manager)
dbb->dbb_crypto_manager->detach(tdbb, attachment); dbb->dbb_crypto_manager->detach(tdbb, attachment);

View File

@ -221,11 +221,16 @@ Applier* Applier::create(thread_db* tdbb)
request->req_attachment = attachment; request->req_attachment = attachment;
auto& att_pool = *attachment->att_pool; auto& att_pool = *attachment->att_pool;
return FB_NEW_POOL(att_pool) Applier(att_pool, dbb->dbb_filename, request); const auto applier = FB_NEW_POOL(att_pool) Applier(att_pool, dbb->dbb_filename, request);
attachment->att_repl_appliers.add(applier);
return applier;
} }
void Applier::shutdown(thread_db* tdbb) void Applier::shutdown(thread_db* tdbb)
{ {
const auto attachment = tdbb->getAttachment();
cleanupTransactions(tdbb); cleanupTransactions(tdbb);
CMP_release(tdbb, m_request); CMP_release(tdbb, m_request);
@ -233,6 +238,14 @@ void Applier::shutdown(thread_db* tdbb)
m_record = NULL; m_record = NULL;
m_bitmap->clear(); m_bitmap->clear();
attachment->att_repl_appliers.findAndRemove(this);
if (m_interface)
{
m_interface->resetHandle();
m_interface = nullptr;
}
} }
void Applier::process(thread_db* tdbb, ULONG length, const UCHAR* data) void Applier::process(thread_db* tdbb, ULONG length, const UCHAR* data)

View File

@ -137,12 +137,23 @@ namespace Jrd
void shutdown(thread_db* tdbb); void shutdown(thread_db* tdbb);
Attachment* getAttachment() const
{
return m_request ? m_request->req_attachment : nullptr;
}
void setInterfacePtr(JReplicator* interfacePtr)
{
m_interface = interfacePtr;
}
private: private:
TransactionMap m_txnMap; TransactionMap m_txnMap;
const Firebird::PathName m_database; const Firebird::PathName m_database;
jrd_req* m_request; jrd_req* m_request;
Firebird::AutoPtr<RecordBitmap> m_bitmap; Firebird::AutoPtr<RecordBitmap> m_bitmap;
Record* m_record; Record* m_record;
JReplicator* m_interface;
void startTransaction(thread_db* tdbb, TraNumber traNum); void startTransaction(thread_db* tdbb, TraNumber traNum);
void prepareTransaction(thread_db* tdbb, TraNumber traNum); void prepareTransaction(thread_db* tdbb, TraNumber traNum);