From 46284645e5db94bbf528a6c8508eb41e65d54054 Mon Sep 17 00:00:00 2001 From: Dmitry Yemanov Date: Sun, 23 Aug 2020 18:16:52 +0300 Subject: [PATCH] Avoid replication of empty (read-only) transactions. Fixed memory leak. Misc corrections. --- src/jrd/replication/Manager.h | 2 +- src/jrd/replication/Protocol.h | 2 +- src/jrd/replication/Replicator.cpp | 82 +++++++++++++++++------------- src/jrd/replication/Replicator.h | 8 ++- 4 files changed, 55 insertions(+), 39 deletions(-) diff --git a/src/jrd/replication/Manager.h b/src/jrd/replication/Manager.h index 89c8c924f7..3372857c8f 100644 --- a/src/jrd/replication/Manager.h +++ b/src/jrd/replication/Manager.h @@ -78,6 +78,7 @@ namespace Replication } Firebird::UCharBuffer* getBuffer(); + void releaseBuffer(Firebird::UCharBuffer* buffer); void flush(Firebird::UCharBuffer* buffer, bool sync); @@ -94,7 +95,6 @@ namespace Replication private: void logError(const Firebird::IStatus* status); - void releaseBuffer(Firebird::UCharBuffer* buffer); void bgWriter(); diff --git a/src/jrd/replication/Protocol.h b/src/jrd/replication/Protocol.h index 05689639fd..7c3e1f6ee3 100644 --- a/src/jrd/replication/Protocol.h +++ b/src/jrd/replication/Protocol.h @@ -49,7 +49,7 @@ namespace Replication const ULONG BLOCK_BEGIN_TRANS = 1; const ULONG BLOCK_END_TRANS = 2; - enum Operation + enum Operation: UCHAR { opStartTransaction = 1, opPrepareTransaction = 2, diff --git a/src/jrd/replication/Replicator.cpp b/src/jrd/replication/Replicator.cpp index ab6ce38af3..1af4c56a89 100644 --- a/src/jrd/replication/Replicator.cpp +++ b/src/jrd/replication/Replicator.cpp @@ -119,20 +119,6 @@ void Replicator::postError(const Exception& ex) newErrors.copyTo(&m_status); } -// IDisposable implementation - -void Replicator::dispose() -{ - try - { - delete this; - } - catch (const Exception& ex) - { - postError(ex); - } -} - // IReplicatedSession implementation IReplicatedTransaction* Replicator::startTransaction(SINT64 number) @@ -194,25 +180,32 @@ bool Replicator::commitTransaction(Transaction* transaction) auto& txnData = transaction->getData(); - for (const auto& generator : m_generators) + // Do not replicate this transaction if it's de-facto read-only. + // If there were no flushes yet and the buffer contains just one tag + // (this should be opStartTransaction), it means nothing was changed. + + const auto dataLength = txnData.buffer->getCount() - sizeof(Block); + + if (txnData.flushes || dataLength > sizeof(UCHAR)) { - fb_assert(generator.name.hasData()); + for (const auto& generator : m_generators) + { + fb_assert(generator.name.hasData()); - txnData.putTag(opSetSequence); - txnData.putMetaName(generator.name.c_str()); - txnData.putBigInt(generator.value); + txnData.putTag(opSetSequence); + txnData.putMetaName(generator.name.c_str()); + txnData.putBigInt(generator.value); + } + + m_generators.clear(); + + txnData.putTag(opCommitTransaction); + flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS); + } + else + { + fb_assert((*txnData.buffer)[sizeof(Block)] == opStartTransaction); } - - m_generators.clear(); - - txnData.putTag(opCommitTransaction); - flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS); - - FB_SIZE_T pos; - if (m_transactions.find(transaction, pos)) - m_transactions.remove(pos); - - transaction->dispose(); } catch (const Exception& ex) { @@ -220,6 +213,7 @@ bool Replicator::commitTransaction(Transaction* transaction) return false; } + transaction->dispose(); return true; } @@ -236,12 +230,10 @@ bool Replicator::rollbackTransaction(Transaction* transaction) txnData.putTag(opRollbackTransaction); flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS); } - - FB_SIZE_T pos; - if (m_transactions.find(transaction, pos)) - m_transactions.remove(pos); - - transaction->dispose(); + else + { + fb_assert((*txnData.buffer)[sizeof(Block)] == opStartTransaction); + } } catch (const Exception& ex) { @@ -249,9 +241,27 @@ bool Replicator::rollbackTransaction(Transaction* transaction) return false; } + transaction->dispose(); return true; } +void Replicator::releaseTransaction(Transaction* transaction) +{ + try + { + MutexLockGuard guard(m_mutex, FB_FUNCTION); + + auto& txnData = transaction->getData(); + m_manager->releaseBuffer(txnData.buffer); + + FB_SIZE_T pos; + if (m_transactions.find(transaction, pos)) + m_transactions.remove(pos); + } + catch (const Exception& ex) + {} // no-op +} + bool Replicator::startSavepoint(Transaction* transaction) { try diff --git a/src/jrd/replication/Replicator.h b/src/jrd/replication/Replicator.h index a87f9cca0e..692ac903f8 100644 --- a/src/jrd/replication/Replicator.h +++ b/src/jrd/replication/Replicator.h @@ -133,6 +133,7 @@ namespace Replication void dispose() { + m_replicator->releaseTransaction(this); delete this; } @@ -226,7 +227,11 @@ namespace Replication bool cleanupTransactions); // IDisposable methods - void dispose(); + + void dispose() + { + delete this; + } // IReplicatedSession methods @@ -257,6 +262,7 @@ namespace Replication bool prepareTransaction(Transaction* transaction); bool commitTransaction(Transaction* transaction); bool rollbackTransaction(Transaction* transaction); + void releaseTransaction(Transaction* transaction); bool startSavepoint(Transaction* transaction); bool releaseSavepoint(Transaction* transaction);