diff --git a/doc/README.read_consistency.md b/doc/README.read_consistency.md
index b2c2429cbe..fca9b49253 100644
--- a/doc/README.read_consistency.md
+++ b/doc/README.read_consistency.md
@@ -125,32 +125,58 @@ So, there are three kinds of read-committed transactions now:
 ### Update conflicts handling
-When statement executed within read committed read consistency transaction its database view is
+When statement executed within READ COMMITTED READ CONSISTENCY transaction its database view is
 not changed (similar to snapshot transaction). Therefore it is useless to wait for commit of
 concurrent transaction in the hope to re-read new committed record version. On read, behavior is
-similar to read committed *record version* transaction - do not wait for active transaction and
+similar to READ COMMITTED *RECORD VERSION* transaction - do not wait for active transaction and
 walk backversions chain looking for record version visible to the current snapshot.
-When update conflict happens engine behavior is changed. If concurrent transaction is active,
-engine waits (according to transaction lock timeout) and if concurrent transaction still not
-committed, update conflict error is returned. If concurrent transaction is committed, engine
-creates new snapshot and restart top-level statement execution. In both cases all work performed
-up to the top-level statement is undone.
+For READ COMMITTED *READ CONSISTENCY* mode handling of update conflicts by the engine is changed
+significantly.
+When update conflict is detected the following is performed:
+a) transaction isolation mode is temporarily switched to the READ COMMITTED *NO RECORD VERSION MODE*
+b) engine puts write lock on conflicted record
+c) engine continues to evaluate remaining records of update\delete cursor and puts write locks
+   on them too
+d) when there are no more records to fetch, engine starts to undo all actions performed since
+   top-level statement execution started and preserves already taken write locks for every
+   updated\deleted\locked record, all inserted records are removed
+e) then engine restores transaction isolation mode as READ COMMITTED *READ CONSISTENCY*, creates
+   new statement-level snapshot and restarts execution of top-level statement.
-This is the same logic as user applications uses for update conflict handling usually, but it
-is a bit more efficient as it excludes network roundtrips to\from client host. This restart
-logic is not applied to the selectable stored procedures if update conflict happens after any
-record returned to the client application. In this case *isc_update_conflict* error is returned.
+Such algorithm ensures that after restart already updated records remain locked,
+will be visible to the new snapshot, and could be updated again with no further conflicts.
+Also, because of read consistency mode, set of modified records remains consistent.
-Note: by historical reasons isc_update_conflict reported as secondary error code with primary
-error code isc_deadlock.
+Notes:
+- restart algorithm above is applied to the UPDATE, DELETE, SELECT WITH LOCK and MERGE statements,
+  with and without RETURNING clause, executed directly by user application or as a part of some
+  PSQL object (stored procedure\function, trigger, EXECUTE BLOCK, etc)
+- if UPDATE\DELETE statement is positioned on some explicit cursor (WHERE CURRENT OF) then engine
+  skips step (c) above, i.e. does not fetch and does not put write locks on remaining records of cursor
+- if top-level statement is SELECT'able and update conflict happens after one or more records were
+  returned to the application, then update conflict error is reported as usual and restart is not
+  initiated
+- restart is not initiated for statements in autonomous blocks (IN AUTONOMOUS TRANSACTION DO ...)
+- after 10 attempts engine aborts restart algorithm, releases all write locks, restores transaction
+  isolation mode as READ COMMITTED *READ CONSISTENCY* and reports update conflict
+- any unhandled error at step (c) above stops restart algorithm and engine continues processing
+  in usual way, for example error could be caught and handled by PSQL WHEN block or reported to
+  the application if not handled
+- UPDATE\DELETE triggers will fire multiple times for the same record if statement execution was
+  restarted and record is updated\deleted again
+- for historical reasons isc_update_conflict is reported as secondary error code with primary error
+  code isc_deadlock.
-### No more precommitted transactions
-*Read-committed read only* (RCRO) transactions currently marked as committed immediately when
-transaction started. This is correct if read-committed transaction needs no database snapshot.
-But it is not correct to mark RCRO transaction as committed as it can break statement-level
-snapshot consistency.
+### Precommitted transactions
+
+READ COMMITTED READ ONLY transactions are marked as committed immediately when transaction started.
+This is correct if read-committed transaction needs no database snapshot. But it is not correct
+to mark READ CONSISTENCY READ ONLY transaction as committed as it can break statement-level
+snapshot consistency. Therefore READ CONSISTENCY READ ONLY transactions are not precommitted
+on start. Other kinds of READ COMMITTED READ ONLY transactions ([NO] RECORD VERSION) work as
+before and are marked as committed when such transaction is started.
### Support for new READ COMMITTED READ CONSISTENCY isolation level #### SQL syntax @@ -202,15 +228,16 @@ value of *oldest active snapshot* which could see *given* record version. If few in a chain got the same mark then all of them after the first one could be removed. This allows to keep versions chains short. -To make it works, engine maintains list of all active database snapshots. This list is kept in shared +To make it work, engine maintains list of all active database snapshots. This list is kept in shared memory. The initial size of shared memory block could be set in firebird.conf using new setting **SnapshotsMemSize**. Default value is 64KB. It could grow automatically, when necessary. -When engine need to find "*oldest active snapshot* which could see *given* record version" it -just search for CN of transaction that created given record version at the sorted array of active +When engine needs to find "*oldest active snapshot* which could see *given* record version" it just +searches for CN of transaction that created given record version in the sorted array of active snapshots. Garbage collection of intermediate record versions run by: - sweep - background garbage collector in SuperServer - every user attachment after update or delete record + - table scan at index creation diff --git a/doc/sql.extensions/README.autonomous_transactions.txt b/doc/sql.extensions/README.autonomous_transactions.txt index 53be08aecd..f68c8df534 100644 --- a/doc/sql.extensions/README.autonomous_transactions.txt +++ b/doc/sql.extensions/README.autonomous_transactions.txt @@ -10,7 +10,9 @@ need to raise an exception but do not want the database changes to be rolled-bac If exceptions are raised inside the body of an autonomous transaction block, the changes are rolled-back. If the block runs till the end, the transaction is committed. -The new transaction is initiated with the same isolation level of the existing one. 
+The new transaction is initiated with the same isolation level and lock timeout of the existing one. +The only exception is that if existing transaction run in READ COMMITTED READ CONSISTENCY mode, then +new autonomous transaction will run in CONCURRENCY mode. Should be used with caution to avoid deadlocks. Author: diff --git a/src/dsql/BlrDebugWriter.cpp b/src/dsql/BlrDebugWriter.cpp index 4757e16cd1..fdb810c440 100644 --- a/src/dsql/BlrDebugWriter.cpp +++ b/src/dsql/BlrDebugWriter.cpp @@ -58,21 +58,9 @@ void BlrDebugWriter::putDebugSrcInfo(ULONG line, ULONG col) { debugData.add(fb_dbg_map_src2blr); - debugData.add(line); - debugData.add(line >> 8); - debugData.add(line >> 16); - debugData.add(line >> 24); - - debugData.add(col); - debugData.add(col >> 8); - debugData.add(col >> 16); - debugData.add(col >> 24); - - const ULONG offset = (getBlrData().getCount() - getBaseOffset()); - debugData.add(offset); - debugData.add(offset >> 8); - debugData.add(offset >> 16); - debugData.add(offset >> 24); + putValue(line); + putValue(col); + putBlrOffset(); } void BlrDebugWriter::putDebugVariable(USHORT number, const MetaName& name) @@ -132,10 +120,7 @@ void BlrDebugWriter::putDebugSubFunction(DeclareSubFuncNode* subFuncNode) HalfStaticArray& subDebugData = subFuncNode->blockScratch->debugData; const ULONG count = ULONG(subDebugData.getCount()); - debugData.add(UCHAR(count)); - debugData.add(UCHAR(count >> 8)); - debugData.add(UCHAR(count >> 16)); - debugData.add(UCHAR(count >> 24)); + putValue(count); debugData.add(subDebugData.begin(), count); } @@ -152,11 +137,22 @@ void BlrDebugWriter::putDebugSubProcedure(DeclareSubProcNode* subProcNode) HalfStaticArray& subDebugData = subProcNode->blockScratch->debugData; const ULONG count = ULONG(subDebugData.getCount()); - debugData.add(UCHAR(count)); - debugData.add(UCHAR(count >> 8)); - debugData.add(UCHAR(count >> 16)); - debugData.add(UCHAR(count >> 24)); + putValue(count); debugData.add(subDebugData.begin(), count); } +void 
BlrDebugWriter::putValue(ULONG val) +{ + debugData.add(val); + debugData.add(val >> 8); + debugData.add(val >> 16); + debugData.add(val >> 24); +} + +void BlrDebugWriter::putBlrOffset() +{ + const ULONG offset = (getBlrData().getCount() - getBaseOffset()); + putValue(offset); +} + } // namespace Jrd diff --git a/src/dsql/BlrDebugWriter.h b/src/dsql/BlrDebugWriter.h index 22ff42c67d..ec81544e50 100644 --- a/src/dsql/BlrDebugWriter.h +++ b/src/dsql/BlrDebugWriter.h @@ -56,6 +56,9 @@ public: virtual void raiseError(const Firebird::Arg::StatusVector& vector); private: + void putValue(ULONG val); + void putBlrOffset(); + DebugData debugData; }; diff --git a/src/dsql/DsqlCompilerScratch.cpp b/src/dsql/DsqlCompilerScratch.cpp index aa5bd71736..a84970b0b8 100644 --- a/src/dsql/DsqlCompilerScratch.cpp +++ b/src/dsql/DsqlCompilerScratch.cpp @@ -59,6 +59,27 @@ void DsqlCompilerScratch::dumpContextStack(const DsqlContextStack* stack) #endif +void DsqlCompilerScratch::putBlrMarkers(ULONG marks) +{ + appendUChar(blr_marks); + if (marks <= MAX_UCHAR) + { + appendUChar(1); + appendUChar(marks); + } + else if (marks <= MAX_USHORT) + { + appendUChar(2); + appendUShort(marks); + } + else + { + appendUChar(4); + appendULong(marks); + } +} + + // Write out field data type. // Taking special care to declare international text. 
void DsqlCompilerScratch::putDtype(const TypeClause* field, bool useSubType) diff --git a/src/dsql/DsqlCompilerScratch.h b/src/dsql/DsqlCompilerScratch.h index e54cea0ff1..21b93d731a 100644 --- a/src/dsql/DsqlCompilerScratch.h +++ b/src/dsql/DsqlCompilerScratch.h @@ -169,6 +169,7 @@ public: return statement; } + void putBlrMarkers(ULONG marks); void putDtype(const TypeClause* field, bool useSubType); void putType(const TypeClause* type, bool useSubType); void putLocalVariables(CompoundStmtNode* parameters, USHORT locals); diff --git a/src/dsql/Nodes.h b/src/dsql/Nodes.h index f98d5e982c..c942cb0831 100644 --- a/src/dsql/Nodes.h +++ b/src/dsql/Nodes.h @@ -1389,6 +1389,12 @@ public: POST_TRIG = 2 }; + // Marks used by EraseNode, ModifyNode and StoreNode + static const unsigned MARK_POSITIONED = 0x01; // Erase|Modify node is positioned at explicit cursor + static const unsigned MARK_MERGE = 0x02; // node is part of MERGE statement + // Marks used by ForNode + static const unsigned MARK_FOR_UPDATE = 0x04; // implicit cursor used in UPDATE\DELETE\MERGE statement + struct ExeState { ExeState(thread_db* tdbb, jrd_req* request, jrd_tra* transaction) diff --git a/src/dsql/StmtNodes.cpp b/src/dsql/StmtNodes.cpp index 6b4ad2cb5a..f89e6a52c6 100644 --- a/src/dsql/StmtNodes.cpp +++ b/src/dsql/StmtNodes.cpp @@ -88,6 +88,7 @@ static void dsqlSetParameterName(DsqlCompilerScratch*, ExprNode*, const ValueExp static void dsqlSetParametersName(DsqlCompilerScratch*, CompoundStmtNode*, const RecordSourceNode*); static void cleanupRpb(thread_db* tdbb, record_param* rpb); +static void forceWriteLock(thread_db* tdbb, record_param* rpb, jrd_tra* transaction); static void makeValidation(thread_db* tdbb, CompilerScratch* csb, StreamType stream, Array& validations); static StmtNode* pass1ExpandView(thread_db* tdbb, CompilerScratch* csb, StreamType orgStream, @@ -96,12 +97,14 @@ static RelationSourceNode* pass1Update(thread_db* tdbb, CompilerScratch* csb, jr const TrigVector* trigger, 
StreamType stream, StreamType updateStream, SecurityClass::flags_t priv, jrd_rel* view, StreamType viewStream, StreamType viewUpdateStream); static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array& validations); +static ForNode* pass2FindForNode(StmtNode* node, StreamType stream); static void postTriggerAccess(CompilerScratch* csb, jrd_rel* ownerRelation, ExternalAccess::exa_act operation, jrd_rel* view); static void preModifyEraseTriggers(thread_db* tdbb, TrigVector** trigs, StmtNode::WhichTrigger whichTrig, record_param* rpb, record_param* rec, TriggerAction op); static void preprocessAssignments(thread_db* tdbb, CompilerScratch* csb, StreamType stream, CompoundStmtNode* compoundNode, const Nullable* insertOverride); +static void restartRequest(const jrd_req* request, jrd_tra* transaction); static void validateExpressions(thread_db* tdbb, const Array& validations); } // namespace Jrd @@ -661,9 +664,12 @@ const StmtNode* BlockNode::execute(thread_db* tdbb, jrd_req* request, ExeState* return parentStmt; } + // Skip PSQL exception handlers when request restart is in progress + const bool skipHandlers = (transaction->tra_flags & TRA_ex_restart); + const StmtNode* temp = parentStmt; - if (handlers && handlers->statements.hasData()) + if (handlers && handlers->statements.hasData() && !skipHandlers) { // First of all rollback failed work if (!(transaction->tra_flags & TRA_system)) @@ -2278,6 +2284,9 @@ DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch EraseNode* node = FB_NEW_POOL(pool) EraseNode(pool); node->stream = csb->csb_rpt[n].csb_stream; + if (csb->csb_blr_reader.peekByte() == blr_marks) + node->marks |= PAR_marks(csb); + return node; } @@ -2292,6 +2301,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) if (dsqlCursorName.hasData() && dsqlScratch->isPsql()) { node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation); + node->marks |= StmtNode::MARK_POSITIONED; // Process 
old context values. dsqlScratch->context->push(node->dsqlContext); @@ -2315,7 +2325,10 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) RseNode* rse; if (dsqlCursorName.hasData()) + { rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation); + node->marks |= StmtNode::MARK_POSITIONED; + } else { rse = FB_NEW_POOL(dsqlScratch->getPool()) RseNode(dsqlScratch->getPool()); @@ -2369,6 +2382,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const NODE_PRINT(printer, statement); NODE_PRINT(printer, subStatement); NODE_PRINT(printer, stream); + NODE_PRINT(printer, marks); return "EraseNode"; } @@ -2379,42 +2393,25 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch) const dsql_ctx* context; if (dsqlContext) - { context = dsqlContext; - - if (statement) - { - dsqlScratch->appendUChar(blr_begin); - statement->genBlr(dsqlScratch); - dsqlScratch->appendUChar(blr_erase); - GEN_stuff_context(dsqlScratch, context); - dsqlScratch->appendUChar(blr_end); - } - else - { - dsqlScratch->appendUChar(blr_erase); - GEN_stuff_context(dsqlScratch, context); - } - } else - { context = dsqlRelation->dsqlContext; - if (statement) - { - dsqlScratch->appendUChar(blr_begin); - statement->genBlr(dsqlScratch); - dsqlScratch->appendUChar(blr_erase); - GEN_stuff_context(dsqlScratch, context); - dsqlScratch->appendUChar(blr_end); - } - else - { - dsqlScratch->appendUChar(blr_erase); - GEN_stuff_context(dsqlScratch, context); - } + if (statement) + { + dsqlScratch->appendUChar(blr_begin); + statement->genBlr(dsqlScratch); } + dsqlScratch->appendUChar(blr_erase); + GEN_stuff_context(dsqlScratch, context); + + if (marks) + dsqlScratch->putBlrMarkers(marks); + + if (statement) + dsqlScratch->appendUChar(blr_end); + if (message) dsqlScratch->appendUChar(blr_end); } @@ -2555,6 +2552,9 @@ EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb) } } + if (!(marks & StmtNode::MARK_POSITIONED)) + forNode = pass2FindForNode(parentStmt, stream); + 
impureOffset = CMP_impure(csb, sizeof(SLONG)); csb->csb_rpt[stream].csb_flags |= csb_update; @@ -2567,6 +2567,7 @@ const StmtNode* EraseNode::execute(thread_db* tdbb, jrd_req* request, ExeState* if (request->req_operation == jrd_req::req_unwind) retNode = parentStmt; + else if (request->req_operation == jrd_req::req_return && subStatement) { if (!exeState->topNode) @@ -2644,6 +2645,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger request->req_operation = jrd_req::req_return; RLCK_reserve_relation(tdbb, transaction, relation, true); + if (forNode && forNode->isWriteLockMode(request)) + { + forceWriteLock(tdbb, rpb, transaction); + return parentStmt; + } + // If the stream was sorted, the various fields in the rpb are probably junk. // Just to make sure that everything is cool, refetch and release the record. @@ -2667,7 +2674,20 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger VirtualTable::erase(tdbb, rpb); else if (!relation->rel_view_rse) { - VIO_erase(tdbb, rpb, transaction); + // VIO_erase returns false if there is an update conflict in Read Consistency + // transaction. Before returning false it disables statement-level snapshot + // (via setting req_update_conflict flag) so re-fetch should see new data. 
+ + if (!VIO_erase(tdbb, rpb, transaction)) + { + forceWriteLock(tdbb, rpb, transaction); + + if (!forNode) + restartRequest(request, transaction); + + forNode->setWriteLockMode(request); + return parentStmt; + } REPL_erase(tdbb, rpb, transaction); } @@ -3980,7 +4000,14 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r jrd_tra* const org_transaction = request->req_transaction; fb_assert(tdbb->getTransaction() == org_transaction); - jrd_tra* const transaction = TRA_start(tdbb, org_transaction->tra_flags, + + ULONG transaction_flags = org_transaction->tra_flags; + + // Replace Read Consistency by Concurrency isolation mode + if (transaction_flags & TRA_read_consistency) + transaction_flags &= ~(TRA_read_committed | TRA_read_consistency); + + jrd_tra* const transaction = TRA_start(tdbb, transaction_flags, org_transaction->tra_lock_timeout, org_transaction); @@ -4005,12 +4032,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r const Savepoint* const savepoint = transaction->startSavepoint(); impure->savNumber = savepoint->getNumber(); - if ((transaction->tra_flags & TRA_read_committed) && - (transaction->tra_flags & TRA_read_consistency)) - { - TRA_setup_request_snapshot(tdbb, request, true); - } - return action; } @@ -4022,16 +4043,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r fb_assert(transaction->tra_number == impure->traNumber); - if (request->req_operation == jrd_req::req_return || - request->req_operation == jrd_req::req_unwind) - { - if ((transaction->tra_flags & TRA_read_committed) && - (transaction->tra_flags & TRA_read_consistency)) - { - TRA_release_request_snapshot(tdbb, request); - } - } - switch (request->req_operation) { case jrd_req::req_return: @@ -4827,6 +4838,9 @@ DmlNode* ForNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, { ForNode* node = FB_NEW_POOL(pool) ForNode(pool); + if (csb->csb_blr_reader.peekByte() == blr_marks)
+ node->forUpdate = (PAR_marks(csb) & StmtNode::MARK_FOR_UPDATE) != 0; + if (csb->csb_blr_reader.peekByte() == (UCHAR) blr_stall) node->stall = PAR_parse_stmt(tdbb, csb); @@ -4921,6 +4935,8 @@ string ForNode::internalPrint(NodePrinter& printer) const NODE_PRINT(printer, statement); NODE_PRINT(printer, cursor); NODE_PRINT(printer, parBlrBeginCnt); + NODE_PRINT(printer, forUpdate); + NODE_PRINT(printer, withLock); return "ForNode"; } @@ -4939,6 +4955,9 @@ void ForNode::genBlr(DsqlCompilerScratch* dsqlScratch) dsqlScratch->appendUChar(blr_for); + if (forUpdate) + dsqlScratch->putBlrMarkers(StmtNode::MARK_FOR_UPDATE); + if (!statement || dsqlForceSingular) dsqlScratch->appendUChar(blr_singular); @@ -5004,7 +5023,10 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb) // as implicit cursors are always positioned in a valid record, and the name is // only used to raise isc_cursor_not_positioned. - impureOffset = CMP_impure(csb, sizeof(SavNumber)); + if (rse->flags & RseNode::FLAG_WRITELOCK) + withLock = true; + + impureOffset = CMP_impure(csb, sizeof(Impure)); return this; } @@ -5012,17 +5034,21 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb) const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*exeState*/) const { jrd_tra* transaction = request->req_transaction; + Impure* impure = request->getImpure(impureOffset); switch (request->req_operation) { case jrd_req::req_evaluate: - *request->getImpure(impureOffset) = 0; + // initialize impure values + impure->savepoint = 0; + impure->writeLockMode = false; + if (!(transaction->tra_flags & TRA_system) && transaction->tra_save_point && transaction->tra_save_point->hasChanges()) { const Savepoint* const savepoint = transaction->startSavepoint(); - *request->getImpure(impureOffset) = savepoint->getNumber(); + impure->savepoint = savepoint->getNumber(); } cursor->open(tdbb); request->req_records_affected.clear(); @@ -5034,11 +5060,32 @@ const StmtNode* 
ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /* // fall into case jrd_req::req_sync: - if (cursor->fetchNext(tdbb)) { - request->req_operation = jrd_req::req_evaluate; - return statement; + const bool fetched = cursor->fetchNext(tdbb); + if (withLock) + { + const jrd_req* top_request = request->req_snapshot.m_owner; + if ((top_request) && (top_request->req_flags & req_update_conflict)) + impure->writeLockMode = true; + } + + if (fetched) + { + if (impure->writeLockMode && withLock) + { + // Skip statement execution and fetch (and try to lock) next record. + request->req_operation = jrd_req::req_sync; + return this; + } + + request->req_operation = jrd_req::req_evaluate; + return statement; + } } + + if (impure->writeLockMode) + restartRequest(request, transaction); + request->req_operation = jrd_req::req_return; // fall into @@ -5059,7 +5106,7 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /* default: { - const SavNumber savNumber = *request->getImpure(impureOffset); + const SavNumber savNumber = impure->savepoint; if (savNumber) { @@ -5083,6 +5130,21 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /* } +bool ForNode::isWriteLockMode(jrd_req* request) const +{ + const Impure* impure = request->getImpure(impureOffset); + return impure->writeLockMode; +} + + +void ForNode::setWriteLockMode(jrd_req* request) const +{ + Impure* impure = request->getImpure(impureOffset); + fb_assert(!impure->writeLockMode); + + impure->writeLockMode = true; +} + //-------------------- @@ -5474,6 +5536,8 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) if (returning) forNode->dsqlForceSingular = true; + forNode->forUpdate = true; + // Get the already processed relations. 
RseNode* processedRse = nodeAs(forNode->rse->dsqlStreams->items[0]); source = processedRse->dsqlStreams->items[0]; @@ -5511,6 +5575,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) // Build the MODIFY node. ModifyNode* modify = FB_NEW_POOL(pool) ModifyNode(pool); + modify->marks |= StmtNode::MARK_MERGE; thisIf->trueAction = modify; dsql_ctx* const oldContext = dsqlGetContext(target); @@ -5599,6 +5664,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) { // Build the DELETE node. EraseNode* erase = FB_NEW_POOL(pool) EraseNode(pool); + erase->marks |= StmtNode::MARK_MERGE; thisIf->trueAction = erase; dsql_ctx* context = dsqlGetContext(target); @@ -5663,6 +5729,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch) // Build the INSERT node. StoreNode* store = FB_NEW_POOL(pool) StoreNode(pool); + // TODO: store->marks |= StmtNode::MARK_MERGE; store->dsqlRelation = relation; store->dsqlFields = notMatched->fields; store->dsqlValues = notMatched->values; @@ -5956,6 +6023,9 @@ DmlNode* ModifyNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* c node->orgStream = orgStream; node->newStream = newStream; + if (csb->csb_blr_reader.peekByte() == blr_marks) + node->marks |= PAR_marks(csb); + AutoSetRestore autoCurrentDMLNode(&csb->csb_currentDMLNode, node); node->statement = PAR_parse_stmt(tdbb, csb); @@ -5997,6 +6067,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up if (dsqlCursorName.hasData() && dsqlScratch->isPsql()) { node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation); + node->marks |= StmtNode::MARK_POSITIONED; // Process old context values. 
dsqlScratch->context->push(node->dsqlContext); @@ -6077,6 +6148,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up { rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation); old_context = rse->dsqlStreams->items[0]->dsqlContext; + node->marks |= StmtNode::MARK_POSITIONED; } else { @@ -6177,6 +6249,7 @@ string ModifyNode::internalPrint(NodePrinter& printer) const NODE_PRINT(printer, mapView); NODE_PRINT(printer, orgStream); NODE_PRINT(printer, newStream); + NODE_PRINT(printer, marks); return "ModifyNode"; } @@ -6202,6 +6275,10 @@ void ModifyNode::genBlr(DsqlCompilerScratch* dsqlScratch) GEN_stuff_context(dsqlScratch, context); context = dsqlRelation->dsqlContext; GEN_stuff_context(dsqlScratch, context); + + if (marks) + dsqlScratch->putBlrMarkers(marks); + statement->genBlr(dsqlScratch); if (statement2) @@ -6381,6 +6458,9 @@ ModifyNode* ModifyNode::pass2(thread_db* tdbb, CompilerScratch* csb) SBM_SET(tdbb->getDefaultPool(), &csb->csb_rpt[orgStream].csb_fields, id); } + if (!(marks & StmtNode::MARK_POSITIONED)) + forNode = pass2FindForNode(parentStmt, orgStream); + impureOffset = CMP_impure(csb, sizeof(impure_state)); return this; @@ -6450,7 +6530,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg { case jrd_req::req_evaluate: request->req_records_affected.bumpModified(false); - break; + + if (impure->sta_state == 0 && forNode && forNode->isWriteLockMode(request)) + request->req_operation = jrd_req::req_return; + // fall thru + else + break; case jrd_req::req_return: if (impure->sta_state == 1) @@ -6465,6 +6550,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg if (impure->sta_state == 0) { + if (forNode && forNode->isWriteLockMode(request)) + { + forceWriteLock(tdbb, orgRpb, transaction); + return parentStmt; + } + // CVC: This call made here to clear the record in each NULL field and // varchar field whose tail may contain garbage. 
cleanupRpb(tdbb, newRpb); @@ -6483,7 +6574,20 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg VirtualTable::modify(tdbb, orgRpb, newRpb); else if (!relation->rel_view_rse) { - VIO_modify(tdbb, orgRpb, newRpb, transaction); + // VIO_modify returns false if there is an update conflict in Read Consistency + // transaction. Before returning false it disables statement-level snapshot + // (via setting req_update_conflict flag) so re-fetch should see new data. + + if (!VIO_modify(tdbb, orgRpb, newRpb, transaction)) + { + forceWriteLock(tdbb, orgRpb, transaction); + + if (!forNode) + restartRequest(request, transaction); + + forNode->setWriteLockMode(request); + return parentStmt; + } IDX_modify(tdbb, orgRpb, newRpb, transaction); REPL_modify(tdbb, orgRpb, newRpb, transaction); } @@ -8833,6 +8937,7 @@ static const dsql_msg* dsqlGenDmlHeader(DsqlCompilerScratch* dsqlScratch, RseNod if (dsqlRse) { dsqlScratch->appendUChar(blr_for); + dsqlScratch->putBlrMarkers(StmtNode::MARK_FOR_UPDATE); GEN_expr(dsqlScratch, dsqlRse); } @@ -9514,6 +9619,20 @@ static void cleanupRpb(thread_db* tdbb, record_param* rpb) } } +// Try to set write lock on record until success or until record no longer exists +static void forceWriteLock(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) +{ + while (VIO_refetch_record(tdbb, rpb, transaction, true, true)) + { + rpb->rpb_runtime_flags &= ~RPB_refetch; + + // VIO_writelock returns false if record has been deleted or modified + // by someone else. + if (VIO_writelock(tdbb, rpb, transaction)) + break; + } +} + // Build a validation list for a relation, if appropriate.
static void makeValidation(thread_db* tdbb, CompilerScratch* csb, StreamType stream, Array& validations) @@ -9717,6 +9836,23 @@ static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array(node)) + node = node->parentStmt; + + ForNode* forNode = nodeAs(node); + if (forNode && forNode->rse->containsStream(stream)) + { + //fb_assert(forNode->forUpdate == true); + if (forNode->forUpdate) + return forNode; + } + + return nullptr; +}; + // Inherit access to triggers to be fired. // // When we detect that a trigger could be fired by a request, @@ -9875,6 +10011,19 @@ static void preprocessAssignments(thread_db* tdbb, CompilerScratch* csb, } } +static void restartRequest(const jrd_req* request, jrd_tra* transaction) +{ + const jrd_req* top_request = request->req_snapshot.m_owner; + fb_assert(top_request); + fb_assert(top_request->req_flags & req_update_conflict); + + transaction->tra_flags |= TRA_ex_restart; + + ERR_post(Arg::Gds(isc_deadlock) << + Arg::Gds(isc_update_conflict) << + Arg::Gds(isc_concurrent_transaction) << Arg::Int64(top_request->req_conflict_txn)); +} + // Execute a list of validation expressions. 
static void validateExpressions(thread_db* tdbb, const Array& validations) { diff --git a/src/dsql/StmtNodes.h b/src/dsql/StmtNodes.h index 484f7d64d0..de57a00c7a 100644 --- a/src/dsql/StmtNodes.h +++ b/src/dsql/StmtNodes.h @@ -36,6 +36,7 @@ namespace Jrd { class CompoundStmtNode; class ExecBlockNode; +class ForNode; class PlanNode; class RelationSourceNode; class SelectNode; @@ -565,7 +566,8 @@ public: dsqlContext(NULL), statement(NULL), subStatement(NULL), - stream(0) + stream(0), + marks(0) { } @@ -596,6 +598,8 @@ public: NestConst statement; NestConst subStatement; StreamType stream; + NestConst forNode; // parent implicit cursor, if present + unsigned marks; // see StmtNode::MARK_xxx }; @@ -914,7 +918,9 @@ public: rse(NULL), statement(NULL), cursor(NULL), - parBlrBeginCnt(0) + parBlrBeginCnt(0), + forUpdate(false), + withLock(false) { } @@ -928,7 +934,16 @@ public: virtual StmtNode* pass2(thread_db* tdbb, CompilerScratch* csb); virtual const StmtNode* execute(thread_db* tdbb, jrd_req* request, ExeState* exeState) const; + bool isWriteLockMode(jrd_req* request) const; + void setWriteLockMode(jrd_req* request) const; + public: + struct Impure + { + SavNumber savepoint; + bool writeLockMode; // true - driven statement (UPDATE\DELETE\SELECT WITH LOCK) works in "write lock" mode, false - normal mode + }; + NestConst dsqlSelect; NestConst dsqlInto; DeclareCursorNode* dsqlCursor; @@ -940,6 +955,8 @@ public: NestConst statement; NestConst cursor; int parBlrBeginCnt; + bool forUpdate; // part of UPDATE\DELETE\MERGE statement + bool withLock; // part of SELECT ...
WITH LOCK statement }; @@ -1154,7 +1171,8 @@ public: validations(pool), mapView(NULL), orgStream(0), - newStream(0) + newStream(0), + marks(0) { } @@ -1191,6 +1209,8 @@ public: NestConst mapView; StreamType orgStream; StreamType newStream; + NestConst forNode; // parent implicit cursor, if present + unsigned marks; // see StmtNode::MARK_xxx }; diff --git a/src/dsql/dsql.cpp b/src/dsql/dsql.cpp index b78a61b2dd..5dea42e7f6 100644 --- a/src/dsql/dsql.cpp +++ b/src/dsql/dsql.cpp @@ -286,7 +286,10 @@ bool DsqlDmlRequest::fetch(thread_db* tdbb, UCHAR* msgBuffer) tdbb->checkCancelState(true); UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number]; - JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer); + if (prefetchedFirstRow) + prefetchedFirstRow = false; + else + JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer); const dsql_par* const eof = statement->getEof(); const USHORT* eofPtr = eof ? (USHORT*) (dsqlMsgBuffer + (IPTR) eof->par_desc.dsc_address) : NULL; @@ -677,33 +680,14 @@ void DsqlDmlRequest::dsqlPass(thread_db* tdbb, DsqlCompilerScratch* scratch, boo *destroyScratchPool = true; } -// Execute a dynamic SQL statement.
-void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle, +// Execute a dynamic SQL statement +void DsqlDmlRequest::doExecute(thread_db* tdbb, jrd_tra** traHandle, Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg, Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg, bool singleton) { - if (!req_request) - { - ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) << - Arg::Gds(isc_unprepared_stmt)); - } - - // If there is no data required, just start the request - + prefetchedFirstRow = false; const dsql_msg* message = statement->getSendMsg(); - if (message) - mapInOut(tdbb, false, message, inMetadata, NULL, inMsg); - - // we need to mapInOut() before tracing of execution start to let trace - // manager know statement parameters values - TraceDSQLExecute trace(req_dbb->dbb_attachment, this); - - // Setup and start timeout timer - const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton; - - setupTimer(tdbb); - thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor); if (!message) JRD_start(tdbb, req_request, req_transaction); @@ -805,6 +789,18 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle, status_exception::raise(&localStatus); } } + else + { + // Prefetch first row of a query + if (reqTypeWithCursor(statement->getType())) + { + dsql_msg* message = (dsql_msg*) statement->getReceiveMsg(); + + UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number]; + JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer); + prefetchedFirstRow = true; + } + } switch (statement->getType()) { @@ -826,6 +822,97 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle, } break; } +} + +// Execute a dynamic SQL statement with tracing, restart and timeout handler +void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle, + Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg, + Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg, + bool singleton) 
+{ + if (!req_request) + { + ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) << + Arg::Gds(isc_unprepared_stmt)); + } + + // If there is no data required, just start the request + + const dsql_msg* message = statement->getSendMsg(); + if (message) + mapInOut(tdbb, false, message, inMetadata, NULL, inMsg); + + // we need to mapInOut() before tracing of execution start to let trace + // manager know statement parameters values + TraceDSQLExecute trace(req_dbb->dbb_attachment, this); + + // Setup and start timeout timer + const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton; + + setupTimer(tdbb); + thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor); + + if (req_transaction && (req_transaction->tra_flags & TRA_read_consistency) && + statement->getType() != DsqlCompiledStatement::TYPE_SAVEPOINT) + { + AutoSavePoint savePoint(tdbb, req_transaction); + req_request->req_flags &= ~req_update_conflict; + int numTries = 0; + while (true) + { + AutoSetRestoreFlag restartReady(&req_request->req_flags, req_restart_ready, true); + try + { + doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton); + } + catch (const status_exception&) + { + if (!(req_transaction->tra_flags & TRA_ex_restart)) + { + req_request->req_flags &= ~req_update_conflict; + throw; + } + } + + if (!(req_request->req_flags & req_update_conflict)) + { + fb_assert((req_transaction->tra_flags & TRA_ex_restart) == 0); + req_transaction->tra_flags &= ~TRA_ex_restart; + +#ifdef DEV_BUILD + if (numTries > 0) + { + string s; + s.printf("restarts = %d", numTries); + + ERRD_post_warning(Arg::Warning(isc_random) << Arg::Str(s)); + } +#endif + break; + } + + fb_assert((req_transaction->tra_flags & TRA_ex_restart) != 0); + + req_request->req_flags &= ~req_update_conflict; + req_transaction->tra_flags &= ~TRA_ex_restart; + fb_utils::init_status(tdbb->tdbb_status_vector); + + if (numTries >= 10) + { + gds__log("Update conflict: unable to get a stable set of rows 
in the source tables"); + ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-913) << + Arg::Gds(isc_deadlock) << + Arg::Gds(isc_update_conflict) << + Arg::Gds(isc_concurrent_transaction) << Arg::Num(req_request->req_conflict_txn)); + } + req_transaction->rollbackSavepoint(tdbb, true); + req_transaction->startSavepoint(tdbb); + numTries++; + } + savePoint.release(); // everything is ok + } else { + doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton); + } trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS); } @@ -2026,7 +2113,15 @@ static void sql_info(thread_db* tdbb, { const bool detailed = (item == isc_info_sql_explain_plan); string plan = OPT_get_plan(tdbb, request->req_request, detailed); - +#ifdef DEV_BUILD + if (!detailed) + { + NodePrinter printer; + request->req_request->getStatement()->topNode->print(printer); + plan += "\n--------\n"; + plan += printer.getText(); + } +#endif if (plan.hasData()) { // 1-byte item + 2-byte length + isc_info_end/isc_info_truncated == 4 diff --git a/src/dsql/dsql.h b/src/dsql/dsql.h index 2cbb27d8e3..ba21fadc6b 100644 --- a/src/dsql/dsql.h +++ b/src/dsql/dsql.h @@ -652,7 +652,8 @@ public: explicit DsqlDmlRequest(MemoryPool& pool, StmtNode* aNode) : dsql_req(pool), node(aNode), - needDelayedFormat(false) + needDelayedFormat(false), + prefetchedFirstRow(false) { } @@ -671,9 +672,14 @@ public: virtual void setDelayedFormat(thread_db* tdbb, Firebird::IMessageMetadata* metadata); private: + void doExecute(thread_db* tdbb, jrd_tra** traHandle, + Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg, + Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg, + bool singleton); NestConst node; Firebird::RefPtr delayedFormat; bool needDelayedFormat; + bool prefetchedFirstRow; }; class DsqlDdlRequest : public dsql_req diff --git a/src/include/firebird/impl/blr.h b/src/include/firebird/impl/blr.h index 0bbc0b965b..1d30a4a5ad 100644 --- a/src/include/firebird/impl/blr.h +++ b/src/include/firebird/impl/blr.h @@ 
-443,4 +443,6 @@ #define blr_at_local (unsigned char) 0 #define blr_at_zone (unsigned char) 1 +#define blr_marks (unsigned char) 217 // mark some blr code with specific flags + #endif // FIREBIRD_IMPL_BLR_H diff --git a/src/jrd/DebugInterface.cpp b/src/jrd/DebugInterface.cpp index 14a6e50193..79de9e8147 100644 --- a/src/jrd/DebugInterface.cpp +++ b/src/jrd/DebugInterface.cpp @@ -10,10 +10,10 @@ * See the License for the specific language governing rights * and limitations under the License. * - * The Original Code was created by Vlad Horsun + * The Original Code was created by Vlad Khorsun * for the Firebird Open Source RDBMS project. * - * Copyright (c) 2006 Vlad Horsun + * Copyright (c) 2006 Vlad Khorsun * and all contributors signed below. * * All Rights Reserved. diff --git a/src/jrd/DebugInterface.h b/src/jrd/DebugInterface.h index f0295c4c56..f7cd7253f9 100644 --- a/src/jrd/DebugInterface.h +++ b/src/jrd/DebugInterface.h @@ -10,10 +10,10 @@ * See the License for the specific language governing rights * and limitations under the License. * - * The Original Code was created by Vlad Horsun + * The Original Code was created by Vlad Khorsun * for the Firebird Open Source RDBMS project. * - * Copyright (c) 2006 Vlad Horsun + * Copyright (c) 2006 Vlad Khorsun * and all contributors signed below. * * All Rights Reserved. 
diff --git a/src/jrd/RecordSourceNodes.cpp b/src/jrd/RecordSourceNodes.cpp index 597b194af9..1a6ddd2b97 100644 --- a/src/jrd/RecordSourceNodes.cpp +++ b/src/jrd/RecordSourceNodes.cpp @@ -550,6 +550,8 @@ string RelationSourceNode::internalPrint(NodePrinter& printer) const NODE_PRINT(printer, dsqlName); NODE_PRINT(printer, alias); NODE_PRINT(printer, context); + if (relation) + printer.print("rel_name", relation->rel_name); return "RelationSourceNode"; } diff --git a/src/jrd/Savepoint.cpp b/src/jrd/Savepoint.cpp index 3cd31a5f51..19b898bf33 100644 --- a/src/jrd/Savepoint.cpp +++ b/src/jrd/Savepoint.cpp @@ -234,7 +234,7 @@ void VerbAction::mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* next release(transaction); } -void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction) +void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks, VerbAction* preserveAction) { // Undo changes recorded for this verb action. // After that, clear the verb action and prepare it for later reuse. 
@@ -258,7 +258,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction) if (!DPM_get(tdbb, &rpb, LCK_read)) BUGCHECK(186); // msg 186 record disappeared - if (have_undo && !(rpb.rpb_flags & rpb_deleted)) + if ((have_undo || preserveLocks) && !(rpb.rpb_flags & rpb_deleted)) VIO_data(tdbb, &rpb, transaction->tra_pool); else CCH_RELEASE(tdbb, &rpb.getWindow(tdbb)); @@ -267,7 +267,50 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction) BUGCHECK(185); // msg 185 wrong record version if (!have_undo) - VIO_backout(tdbb, &rpb, transaction); + { + if (preserveLocks && rpb.rpb_b_page) + { + // Fetch previous record version and update in place current version with it + record_param temp = rpb; + temp.rpb_page = rpb.rpb_b_page; + temp.rpb_line = rpb.rpb_b_line; + temp.rpb_record = NULL; + + if (temp.rpb_flags & rpb_delta) + fb_assert(temp.rpb_prior != NULL); + else + fb_assert(temp.rpb_prior == NULL); + + if (!DPM_fetch(tdbb, &temp, LCK_read)) + BUGCHECK(291); // msg 291 cannot find record back version + + if (!(temp.rpb_flags & rpb_chained) || (temp.rpb_flags & (rpb_blob | rpb_fragment))) + ERR_bugcheck_msg("invalid back version"); + + VIO_data(tdbb, &temp, tdbb->getDefaultPool()); + + Record* const save_record = rpb.rpb_record; + if (rpb.rpb_flags & rpb_deleted) + rpb.rpb_record = NULL; + Record* const dead_record = rpb.rpb_record; + + VIO_update_in_place(tdbb, transaction, &rpb, &temp); + + if (dead_record) + { + rpb.rpb_record = NULL; // VIO_garbage_collect_idx will play with this record dirty tricks + VIO_garbage_collect_idx(tdbb, transaction, &rpb, dead_record); + } + rpb.rpb_record = save_record; + + delete temp.rpb_record; + + if (preserveAction) + RBM_SET(transaction->tra_pool, &preserveAction->vct_records, rpb.rpb_number.getValue()); + } + else + VIO_backout(tdbb, &rpb, transaction); + } else { AutoUndoRecord record(vct_undo->current().setupRecord(transaction)); @@ -378,7 +421,7 @@ void Savepoint::cleanupTempData() } } -Savepoint* 
Savepoint::rollback(thread_db* tdbb, Savepoint* prior) +Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior, bool preserveLocks) { // Undo changes made in this savepoint. // Perform index and BLOB cleanup if needed. @@ -398,8 +441,12 @@ Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior) while (m_actions) { VerbAction* const action = m_actions; + VerbAction* preserveAction = nullptr; - action->undo(tdbb, m_transaction); + if (preserveLocks && m_next) + preserveAction = m_next->createAction(action->vct_relation); + + action->undo(tdbb, m_transaction, preserveLocks, preserveAction); m_actions = action->vct_next; action->vct_next = m_freeActions; diff --git a/src/jrd/Savepoint.h b/src/jrd/Savepoint.h index b70632701c..81340de405 100644 --- a/src/jrd/Savepoint.h +++ b/src/jrd/Savepoint.h @@ -94,7 +94,8 @@ namespace Jrd UndoItemTree* vct_undo; // Data for undo records void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction); - void undo(thread_db* tdbb, jrd_tra* transaction); + void undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks, + VerbAction* preserveAction); void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber, VerbAction* nextAction, Record* goingRecord); @@ -231,7 +232,7 @@ namespace Jrd void cleanupTempData(); - Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL); + Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL, bool preserveLocks = false); Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL); static void destroy(Savepoint*& savepoint) diff --git a/src/jrd/blp.h b/src/jrd/blp.h index da41ec60aa..a6e41a55b0 100644 --- a/src/jrd/blp.h +++ b/src/jrd/blp.h @@ -247,5 +247,6 @@ static const struct {"local_timestamp", byte_line}, {"local_time", byte_line}, {"at", verb_byte_verb}, + {"marks", byte_literal}, {0, 0} }; diff --git a/src/jrd/exe.cpp b/src/jrd/exe.cpp index 029d540801..eefdef2bb4 100644 --- a/src/jrd/exe.cpp +++ b/src/jrd/exe.cpp 
@@ -877,7 +877,7 @@ void EXE_start(thread_db* tdbb, jrd_req* request, jrd_tra* transaction) TRA_post_resources(tdbb, transaction, statement->resources); TRA_attach_request(transaction, request); - request->req_flags &= req_in_use; + request->req_flags &= req_in_use | req_restart_ready; request->req_flags |= req_active; request->req_flags &= ~req_reserved; diff --git a/src/jrd/jrd.cpp b/src/jrd/jrd.cpp index 2331f7c24d..638021c216 100644 --- a/src/jrd/jrd.cpp +++ b/src/jrd/jrd.cpp @@ -8970,39 +8970,8 @@ void JRD_start(Jrd::thread_db* tdbb, jrd_req* request, jrd_tra* transaction) * Get a record from the host program. * **************************************/ - - // Repeat execution to handle update conflicts, if any - int numTries = 0; - while (true) - { - try - { - EXE_unwind(tdbb, request); - EXE_start(tdbb, request, transaction); - break; - } - catch (const status_exception &ex) - { - const ISC_STATUS* v = ex.value(); - if (// Update conflict error - v[0] == isc_arg_gds && - v[1] == isc_update_conflict && - // Read committed transaction with snapshots - (transaction->tra_flags & TRA_read_committed) && - (transaction->tra_flags & TRA_read_consistency) && - // Snapshot has been assigned to the request - - // it was top-level request - !TRA_get_prior_request(tdbb)) - { - if (++numTries < 10) - { - fb_utils::init_status(tdbb->tdbb_status_vector); - continue; - } - } - throw; - } - } + EXE_unwind(tdbb, request); + EXE_start(tdbb, request, transaction); check_autocommit(tdbb, request); @@ -9091,40 +9060,9 @@ void JRD_start_and_send(thread_db* tdbb, jrd_req* request, jrd_tra* transaction, * Get a record from the host program. 
* **************************************/ - - // Repeat execution to handle update conflicts, if any - int numTries = 0; - while (true) - { - try - { - EXE_unwind(tdbb, request); - EXE_start(tdbb, request, transaction); - EXE_send(tdbb, request, msg_type, msg_length, msg); - break; - } - catch (const status_exception &ex) - { - const ISC_STATUS* v = ex.value(); - if (// Update conflict error - v[0] == isc_arg_gds && - v[1] == isc_update_conflict && - // Read committed transaction with snapshots - (transaction->tra_flags & TRA_read_committed) && - (transaction->tra_flags & TRA_read_consistency) && - // Snapshot has been assigned to the request - - // it was top-level request - !TRA_get_prior_request(tdbb)) - { - if (++numTries < 10) - { - fb_utils::init_status(tdbb->tdbb_status_vector); - continue; - } - } - throw; - } - } + EXE_unwind(tdbb, request); + EXE_start(tdbb, request, transaction); + EXE_send(tdbb, request, msg_type, msg_length, msg); check_autocommit(tdbb, request); diff --git a/src/jrd/par.cpp b/src/jrd/par.cpp index c97eedab97..c294285637 100644 --- a/src/jrd/par.cpp +++ b/src/jrd/par.cpp @@ -693,6 +693,26 @@ CompoundStmtNode* PAR_make_list(thread_db* tdbb, StmtNodeStack& stack) } +ULONG PAR_marks(Jrd::CompilerScratch* csb) +{ + if (csb->csb_blr_reader.getByte() != blr_marks) + PAR_syntax_error(csb, "blr_marks"); + + switch (csb->csb_blr_reader.getByte()) + { + case 1: + return csb->csb_blr_reader.getByte(); + + case 2: + return csb->csb_blr_reader.getWord(); + + case 4: + return csb->csb_blr_reader.getLong(); + } + PAR_syntax_error(csb, "valid length for blr_marks value (1, 2, or 4)"); + return 0; +} + CompilerScratch* PAR_parse(thread_db* tdbb, const UCHAR* blr, ULONG blr_length, bool internal_flag, ULONG dbginfo_length, const UCHAR* dbginfo) { @@ -1637,6 +1657,7 @@ void PAR_syntax_error(CompilerScratch* csb, const TEXT* string) csb->csb_blr_reader.seekBackward(1); + // BLR syntax error: expected @1 at offset @2, encountered @3 PAR_error(csb, 
Arg::Gds(isc_syntaxerr) << Arg::Str(string) << Arg::Num(csb->csb_blr_reader.getOffset()) << Arg::Num(csb->csb_blr_reader.peekByte())); diff --git a/src/jrd/par_proto.h b/src/jrd/par_proto.h index 487f8dfe8b..13a5c8594c 100644 --- a/src/jrd/par_proto.h +++ b/src/jrd/par_proto.h @@ -61,6 +61,7 @@ SSHORT PAR_find_proc_field(const Jrd::jrd_prc*, const Firebird::MetaName&); Jrd::ValueExprNode* PAR_gen_field(Jrd::thread_db* tdbb, StreamType stream, USHORT id, bool byId = false); Jrd::ValueExprNode* PAR_make_field(Jrd::thread_db*, Jrd::CompilerScratch*, USHORT, const Firebird::MetaName&); Jrd::CompoundStmtNode* PAR_make_list(Jrd::thread_db*, Jrd::StmtNodeStack&); +ULONG PAR_marks(Jrd::CompilerScratch*); Jrd::CompilerScratch* PAR_parse(Jrd::thread_db*, const UCHAR* blr, ULONG blr_length, bool internal_flag, ULONG = 0, const UCHAR* = NULL); diff --git a/src/jrd/req.h b/src/jrd/req.h index d88ca0b593..d4c86d04b0 100644 --- a/src/jrd/req.h +++ b/src/jrd/req.h @@ -275,6 +275,7 @@ public: SINT64 req_fetch_rowcount; // Total number of rows returned by this request jrd_req* req_proc_caller; // Procedure's caller request const ValueListNode* req_proc_inputs; // and its node with input parameters + TraNumber req_conflict_txn; // Transaction number for update conflict in read consistency mode ULONG req_src_line; ULONG req_src_column; @@ -397,6 +398,8 @@ const ULONG req_continue_loop = 0x100L; // PSQL continue statement const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction const ULONG req_reserved = 0x800L; // Request reserved for client +const ULONG req_update_conflict = 0x1000L; // We need to restart request due to update conflict +const ULONG req_restart_ready = 0x2000L; // Request is ready to restart in case of update conflict // Index lock block diff --git a/src/jrd/tra.cpp b/src/jrd/tra.cpp index 03976cb94e..537605a551 100644 --- a/src/jrd/tra.cpp +++ b/src/jrd/tra.cpp @@ -145,7
+145,7 @@ jrd_req* TRA_get_prior_request(thread_db* tdbb) return org_request; } -void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, bool no_prior) +void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request) { // This function is called whenever request is started in a transaction. // Setup context to preserve read consistency in READ COMMITTED transactions. @@ -160,7 +160,7 @@ void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, boo return; // See if there is any request right above us in the call stack - jrd_req* org_request = no_prior ? nullptr : TRA_get_prior_request(tdbb); + jrd_req* org_request = TRA_get_prior_request(tdbb); if (org_request && org_request->req_transaction == transaction) { @@ -1617,8 +1617,11 @@ int TRA_snapshot_state(thread_db* tdbb, const jrd_tra* trans, TraNumber number, // GC thread accesses data directly without any request if (jrd_req* current_request = tdbb->getRequest()) { - // There is no request snapshot when we build expression index - if (jrd_req* snapshot_request = current_request->req_snapshot.m_owner) + // Notes: + // 1) There is no request snapshot when we build expression index + // 2) Disable read committed snapshot after we encountered update conflict + jrd_req* snapshot_request = current_request->req_snapshot.m_owner; + if (snapshot_request && !(snapshot_request->req_flags & req_update_conflict)) { if (stateCn > snapshot_request->req_snapshot.m_number) return tra_active; @@ -3849,7 +3852,7 @@ Savepoint* jrd_tra::startSavepoint(bool root) return savepoint; } -void jrd_tra::rollbackSavepoint(thread_db* tdbb) +void jrd_tra::rollbackSavepoint(thread_db* tdbb, bool preserveLocks) /************************************** * * r o l l b a c k S a v e p o i n t @@ -3865,8 +3868,11 @@ void jrd_tra::rollbackSavepoint(thread_db* tdbb) { REPL_save_cleanup(tdbb, this, tra_save_point, true); + if (tra_flags & TRA_ex_restart) + preserveLocks = true; + Jrd::ContextPoolHolder 
context(tdbb, tra_pool); - tra_save_point = tra_save_point->rollback(tdbb); + tra_save_point = tra_save_point->rollback(tdbb, NULL, preserveLocks); } } diff --git a/src/jrd/tra.h b/src/jrd/tra.h index 267ee891f5..b14a04d810 100644 --- a/src/jrd/tra.h +++ b/src/jrd/tra.h @@ -387,7 +387,7 @@ public: Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number); void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying); Savepoint* startSavepoint(bool root = false); - void rollbackSavepoint(thread_db* tdbb); + void rollbackSavepoint(thread_db* tdbb, bool preserveLocks = false); void rollbackToSavepoint(thread_db* tdbb, SavNumber number); void rollforwardSavepoint(thread_db* tdbb); DbCreatorsList* getDbCreatorsList(); @@ -426,6 +426,7 @@ const ULONG TRA_no_auto_undo = 0x8000L; // don't start a savepoint in TRA_start const ULONG TRA_precommitted = 0x10000L; // transaction committed at startup const ULONG TRA_own_interface = 0x20000L; // tra_interface was created for internal needs const ULONG TRA_read_consistency = 0x40000L; // ensure read consistency in this transaction +const ULONG TRA_ex_restart = 0x80000L; // Exception was raised to restart request // flags derived from TPB, see also transaction_options() at tra.cpp const ULONG TRA_OPTIONS_MASK = (TRA_degree3 | TRA_readonly | TRA_ignore_limbo | TRA_read_committed | diff --git a/src/jrd/tra_proto.h b/src/jrd/tra_proto.h index 1ac8f9bf73..f424db449d 100644 --- a/src/jrd/tra_proto.h +++ b/src/jrd/tra_proto.h @@ -63,7 +63,7 @@ void TRA_update_counters(Jrd::thread_db*, Jrd::Database*); int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait); void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request); void TRA_detach_request(Jrd::jrd_req* request); -void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request, bool no_prior = false); +void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request); void 
TRA_release_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request); Jrd::jrd_req* TRA_get_prior_request(Jrd::thread_db*); diff --git a/src/jrd/vio.cpp b/src/jrd/vio.cpp index 48e0d08c42..46d5de6bfb 100644 --- a/src/jrd/vio.cpp +++ b/src/jrd/vio.cpp @@ -1444,7 +1444,57 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool) } -void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) +static bool check_prepare_result(int prepare_result, jrd_tra* transaction, jrd_req* request, record_param* rpb) +{ +/************************************** + * + * c h e c k _ p r e p a r e _ r e s u l t + * + ************************************** + * + * Functional description + * Called by VIO_modify and VIO_erase. Raise update conflict error if not in + * read consistency transaction or lock error happens or if request is already + * in update conflict mode. In latter case set TRA_ex_restart flag to correctly + * handle request restart. + * + **************************************/ + if (prepare_result == PREPARE_OK) + return true; + + jrd_req* top_request = request->req_snapshot.m_owner; + + const bool restart_ready = top_request && + (top_request->req_flags & req_restart_ready); + + // Second update conflict when request is already in update conflict mode + // means we have some (indirect) UPDATE\DELETE in WHERE clause of primary + // cursor. In this case all we can do is restart whole request immediately. 
+ const bool secondary = top_request && + (top_request->req_flags & req_update_conflict) && + (prepare_result != PREPARE_LOCKERR); + + if (!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR || + secondary || !restart_ready) + { + if (secondary) + transaction->tra_flags |= TRA_ex_restart; + + ERR_post(Arg::Gds(isc_deadlock) << + Arg::Gds(isc_update_conflict) << + Arg::Gds(isc_concurrent_transaction) << Arg::Int64(rpb->rpb_transaction_nr)); + } + + if (top_request) + { + top_request->req_flags |= req_update_conflict; + top_request->req_conflict_txn = rpb->rpb_transaction_nr; + } + return false; +} + + +bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) { /************************************** * @@ -1500,7 +1550,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) // hvlad: what if record was created\modified by user tx also, // i.e. if there is backversion ??? VIO_backout(tdbb, rpb, transaction); - return; + return true; } transaction->tra_flags |= TRA_write; @@ -1835,7 +1885,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) if (transaction->tra_save_point && transaction->tra_save_point->isChanging()) verb_post(tdbb, transaction, rpb, rpb->rpb_undo); - return; + return true; } const bool backVersion = (rpb->rpb_b_page != 0); @@ -1854,13 +1904,9 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) { // Update stub didn't find one page -- do a long, hard update PageStack stack; - if (prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false)) - { - // Cannot use Arg::Num here because transaction number is 64-bit unsigned integer - ERR_post(Arg::Gds(isc_deadlock) << - Arg::Gds(isc_update_conflict) << - Arg::Gds(isc_concurrent_transaction) << Arg::Int64(rpb->rpb_transaction_nr)); - } + int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false); + if (!check_prepare_result(prepare_result, 
transaction, request, rpb)) + return false; // Old record was restored and re-fetched for write. Now replace it. @@ -1919,6 +1965,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) { notify_garbage_collector(tdbb, rpb, transaction->tra_number); } + return true; } @@ -2723,7 +2770,7 @@ void VIO_init(thread_db* tdbb) } } -void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction) +bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction) { /************************************** * @@ -2789,7 +2836,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j { VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb); tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); - return; + return true; } check_gbak_cheating_insupd(tdbb, relation, "UPDATE"); @@ -3142,20 +3189,16 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j } tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); - return; + return true; } const bool backVersion = (org_rpb->rpb_b_page != 0); record_param temp; PageStack stack; - if (prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, &temp, new_rpb, - stack, false)) - { - // Cannot use Arg::Num here because transaction number is 64-bit unsigned integer - ERR_post(Arg::Gds(isc_deadlock) << - Arg::Gds(isc_update_conflict) << - Arg::Gds(isc_concurrent_transaction) << Arg::Int64(org_rpb->rpb_transaction_nr)); - } + int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, + &temp, new_rpb, stack, false); + if (!check_prepare_result(prepare_result, transaction, tdbb->getRequest(), org_rpb)) + return false; IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction); @@ -3204,6 +3247,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j { 
notify_garbage_collector(tdbb, org_rpb, transaction->tra_number); } + return true; } @@ -4004,6 +4048,22 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction) { case PREPARE_CONFLICT: case PREPARE_DELETE: + if ((transaction->tra_flags & TRA_read_consistency)) + { + jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner; + if (top_request && !(top_request->req_flags & req_update_conflict)) + { + if (!(top_request->req_flags & req_restart_ready)) + { + ERR_post(Arg::Gds(isc_deadlock) << + Arg::Gds(isc_update_conflict) << + Arg::Gds(isc_concurrent_transaction) << Arg::Int64(org_rpb->rpb_transaction_nr)); + } + + top_request->req_flags |= req_update_conflict; + top_request->req_conflict_txn = org_rpb->rpb_transaction_nr; + } + } org_rpb->rpb_runtime_flags |= RPB_refetch; return false; case PREPARE_LOCKERR: @@ -5607,7 +5667,7 @@ static int prepare_update( thread_db* tdbb, delete_record(tdbb, temp, 0, NULL); - if (writelock) + if (writelock || (transaction->tra_flags & TRA_read_consistency)) { tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); return PREPARE_DELETE; @@ -5730,16 +5790,17 @@ static int prepare_update( thread_db* tdbb, switch (state) { case tra_committed: - // We need to loop waiting in read committed with no read consistency transactions only - if (!(transaction->tra_flags & TRA_read_committed) || - (transaction->tra_flags & TRA_read_consistency)) + // For SNAPSHOT mode transactions raise error early + if (!(transaction->tra_flags & TRA_read_committed)) { tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); // Cannot use Arg::Num here because transaction number is 64-bit unsigned integer - ERR_post(Arg::Gds(isc_update_conflict) << + ERR_post(Arg::Gds(isc_deadlock) << + Arg::Gds(isc_update_conflict) << Arg::Gds(isc_concurrent_transaction) << Arg::Int64(update_conflict_trans)); } + return PREPARE_CONFLICT; case tra_limbo: if (!(transaction->tra_flags & TRA_ignore_limbo)) 
diff --git a/src/jrd/vio_proto.h b/src/jrd/vio_proto.h
index a833c18799..e37eaa229f 100644
--- a/src/jrd/vio_proto.h
+++ b/src/jrd/vio_proto.h
@@ -42,7 +42,7 @@ bool VIO_chase_record_version(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
 	MemoryPool*, bool, bool);
 void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*);
 void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*);
-void VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
+bool VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
 void VIO_fini(Jrd::thread_db*);
 bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
 Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*);
@@ -51,7 +51,7 @@ bool VIO_get_current(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
 	MemoryPool*, bool, bool&);
 void VIO_init(Jrd::thread_db*);
 bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
-void VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
+bool VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
 bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool);
 Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*);
 bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool);
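Reviewer note: the control flow of the new restart loop in `DsqlDmlRequest::execute` can be hard to follow inside the patch, so here is a minimal standalone sketch of its shape. It is not engine code. `RestartResult`, `runWithRestarts`, `attempt` and `undoKeepingLocks` are hypothetical names introduced only for illustration. `attempt` stands in for `doExecute` plus the `req_update_conflict` check, and `undoKeepingLocks` stands in for `rollbackSavepoint(tdbb, true)` followed by `startSavepoint`. The limit of 10 mirrors the `numTries >= 10` test in the patch.

```cpp
#include <functional>

// Hypothetical result type, not a Firebird structure.
struct RestartResult
{
    bool ok;        // true if an attempt finished without an update conflict
    int restarts;   // number of restarts performed before the final attempt
};

// Sketch of a bounded restart loop (assumed shape, simplified from the patch).
// attempt() returns true on success and false when an update conflict was flagged.
// undoKeepingLocks() models undoing the statement's work while keeping write locks.
inline RestartResult runWithRestarts(const std::function<bool()>& attempt,
                                     const std::function<void()>& undoKeepingLocks,
                                     int maxRestarts = 10)
{
    int numTries = 0;
    while (true)
    {
        if (attempt())
            return {true, numTries};    // no conflict: leave the loop

        if (numTries >= maxRestarts)
            return {false, numTries};   // give up: the patch posts isc_update_conflict here

        undoKeepingLocks();             // undo work, preserve locks, open a fresh savepoint
        ++numTries;
    }
}
```

With this shape the statement is attempted at most `maxRestarts + 1` times, and the undo step runs once per restart, which is what the `numTries` counter in the patch implies as well.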