8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 16:43:03 +01:00

Reworked request restart logic

This commit is contained in:
hvlad 2020-02-13 01:47:14 +02:00
parent 0b60865d60
commit 447974bd84
18 changed files with 426 additions and 139 deletions

View File

@ -10,7 +10,9 @@ need to raise an exception but do not want the database changes to be rolled-bac
If exceptions are raised inside the body of an autonomous transaction block, the changes are
rolled-back. If the block runs till the end, the transaction is committed.
The new transaction is initiated with snapshot isolation level and lock timeout of the existing one.
The new transaction is initiated with the same isolation level and lock timeout of the existing one.
The only exception is that if existing transaction run in Read Committed Read Consistency mode, then
new autonomous transaction will run in Concurrency mode.
Should be used with caution to avoid deadlocks.
Author:

View File

@ -58,21 +58,9 @@ void BlrDebugWriter::putDebugSrcInfo(ULONG line, ULONG col)
{
debugData.add(fb_dbg_map_src2blr);
debugData.add(line);
debugData.add(line >> 8);
debugData.add(line >> 16);
debugData.add(line >> 24);
debugData.add(col);
debugData.add(col >> 8);
debugData.add(col >> 16);
debugData.add(col >> 24);
const ULONG offset = (getBlrData().getCount() - getBaseOffset());
debugData.add(offset);
debugData.add(offset >> 8);
debugData.add(offset >> 16);
debugData.add(offset >> 24);
putValue(line);
putValue(col);
putBlrOffset();
}
void BlrDebugWriter::putDebugVariable(USHORT number, const MetaName& name)
@ -132,10 +120,7 @@ void BlrDebugWriter::putDebugSubFunction(DeclareSubFuncNode* subFuncNode)
HalfStaticArray<UCHAR, 128>& subDebugData = subFuncNode->blockScratch->debugData;
const ULONG count = ULONG(subDebugData.getCount());
debugData.add(UCHAR(count));
debugData.add(UCHAR(count >> 8));
debugData.add(UCHAR(count >> 16));
debugData.add(UCHAR(count >> 24));
putValue(count);
debugData.add(subDebugData.begin(), count);
}
@ -152,11 +137,29 @@ void BlrDebugWriter::putDebugSubProcedure(DeclareSubProcNode* subProcNode)
HalfStaticArray<UCHAR, 128>& subDebugData = subProcNode->blockScratch->debugData;
const ULONG count = ULONG(subDebugData.getCount());
debugData.add(UCHAR(count));
debugData.add(UCHAR(count >> 8));
debugData.add(UCHAR(count >> 16));
debugData.add(UCHAR(count >> 24));
putValue(count);
debugData.add(subDebugData.begin(), count);
}
void BlrDebugWriter::putDebugMarkers(ULONG marks)
{
debugData.add(fb_dbg_map_markers);
putValue(marks);
putBlrOffset();
}
void BlrDebugWriter::putValue(ULONG val)
{
debugData.add(val);
debugData.add(val >> 8);
debugData.add(val >> 16);
debugData.add(val >> 24);
}
void BlrDebugWriter::putBlrOffset()
{
const ULONG offset = (getBlrData().getCount() - getBaseOffset());
putValue(offset);
}
} // namespace Jrd

View File

@ -50,12 +50,16 @@ public:
void putDebugCursor(USHORT, const Firebird::MetaName&);
void putDebugSubFunction(DeclareSubFuncNode* subFuncNode);
void putDebugSubProcedure(DeclareSubProcNode* subProcNode);
void putDebugMarkers(ULONG marks);
DebugData& getDebugData() { return debugData; }
virtual void raiseError(const Firebird::Arg::StatusVector& vector);
private:
void putValue(ULONG val);
void putBlrOffset();
DebugData debugData;
};

View File

@ -1400,6 +1400,12 @@ public:
POST_TRIG = 2
};
// Marks used by EraseNode, ModifyNode and StoreNode
static const unsigned MARK_POSITIONED = 0x01; // Erase|Modify node is positioned at explicit cursor
static const unsigned MARK_MERGE = 0x02; // node is part of MERGE statement
// Marks used by ForNode
static const unsigned MARK_FOR_UPDATE = 0x04; // implicit cursor used in UPDATE\DELETE\MERGE statement
struct ExeState
{
ExeState(thread_db* tdbb, jrd_req* request, jrd_tra* transaction)

View File

@ -93,6 +93,7 @@ static RelationSourceNode* pass1Update(thread_db* tdbb, CompilerScratch* csb, jr
const TrigVector* trigger, StreamType stream, StreamType updateStream, SecurityClass::flags_t priv,
jrd_rel* view, StreamType viewStream, StreamType viewUpdateStream);
static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array<ValidateInfo>& validations);
static ForNode* pass2FindForNode(StmtNode* node, StreamType stream);
static void postTriggerAccess(CompilerScratch* csb, jrd_rel* ownerRelation,
ExternalAccess::exa_act operation, jrd_rel* view);
static void preModifyEraseTriggers(thread_db* tdbb, TrigVector** trigs,
@ -658,9 +659,12 @@ const StmtNode* BlockNode::execute(thread_db* tdbb, jrd_req* request, ExeState*
return parentStmt;
}
// Skip PSQL exception handlers when request restart is in progress
const bool skipHandlers = (transaction->tra_flags & TRA_ex_restart);
const StmtNode* temp = parentStmt;
if (handlers && handlers->statements.hasData())
if (handlers && handlers->statements.hasData() && !skipHandlers)
{
// First of all rollback failed work
if (!(transaction->tra_flags & TRA_system))
@ -2267,6 +2271,7 @@ static RegisterNode<EraseNode> regEraseNode(blr_erase);
DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch* csb, const UCHAR /*blrOp*/)
{
const ULONG blrOffset = csb->csb_blr_reader.getOffset();
const USHORT n = csb->csb_blr_reader.getByte();
if (n >= csb->csb_rpt.getCount() || !(csb->csb_rpt[n].csb_flags & csb_used))
@ -2275,6 +2280,10 @@ DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch
EraseNode* node = FB_NEW_POOL(pool) EraseNode(pool);
node->stream = csb->csb_rpt[n].csb_stream;
ULONG marks;
if (csb->csb_dbg_info && csb->csb_dbg_info->blrToMarks.get(blrOffset, marks))
node->marks |= marks;
return node;
}
@ -2289,6 +2298,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
// Process old context values.
dsqlScratch->context->push(node->dsqlContext);
@ -2312,7 +2322,10 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
RseNode* rse;
if (dsqlCursorName.hasData())
{
rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
}
else
{
rse = FB_NEW_POOL(dsqlScratch->getPool()) RseNode(dsqlScratch->getPool());
@ -2366,6 +2379,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, statement);
NODE_PRINT(printer, subStatement);
NODE_PRINT(printer, stream);
NODE_PRINT(printer, marks);
return "EraseNode";
}
@ -2376,42 +2390,24 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
const dsql_ctx* context;
if (dsqlContext)
{
context = dsqlContext;
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
dsqlScratch->appendUChar(blr_end);
}
else
{
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
}
}
else
{
context = dsqlRelation->dsqlContext;
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
dsqlScratch->appendUChar(blr_end);
}
else
{
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
}
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
}
dsqlScratch->appendUChar(blr_erase);
if (marks)
dsqlScratch->putDebugMarkers(marks);
GEN_stuff_context(dsqlScratch, context);
if (statement)
dsqlScratch->appendUChar(blr_end);
if (message)
dsqlScratch->appendUChar(blr_end);
}
@ -2531,6 +2527,9 @@ EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb)
doPass2(tdbb, csb, statement.getAddress(), this);
doPass2(tdbb, csb, subStatement.getAddress(), this);
if (!(marks & StmtNode::MARK_POSITIONED))
forNode = pass2FindForNode(parentStmt, stream);
impureOffset = CMP_impure(csb, sizeof(SLONG));
csb->csb_rpt[stream].csb_flags |= csb_update;
@ -2543,6 +2542,7 @@ const StmtNode* EraseNode::execute(thread_db* tdbb, jrd_req* request, ExeState*
if (request->req_operation == jrd_req::req_unwind)
retNode = parentStmt;
else if (request->req_operation == jrd_req::req_return && subStatement)
{
if (!exeState->topNode)
@ -2620,6 +2620,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
request->req_operation = jrd_req::req_return;
RLCK_reserve_relation(tdbb, transaction, relation, true);
if (forNode && forNode->isWriteLockMode(request))
{
VIO_writelock(tdbb, rpb, transaction);
return parentStmt;
}
// If the stream was sorted, the various fields in the rpb are probably junk.
// Just to make sure that everything is cool, refetch and release the record.
@ -2649,14 +2655,23 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
// Deleting new version blindly is generally unsafe, but is ok in this situation
// because all changes made by this request will certainly be undone and request
// will be restarted.
while (!VIO_erase(tdbb, rpb, transaction))
{
// VIO_refetch_record returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
if (!VIO_refetch_record(tdbb, rpb, transaction, true, true))
return parentStmt;
rpb->rpb_runtime_flags &= ~RPB_refetch;
if (forNode)
rpb->rpb_runtime_flags |= RPB_restart_ready;
else
rpb->rpb_runtime_flags &= ~RPB_restart_ready;
if (!VIO_erase(tdbb, rpb, transaction))
{
fb_assert(forNode != nullptr);
forNode->setWriteLockMode(request);
// VIO_writelock returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
rpb->rpb_runtime_flags |= RPB_refetch;
VIO_writelock(tdbb, rpb, transaction);
return parentStmt;
}
REPL_erase(tdbb, rpb, transaction);
}
@ -3966,15 +3981,12 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
jrd_tra* const org_transaction = request->req_transaction;
fb_assert(tdbb->getTransaction() == org_transaction);
// Use SNAPSHOT isolation mode in autonomous transactions by default
ULONG transaction_flags = 0;
// Simulate legacy behavior if Read Consistency is not used
if (!dbb->dbb_config->getReadConsistency() &&
!(org_transaction->tra_flags & TRA_read_consistency))
{
transaction_flags = org_transaction->tra_flags;
}
ULONG transaction_flags = org_transaction->tra_flags;
// Replace Read Consistency by Concurrecy isolation mode
if (transaction_flags & TRA_read_consistency)
transaction_flags &= ~(TRA_read_committed | TRA_read_consistency);
jrd_tra* const transaction = TRA_start(tdbb, transaction_flags,
org_transaction->tra_lock_timeout,
@ -4315,8 +4327,6 @@ void ExecBlockNode::genBlr(DsqlCompilerScratch* dsqlScratch)
{
thread_db* tdbb = JRD_get_thread_data();
dsqlScratch->beginDebug();
// Sub routine needs a different approach from EXECUTE BLOCK.
// EXECUTE BLOCK needs "ports", which creates DSQL messages using the client charset.
// Sub routine doesn't need ports and should generate BLR as declared in its metadata.
@ -4451,8 +4461,6 @@ void ExecBlockNode::genBlr(DsqlCompilerScratch* dsqlScratch)
dsqlScratch->appendUChar(blr_end);
dsqlScratch->genReturn(true);
dsqlScratch->appendUChar(blr_end);
dsqlScratch->endDebug();
}
// Revert parameters order for EXECUTE BLOCK statement
@ -4804,6 +4812,11 @@ DmlNode* ForNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb,
{
ForNode* node = FB_NEW_POOL(pool) ForNode(pool);
const ULONG blrOffset = csb->csb_blr_reader.getOffset();
ULONG marks;
if (csb->csb_dbg_info && csb->csb_dbg_info->blrToMarks.get(blrOffset, marks))
node->forUpdate = (marks & StmtNode::MARK_FOR_UPDATE) != 0;
if (csb->csb_blr_reader.peekByte() == (UCHAR) blr_stall)
node->stall = PAR_parse_stmt(tdbb, csb);
@ -4898,6 +4911,8 @@ string ForNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, statement);
NODE_PRINT(printer, cursor);
NODE_PRINT(printer, parBlrBeginCnt);
NODE_PRINT(printer, forUpdate);
NODE_PRINT(printer, withLock);
return "ForNode";
}
@ -4916,6 +4931,9 @@ void ForNode::genBlr(DsqlCompilerScratch* dsqlScratch)
dsqlScratch->appendUChar(blr_for);
if (forUpdate)
dsqlScratch->putDebugMarkers(StmtNode::MARK_FOR_UPDATE);
if (!statement || dsqlForceSingular)
dsqlScratch->appendUChar(blr_singular);
@ -4981,7 +4999,10 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb)
// as implicit cursors are always positioned in a valid record, and the name is
// only used to raise isc_cursor_not_positioned.
impureOffset = CMP_impure(csb, sizeof(SavNumber));
if (rse->flags & RseNode::FLAG_WRITELOCK)
withLock = true;
impureOffset = CMP_impure(csb, sizeof(Impure));
return this;
}
@ -4989,17 +5010,21 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb)
const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*exeState*/) const
{
jrd_tra* transaction = request->req_transaction;
Impure* impure = request->getImpure<Impure>(impureOffset);
switch (request->req_operation)
{
case jrd_req::req_evaluate:
*request->getImpure<SavNumber>(impureOffset) = 0;
// initialize impure values
impure->savepoint = 0;
impure->writeLockMode = false;
if (!(transaction->tra_flags & TRA_system) &&
transaction->tra_save_point &&
transaction->tra_save_point->hasChanges())
{
const Savepoint* const savepoint = transaction->startSavepoint();
*request->getImpure<SavNumber>(impureOffset) = savepoint->getNumber();
impure->savepoint = savepoint->getNumber();
}
cursor->open(tdbb);
request->req_records_affected.clear();
@ -5011,11 +5036,42 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
// fall into
case jrd_req::req_sync:
if (cursor->fetchNext(tdbb))
{
request->req_operation = jrd_req::req_evaluate;
return statement;
const bool fetched = cursor->fetchNext(tdbb);
if (withLock)
{
const jrd_req* top_request = request->req_snapshot.m_owner;
if ((top_request) && (top_request->req_flags & req_update_conflict))
impure->writeLockMode = true;
}
if (fetched)
{
if (impure->writeLockMode && withLock)
{
// Skip statement execution and fetch (and try to lock) next record.
request->req_operation = jrd_req::req_sync;
return this;
}
request->req_operation = jrd_req::req_evaluate;
return statement;
}
}
if (impure->writeLockMode)
{
const jrd_req* top_request = request->req_snapshot.m_owner;
fb_assert(top_request);
fb_assert(top_request->req_flags & req_update_conflict);
transaction->tra_flags |= TRA_ex_restart;
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(top_request->req_conflict_txn));
}
request->req_operation = jrd_req::req_return;
// fall into
@ -5036,7 +5092,7 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
default:
{
const SavNumber savNumber = *request->getImpure<SavNumber>(impureOffset);
const SavNumber savNumber = impure->savepoint;
if (savNumber)
{
@ -5060,6 +5116,21 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
}
bool ForNode::isWriteLockMode(jrd_req* request) const
{
const Impure* impure = request->getImpure<Impure>(impureOffset);
return impure->writeLockMode;
}
void ForNode::setWriteLockMode(jrd_req* request) const
{
Impure* impure = request->getImpure<Impure>(impureOffset);
fb_assert(!impure->writeLockMode);
impure->writeLockMode = true;
}
//--------------------
@ -5451,6 +5522,8 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
if (returning)
forNode->dsqlForceSingular = true;
forNode->forUpdate = true;
// Get the already processed relations.
RseNode* processedRse = nodeAs<RseNode>(forNode->rse->dsqlStreams->items[0]);
source = processedRse->dsqlStreams->items[0];
@ -5488,6 +5561,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
// Build the MODIFY node.
ModifyNode* modify = FB_NEW_POOL(pool) ModifyNode(pool);
modify->marks |= StmtNode::MARK_MERGE;
thisIf->trueAction = modify;
dsql_ctx* const oldContext = dsqlGetContext(target);
@ -5576,6 +5650,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
{
// Build the DELETE node.
EraseNode* erase = FB_NEW_POOL(pool) EraseNode(pool);
erase->marks |= StmtNode::MARK_MERGE;
thisIf->trueAction = erase;
dsql_ctx* context = dsqlGetContext(target);
@ -5640,6 +5715,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
// Build the INSERT node.
StoreNode* store = FB_NEW_POOL(pool) StoreNode(pool);
// TODO: store->marks |= StmtNode::MARK_MERGE;
store->dsqlRelation = relation;
store->dsqlFields = notMatched->fields;
store->dsqlValues = notMatched->values;
@ -5904,6 +5980,7 @@ static RegisterNode<ModifyNode> regModifyNode2(blr_modify2);
DmlNode* ModifyNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR blrOp)
{
// Parse the original and new contexts.
const ULONG blrOffset = csb->csb_blr_reader.getOffset();
USHORT context = (unsigned int) csb->csb_blr_reader.getByte();
@ -5940,6 +6017,10 @@ DmlNode* ModifyNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* c
if (blrOp == blr_modify2)
node->statement2 = PAR_parse_stmt(tdbb, csb);
ULONG marks;
if (csb->csb_dbg_info && csb->csb_dbg_info->blrToMarks.get(blrOffset, marks))
node->marks |= marks;
return node;
}
@ -5974,6 +6055,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
// Process old context values.
dsqlScratch->context->push(node->dsqlContext);
@ -6054,6 +6136,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
{
rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation);
old_context = rse->dsqlStreams->items[0]->dsqlContext;
node->marks |= StmtNode::MARK_POSITIONED;
}
else
{
@ -6154,6 +6237,7 @@ string ModifyNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, mapView);
NODE_PRINT(printer, orgStream);
NODE_PRINT(printer, newStream);
NODE_PRINT(printer, marks);
return "ModifyNode";
}
@ -6165,6 +6249,8 @@ void ModifyNode::genBlr(DsqlCompilerScratch* dsqlScratch)
const dsql_msg* message = dsqlGenDmlHeader(dsqlScratch, rse);
dsqlScratch->appendUChar(statement2 ? blr_modify2 : blr_modify);
if (marks)
dsqlScratch->putDebugMarkers(marks);
const dsql_ctx* context;
@ -6358,6 +6444,9 @@ ModifyNode* ModifyNode::pass2(thread_db* tdbb, CompilerScratch* csb)
SBM_SET(tdbb->getDefaultPool(), &csb->csb_rpt[orgStream].csb_fields, id);
}
if (!(marks & StmtNode::MARK_POSITIONED))
forNode = pass2FindForNode(parentStmt, orgStream);
impureOffset = CMP_impure(csb, sizeof(impure_state));
return this;
@ -6427,7 +6516,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
{
case jrd_req::req_evaluate:
request->req_records_affected.bumpModified(false);
break;
if (impure->sta_state == 0 && forNode && forNode->isWriteLockMode(request))
request->req_operation = jrd_req::req_return;
// fall thru
else
break;
case jrd_req::req_return:
if (impure->sta_state == 1)
@ -6442,6 +6536,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
if (impure->sta_state == 0)
{
if (forNode && forNode->isWriteLockMode(request))
{
VIO_writelock(tdbb, orgRpb, transaction);
return parentStmt;
}
// CVC: This call made here to clear the record in each NULL field and
// varchar field whose tail may contain garbage.
cleanupRpb(tdbb, newRpb);
@ -6461,19 +6561,26 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
else if (!relation->rel_view_rse)
{
// VIO_modify returns false if there is an update conflict in Read Consistency
// transaction. Before returning false it disables statement-level snapshot
// transaction and our code is ready to work in write lock mode (see flag set below).
// Before returning false it disables statement-level snapshot
// (via setting req_update_conflict flag) so re-fetch should see new data.
// Updating new version blindly is generally unsafe, but is ok in this situation
// because all changes made by this request will certainly be undone and request
// will be restarted.
while (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
{
// VIO_refetch_record returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
if (!VIO_refetch_record(tdbb, orgRpb, transaction, true, true))
return parentStmt;
orgRpb->rpb_runtime_flags &= ~RPB_refetch;
if (forNode)
orgRpb->rpb_runtime_flags |= RPB_restart_ready;
else
orgRpb->rpb_runtime_flags &= ~RPB_restart_ready;
if (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
{
fb_assert(forNode != nullptr);
forNode->setWriteLockMode(request);
// VIO_writelock returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
orgRpb->rpb_runtime_flags |= RPB_refetch;
VIO_writelock(tdbb, orgRpb, transaction);
return parentStmt;
}
IDX_modify(tdbb, orgRpb, newRpb, transaction);
REPL_modify(tdbb, orgRpb, newRpb, transaction);
@ -8812,6 +8919,7 @@ static const dsql_msg* dsqlGenDmlHeader(DsqlCompilerScratch* dsqlScratch, RseNod
if (dsqlRse)
{
dsqlScratch->appendUChar(blr_for);
dsqlScratch->putDebugMarkers(StmtNode::MARK_FOR_UPDATE);
GEN_expr(dsqlScratch, dsqlRse);
}
@ -9696,6 +9804,23 @@ static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array<Valida
}
}
ForNode* pass2FindForNode(StmtNode* node, StreamType stream)
{
// lookup for parent ForNode
while (node && !nodeIs<ForNode>(node))
node = node->parentStmt;
ForNode* forNode = nodeAs<ForNode>(node);
if (forNode && forNode->rse->containsStream(stream))
{
//fb_assert(forNode->forUpdate == true);
if (forNode->forUpdate)
return forNode;
}
return nullptr;
};
// Inherit access to triggers to be fired.
//
// When we detect that a trigger could be fired by a request,

View File

@ -35,6 +35,7 @@ namespace Jrd {
class CompoundStmtNode;
class ExecBlockNode;
class ForNode;
class PlanNode;
class RelationSourceNode;
class SelectNode;
@ -564,7 +565,8 @@ public:
dsqlContext(NULL),
statement(NULL),
subStatement(NULL),
stream(0)
stream(0),
marks(0)
{
}
@ -595,6 +597,8 @@ public:
NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement;
StreamType stream;
NestConst<ForNode> forNode; // parent implicit cursor, if present
unsigned marks; // see StmtNode::IUD_MARK_xxx
};
@ -913,7 +917,9 @@ public:
rse(NULL),
statement(NULL),
cursor(NULL),
parBlrBeginCnt(0)
parBlrBeginCnt(0),
forUpdate(false),
withLock(false)
{
}
@ -927,7 +933,16 @@ public:
virtual StmtNode* pass2(thread_db* tdbb, CompilerScratch* csb);
virtual const StmtNode* execute(thread_db* tdbb, jrd_req* request, ExeState* exeState) const;
bool isWriteLockMode(jrd_req* request) const;
void setWriteLockMode(jrd_req* request) const;
public:
struct Impure
{
SavNumber savepoint;
bool writeLockMode; // true - driven statement (UPDATE\DELETE\SELECT WITH LOCK) works in "write lock" mode, false - normal mode
};
NestConst<SelectNode> dsqlSelect;
NestConst<ValueListNode> dsqlInto;
DeclareCursorNode* dsqlCursor;
@ -939,6 +954,8 @@ public:
NestConst<StmtNode> statement;
NestConst<Cursor> cursor;
int parBlrBeginCnt;
bool forUpdate; // part of UPDATE\DELETE\MERGE statement
bool withLock; // part of SELECT ... WITH LOCK statement
};
@ -1153,7 +1170,8 @@ public:
validations(pool),
mapView(NULL),
orgStream(0),
newStream(0)
newStream(0),
marks(0)
{
}
@ -1190,6 +1208,8 @@ public:
NestConst<StmtNode> mapView;
StreamType orgStream;
StreamType newStream;
NestConst<ForNode> forNode; // parent implicit cursor, if present
unsigned marks; // see StmtNode::IUD_MARK_xxx
};

View File

@ -603,8 +603,12 @@ void DsqlDmlRequest::dsqlPass(thread_db* tdbb, DsqlCompilerScratch* scratch, boo
else
scratch->getStatement()->setBlrVersion(4);
scratch->beginDebug();
GEN_request(scratch, node);
scratch->endDebug();
// Create the messages buffers
for (FB_SIZE_T i = 0; i < scratch->ports.getCount(); ++i)
{
@ -856,13 +860,47 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
statement->getType() != DsqlCompiledStatement::TYPE_SAVEPOINT)
{
AutoSavePoint savePoint(tdbb, req_transaction);
req_request->req_flags &= ~req_update_conflict;
int numTries = 0;
while (true)
{
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
AutoSetRestoreFlag<ULONG> restartReady(&req_request->req_flags, req_restart_ready, true);
try
{
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
}
catch (const status_exception&)
{
if (!(req_transaction->tra_flags & TRA_ex_restart))
{
req_request->req_flags &= ~req_update_conflict;
throw;
}
}
if (!(req_request->req_flags & req_update_conflict))
{
fb_assert((req_transaction->tra_flags & TRA_ex_restart) == 0);
req_transaction->tra_flags &= ~TRA_ex_restart;
#ifdef DEV_BUILD
if (numTries > 0)
{
string s;
s.printf("restarts = %d", numTries);
ERRD_post_warning(Arg::Warning(isc_random) << Arg::Str(s));
}
#endif
break;
}
fb_assert((req_transaction->tra_flags & TRA_ex_restart) != 0);
req_request->req_flags &= ~req_update_conflict;
req_transaction->tra_flags &= ~TRA_ex_restart;
fb_utils::init_status(tdbb->tdbb_status_vector);
if (numTries >= 10)
{
gds__log("Update conflict: unable to get a stable set of rows in the source tables");
@ -2076,7 +2114,15 @@ static void sql_info(thread_db* tdbb,
{
const bool detailed = (item == isc_info_sql_explain_plan);
string plan = OPT_get_plan(tdbb, request->req_request, detailed);
#ifdef DEV_BUILD
if (!detailed)
{
NodePrinter printer;
request->req_request->getStatement()->topNode->print(printer);
plan += "\n--------\n";
plan += printer.getText();
}
#endif
if (plan.hasData())
{
// 1-byte item + 2-byte length + isc_info_end/isc_info_truncated == 4

View File

@ -732,6 +732,7 @@
#define fb_dbg_subproc 5
#define fb_dbg_subfunc 6
#define fb_dbg_map_curname 7
#define fb_dbg_map_markers 8
// sub code for fb_dbg_map_argument
#define fb_dbg_arg_input 0

View File

@ -10,10 +10,10 @@
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Vlad Horsun
* The Original Code was created by Vlad Khorsun
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2006 Vlad Horsun <hvlad@users.sourceforge.net>
* Copyright (c) 2006 Vlad Khorsun <hvlad@users.sourceforge.net>
* and all contributors signed below.
*
* All Rights Reserved.
@ -225,6 +225,28 @@ void DBG_parse_debug_info(ULONG length, const UCHAR* data, DbgInfo& dbgInfo)
break;
}
case fb_dbg_map_markers:
{
if (data + 8 >= end)
{
bad_format = true;
break;
}
ULONG marks = *data++;
marks |= *data++ << 8;
marks |= *data++ << 16;
marks |= *data++ << 24;
ULONG offset = *data++;
offset |= *data++ << 8;
offset |= *data++ << 16;
offset |= *data++ << 24;
dbgInfo.blrToMarks.put(offset, marks);
}
break;
case fb_dbg_end:
if (data != end)
bad_format = true;

View File

@ -10,10 +10,10 @@
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Vlad Horsun
* The Original Code was created by Vlad Khorsun
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2006 Vlad Horsun <hvlad@users.sourceforge.net>
* Copyright (c) 2006 Vlad Khorsun <hvlad@users.sourceforge.net>
* and all contributors signed below.
*
* All Rights Reserved.
@ -33,7 +33,8 @@
// Also, it introduces some new tags.
const UCHAR DBG_INFO_VERSION_1 = UCHAR(1);
const UCHAR DBG_INFO_VERSION_2 = UCHAR(2);
const UCHAR CURRENT_DBG_INFO_VERSION = DBG_INFO_VERSION_2;
const UCHAR DBG_INFO_VERSION_3 = UCHAR(3); // blr offsets of "update" cursors and driven sub-statements
const UCHAR CURRENT_DBG_INFO_VERSION = DBG_INFO_VERSION_3;
namespace Firebird {
@ -55,6 +56,7 @@ typedef Firebird::SortedArray<
MapBlrToSrcItem> MapBlrToSrc;
typedef GenericMap<Pair<Right<USHORT, MetaName> > > MapVarIndexToName;
typedef GenericMap<Pair<NonPooled<ULONG, ULONG> > > MapBlrToMarks;
struct ArgumentInfo
{
@ -93,7 +95,8 @@ struct DbgInfo : public PermanentStorage
argInfoToName(p),
curIndexToName(p),
subFuncs(p),
subProcs(p)
subProcs(p),
blrToMarks(p)
{
}
@ -126,6 +129,8 @@ struct DbgInfo : public PermanentStorage
subProcs.clear();
}
blrToMarks.clear();
}
MapBlrToSrc blrToSrc; // mapping between blr offsets and source text position
@ -134,6 +139,7 @@ struct DbgInfo : public PermanentStorage
MapVarIndexToName curIndexToName; // mapping between cursor index and name
GenericMap<Pair<Left<MetaName, DbgInfo*> > > subFuncs; // sub functions
GenericMap<Pair<Left<MetaName, DbgInfo*> > > subProcs; // sub procedures
MapBlrToMarks blrToMarks; // blr offsets of marked verbs
};
} // namespace Firebird

View File

@ -550,6 +550,8 @@ string RelationSourceNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, dsqlName);
NODE_PRINT(printer, alias);
NODE_PRINT(printer, context);
if (relation)
printer.print("rel_name", relation->rel_name);
return "RelationSourceNode";
}

View File

@ -234,7 +234,7 @@ void VerbAction::mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* next
release(transaction);
}
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, VerbAction* preserveAction)
{
// Undo changes recorded for this verb action.
// After that, clear the verb action and prepare it for later reuse.
@ -258,7 +258,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
if (!DPM_get(tdbb, &rpb, LCK_read))
BUGCHECK(186); // msg 186 record disappeared
if ((have_undo || preserveLocks) && !(rpb.rpb_flags & rpb_deleted))
if ((have_undo || preserveAction) && !(rpb.rpb_flags & rpb_deleted))
VIO_data(tdbb, &rpb, transaction->tra_pool);
else
CCH_RELEASE(tdbb, &rpb.getWindow(tdbb));
@ -268,7 +268,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
if (!have_undo)
{
if (preserveLocks && rpb.rpb_b_page)
if (preserveAction && rpb.rpb_b_page)
{
// Fetch previous record version and update in place current version with it
record_param temp = rpb;
@ -304,7 +304,10 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
rpb.rpb_record = save_record;
delete temp.rpb_record;
} else
RBM_SET(transaction->tra_pool, &preserveAction->vct_records, rpb.rpb_number.getValue());
}
else
VIO_backout(tdbb, &rpb, transaction);
}
else
@ -437,8 +440,12 @@ Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior, bool preserveL
while (m_actions)
{
VerbAction* const action = m_actions;
VerbAction* preserveAction = nullptr;
action->undo(tdbb, m_transaction, preserveLocks);
if (preserveLocks && m_next)
preserveAction = m_next->getAction(action->vct_relation);
action->undo(tdbb, m_transaction, preserveAction);
m_actions = action->vct_next;
action->vct_next = m_freeActions;

View File

@ -94,7 +94,7 @@ namespace Jrd
UndoItemTree* vct_undo; // Data for undo records
void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction);
void undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks);
void undo(thread_db* tdbb, jrd_tra* transaction, VerbAction* preserveAction);
void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber,
VerbAction* nextAction, Record* goingRecord);

View File

@ -875,7 +875,7 @@ void EXE_start(thread_db* tdbb, jrd_req* request, jrd_tra* transaction)
TRA_post_resources(tdbb, transaction, statement->resources);
TRA_attach_request(transaction, request);
request->req_flags &= req_in_use;
request->req_flags &= req_in_use | req_restart_ready;
request->req_flags |= req_active;
request->req_flags &= ~req_reserved;

View File

@ -132,6 +132,7 @@ const USHORT RPB_refetch = 0x01; // re-fetch is required
const USHORT RPB_undo_data = 0x02; // data got from undo log
const USHORT RPB_undo_read = 0x04; // read was performed using the undo log
const USHORT RPB_undo_deleted = 0x08; // read was performed using the undo log, primary version is deleted
const USHORT RPB_restart_ready = 0x10; // update conflict could be handled by statement restart
const USHORT RPB_UNDO_FLAGS = (RPB_undo_data | RPB_undo_read | RPB_undo_deleted);
@ -399,6 +400,7 @@ const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress
const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction
const ULONG req_reserved = 0x800L; // Request reserved for client
const ULONG req_update_conflict = 0x1000L; // We need to restart request due to update conflict
const ULONG req_restart_ready = 0x2000L; // Request is ready to restat in case of update conflict
// Index lock block

View File

@ -3866,6 +3866,9 @@ void jrd_tra::rollbackSavepoint(thread_db* tdbb, bool preserveLocks)
{
REPL_save_cleanup(tdbb, this, tra_save_point, true);
if (tra_flags & TRA_ex_restart)
preserveLocks = true;
Jrd::ContextPoolHolder context(tdbb, tra_pool);
tra_save_point = tra_save_point->rollback(tdbb, NULL, preserveLocks);
}

View File

@ -425,6 +425,7 @@ const ULONG TRA_no_auto_undo = 0x8000L; // don't start a savepoint in TRA_start
const ULONG TRA_precommitted = 0x10000L; // transaction committed at startup
const ULONG TRA_own_interface = 0x20000L; // tra_interface was created for internal needs
const ULONG TRA_read_consistency = 0x40000L; // ensure read consistency in this transaction
const ULONG TRA_ex_restart = 0x80000L; // Exception was raised to restart request
// flags derived from TPB, see also transaction_options() at tra.cpp
const ULONG TRA_OPTIONS_MASK = (TRA_degree3 | TRA_readonly | TRA_ignore_limbo | TRA_read_committed |

View File

@ -1441,6 +1441,57 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool)
}
static bool check_prepare_result(int prepare_result, jrd_tra* transaction, jrd_req* request, record_param* rpb)
{
/**************************************
*
* c h e c k _ p r e p a r e _ r e s u l t
*
**************************************
*
* Functional description
* Called by VIO_modify and VIO_erase. Raise update conflict error if not in
* read consistency transaction or lock error happens or if request is already
* in update conflict mode. In latter case set TRA_ex_restart flag to correctly
* handle request restart.
*
**************************************/
if (prepare_result == PREPARE_OK)
return true;
jrd_req* top_request = request->req_snapshot.m_owner;
const bool restart_ready = top_request &&
(top_request->req_flags & req_restart_ready) &&
(rpb->rpb_runtime_flags & RPB_restart_ready);
// Second update conflict when request is already in update conflict mode
// means we have some (indirect) UPDATE\DELETE in WHERE clause of primary
// cursor. In this case all we can do is restart whole request immediately.
const bool secondary = top_request &&
(top_request->req_flags & req_update_conflict) &&
(prepare_result != PREPARE_LOCKERR);
if (!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR ||
secondary || !restart_ready)
{
if (secondary)
transaction->tra_flags |= TRA_ex_restart;
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr));
}
if (top_request)
{
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = rpb->rpb_transaction_nr;
}
return false;
}
bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{
/**************************************
@ -1911,20 +1962,8 @@ bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
// Update stub didn't find one page -- do a long, hard update
PageStack stack;
int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr));
}
if (prepare_result)
{
jrd_req* top_request = request->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = rpb->rpb_transaction_nr;
if (!check_prepare_result(prepare_result, transaction, request, rpb))
return false;
}
// Old record was restored and re-fetched for write. Now replace it.
@ -3216,20 +3255,8 @@ bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
PageStack stack;
int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
&temp, new_rpb, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr));
}
if (prepare_result)
{
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
if (!check_prepare_result(prepare_result, transaction, tdbb->getRequest(), org_rpb))
return false;
}
IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction);
@ -4085,8 +4112,18 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
if ((transaction->tra_flags & TRA_read_consistency))
{
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
if (top_request && !(top_request->req_flags & req_update_conflict))
{
if (!(top_request->req_flags & req_restart_ready))
{
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr));
}
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
}
}
org_rpb->rpb_runtime_flags |= RPB_refetch;
return false;