diff --git a/src/remote/server/ReplServer.cpp b/src/remote/server/ReplServer.cpp index de32854f83..3b12feee39 100644 --- a/src/remote/server/ReplServer.cpp +++ b/src/remote/server/ReplServer.cpp @@ -574,17 +574,20 @@ namespace return true; } + enum ActionType { REPLICATE, REPLAY, FAST_FORWARD }; + void replicate(Target* target, TransactionList& transactions, FB_UINT64 sequence, ULONG offset, ULONG length, const UCHAR* data, - bool rewind) + ActionType action) { const Block* const header = (Block*) data; const auto traNumber = header->traNumber; - if (!rewind || !traNumber || transactions.exist(traNumber)) + if (action == REPLICATE || + (action == REPLAY && (!traNumber || transactions.exist(traNumber)))) { target->replicate(sequence, offset, length, data); } @@ -597,7 +600,7 @@ namespace if (transactions.find(traNumber, pos)) transactions.remove(pos); } - else if (!rewind) + else if (action != REPLAY) { transactions.clear(); } @@ -606,7 +609,7 @@ namespace { fb_assert(traNumber); - if (!rewind && !transactions.exist(traNumber)) + if (action != REPLAY && !transactions.exist(traNumber)) transactions.add(ActiveTransaction(traNumber, sequence)); } } @@ -737,6 +740,7 @@ namespace const FB_UINT64 max_sequence = queue.back()->header.hdr_sequence; FB_UINT64 next_sequence = 0; const bool restart = target->isShutdown(); + auto action = REPLICATE; for (auto segment : queue) { @@ -754,21 +758,48 @@ namespace const FB_UINT64 db_sequence = target->initReplica(); const FB_UINT64 last_db_sequence = control.getDbSequence(); - if (sequence <= db_sequence) - { - target->verbose("Deleting segment %" UQUADFORMAT " due to fast forward", sequence); - segment->remove(); - continue; - } - if (db_sequence != last_db_sequence) { - target->verbose("Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1); - control.saveDbSequence(db_sequence); - transactions.clear(); - control.saveComplete(db_sequence, transactions); - last_sequence = db_sequence; - last_offset = 0; + if (sequence == db_sequence + 1) + { + if (const auto oldest = findOldest(transactions)) + { + const TraNumber oldest_trans = oldest->tra_id; + const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0; + target->verbose("Resetting replication to continue from segment %" UQUADFORMAT + " (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT ")", + db_sequence + 1, oldest_trans, oldest_sequence); + } + else + { + target->verbose("Resetting replication to continue from segment %" UQUADFORMAT, + db_sequence + 1); + } + + control.saveDbSequence(db_sequence); + return PROCESS_SHUTDOWN; // this enforces restart from OAT + } + + if (action != FAST_FORWARD) + { + if (segment != queue.front()) + { + fb_assert(false); + return PROCESS_SHUTDOWN; + } + + if (db_sequence > max_sequence) + { + target->verbose("Database sequence has been changed to %" UQUADFORMAT + ", waiting for appropriate segment", db_sequence); + return PROCESS_SUSPEND; + } + + target->verbose("Database sequence has been changed to %" UQUADFORMAT + ", preparing for replication reset", db_sequence); + + action = FAST_FORWARD; + } } // If no new segments appeared since our last attempt, @@ -843,8 +874,10 @@ namespace if (blockLength) { - const bool rewind = (sequence < last_sequence || + const bool replay = (sequence < last_sequence || (sequence == last_sequence && (!last_offset || totalLength < last_offset))); + if (action != FAST_FORWARD) + action = replay ? REPLAY : REPLICATE; UCHAR* const data = buffer.getBuffer(length); memcpy(data, &header, sizeof(Block)); @@ -852,8 +885,7 @@ namespace if (read(file, data + sizeof(Block), blockLength) != blockLength) raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO); - replicate(target, transactions, sequence, totalLength, - length, data, rewind); + replicate(target, transactions, sequence, totalLength, length, data, action); } totalLength += length; @@ -872,7 +904,10 @@ namespace oldest_sequence = oldest ? oldest->sequence : 0; next_sequence = sequence + 1; - string extra; + string actionName, extra; + actionName = (action == FAST_FORWARD) ? "scanned" : + (action == REPLAY) ? "replayed" : "replicated"; + if (oldest) { const TraNumber oldest_trans = oldest->tra_id; @@ -884,8 +919,8 @@ namespace extra = "deleting"; } - target->verbose("Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s", - sequence, totalLength, interval.c_str(), extra.c_str()); + target->verbose("Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s", + sequence, totalLength, actionName.c_str(), interval.c_str(), extra.c_str()); if (!oldest_sequence) segment->remove(); @@ -907,8 +942,8 @@ namespace break; target->verbose("Deleting segment %" UQUADFORMAT " as no longer needed", sequence); - segment->remove(); + } while (pos < queue.getCount()); } }