8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 17:23:03 +01:00

Merge branch 'query_restarts_2'

This commit is contained in:
hvlad 2020-03-28 12:20:04 +02:00
commit 9ac2238c3d
28 changed files with 656 additions and 246 deletions

View File

@ -125,32 +125,58 @@ So, there are three kinds of read-committed transactions now:
### Update conflicts handling
When statement executed within read committed read consistency transaction its database view is
When statement executed within READ COMMITTED READ CONSISTENCY transaction its database view is
not changed (similar to snapshot transaction). Therefore it is useless to wait for commit of
concurrent transaction in the hope to re-read new committed record version. On read, behavior is
similar to read committed *record version* transaction - do not wait for active transaction and
similar to READ COMMITTED *RECORD VERSION* transaction - do not wait for active transaction and
walk backversions chain looking for record version visible to the current snapshot.
When update conflict happens engine behavior is changed. If concurrent transaction is active,
engine waits (according to transaction lock timeout) and if concurrent transaction still not
committed, update conflict error is returned. If concurrent transaction is committed, engine
creates new snapshot and restart top-level statement execution. In both cases all work performed
up to the top-level statement is undone.
For READ COMMITTED *READ CONSISTENCY* mode handling of update conflicts by the engine is changed
significantly.
When update conflict is detected the following is performed:
a) transaction isolation mode temporarily switched to the READ COMMITTED *NO RECORD VERSION MODE*
b) engine put write lock on conflicted record
c) engine continue to evaluate remaining records of update\delete cursor and put write locks
on it too
d) when there is no more records to fetch, engine start to undo all actions performed since
top-level statement execution starts and preserve already taken write locks for every
updated\deleted\locked record, all inserted records are removed
e) then engine restores transaction isolation mode as READ COMMITTED *READ CONSISTENCY*, creates
new statement-level snapshot and restart execution of top-level statement.
This is the same logic as user applications uses for update conflict handling usually, but it
is a bit more efficient as it excludes network roundtrips to\from client host. This restart
logic is not applied to the selectable stored procedures if update conflict happens after any
record returned to the client application. In this case *isc_update_conflict* error is returned.
Such algorithm allows to ensure that after restart already updated records remains locked,
will be visible to the new snapshot, and could be updated again with no further conflicts.
Also, because of read consistency mode, set of modified records remains consistent.
Note: by historical reasons isc_update_conflict reported as secondary error code with primary
error code isc_deadlock.
Notes:
- restart algorithm above is applied to the UPDATE, DELETE, SELECT WITH LOCK and MERGE statements,
with and without RETURNING clause, executing directly by user applicaiton or as a part of some
PSQL object (stored procedure\function, trigger, EXECUTE BLOCK, etc)
- if UPDATE\DELETE statement is positioned on some explicit cursor (WHERE CURRENT OF) then engine
skip step (c) above, i.e. not fetches and not put write locks on remaining records of cursor
- if top-level statement is SELECT'able and update conflict happens after one or more records was
returned to the application, then update conflict error is reported as usual and restart is not
initiated
- restart is not initiated for statements in autonomous blocks (IN AUTONOMOUS TRANSACTION DO ...)
- after 10 attempts engine aborts restart algorithm, releases all write locks, restores transaction
isolation mode as READ COMMITTED *READ CONSISTENCY* and report update conflict
- any not handled error at step (c) above stops restart algorithm and engine continue processing
in usual way, for example error could be catched and handled by PSQL WHEN block or reported to
the application if not handled
- UPDATE\DELETE triggers will fire multiply times for the same record if statement execution was
restarted and record is updated\deleted again
- by historical reasons isc_update_conflict reported as secondary error code with primary error
code isc_deadlock.
### No more precommitted transactions
*Read-committed read only* (RCRO) transactions currently marked as committed immediately when
transaction started. This is correct if read-committed transaction needs no database snapshot.
But it is not correct to mark RCRO transaction as committed as it can break statement-level
snapshot consistency.
### Precommitted transactions
READ COMMITTED READ ONLY transactions marked as committed immediately when transaction started.
This is correct if read-committed transaction needs no database snapshot. But it is not correct
to mark READ CONSISTENCY READ ONLY transaction as committed as it can break statement-level
snapshot consistency. Therefore READ CONSISTENCY READ ONLY transactions is not precommitted
on start. Other kinds of READ COMMITTED READ ONLY transactions ([NO] RECORD VERSION) works as
before and marked as committed when such transaction started.
### Support for new READ COMMITTED READ CONSISTENCY isolation level
#### SQL syntax
@ -202,15 +228,16 @@ value of *oldest active snapshot* which could see *given* record version. If few
in a chain got the same mark then all of them after the first one could be removed. This allows to
keep versions chains short.
To make it works, engine maintains list of all active database snapshots. This list is kept in shared
To make it work, engine maintains list of all active database snapshots. This list is kept in shared
memory. The initial size of shared memory block could be set in firebird.conf using new setting
**SnapshotsMemSize**. Default value is 64KB. It could grow automatically, when necessary.
When engine need to find "*oldest active snapshot* which could see *given* record version" it
just search for CN of transaction that created given record version at the sorted array of active
When engine needs to find "*oldest active snapshot* which could see *given* record version" it just
searches for CN of transaction that created given record version in the sorted array of active
snapshots.
Garbage collection of intermediate record versions run by:
- sweep
- background garbage collector in SuperServer
- every user attachment after update or delete record
- table scan at index creation

View File

@ -10,7 +10,9 @@ need to raise an exception but do not want the database changes to be rolled-bac
If exceptions are raised inside the body of an autonomous transaction block, the changes are
rolled-back. If the block runs till the end, the transaction is committed.
The new transaction is initiated with the same isolation level of the existing one.
The new transaction is initiated with the same isolation level and lock timeout of the existing one.
The only exception is that if existing transaction run in READ COMMITTED READ CONSISTENCY mode, then
new autonomous transaction will run in CONCURRENCY mode.
Should be used with caution to avoid deadlocks.
Author:

View File

@ -58,21 +58,9 @@ void BlrDebugWriter::putDebugSrcInfo(ULONG line, ULONG col)
{
debugData.add(fb_dbg_map_src2blr);
debugData.add(line);
debugData.add(line >> 8);
debugData.add(line >> 16);
debugData.add(line >> 24);
debugData.add(col);
debugData.add(col >> 8);
debugData.add(col >> 16);
debugData.add(col >> 24);
const ULONG offset = (getBlrData().getCount() - getBaseOffset());
debugData.add(offset);
debugData.add(offset >> 8);
debugData.add(offset >> 16);
debugData.add(offset >> 24);
putValue(line);
putValue(col);
putBlrOffset();
}
void BlrDebugWriter::putDebugVariable(USHORT number, const MetaName& name)
@ -132,10 +120,7 @@ void BlrDebugWriter::putDebugSubFunction(DeclareSubFuncNode* subFuncNode)
HalfStaticArray<UCHAR, 128>& subDebugData = subFuncNode->blockScratch->debugData;
const ULONG count = ULONG(subDebugData.getCount());
debugData.add(UCHAR(count));
debugData.add(UCHAR(count >> 8));
debugData.add(UCHAR(count >> 16));
debugData.add(UCHAR(count >> 24));
putValue(count);
debugData.add(subDebugData.begin(), count);
}
@ -152,11 +137,22 @@ void BlrDebugWriter::putDebugSubProcedure(DeclareSubProcNode* subProcNode)
HalfStaticArray<UCHAR, 128>& subDebugData = subProcNode->blockScratch->debugData;
const ULONG count = ULONG(subDebugData.getCount());
debugData.add(UCHAR(count));
debugData.add(UCHAR(count >> 8));
debugData.add(UCHAR(count >> 16));
debugData.add(UCHAR(count >> 24));
putValue(count);
debugData.add(subDebugData.begin(), count);
}
void BlrDebugWriter::putValue(ULONG val)
{
debugData.add(val);
debugData.add(val >> 8);
debugData.add(val >> 16);
debugData.add(val >> 24);
}
void BlrDebugWriter::putBlrOffset()
{
const ULONG offset = (getBlrData().getCount() - getBaseOffset());
putValue(offset);
}
} // namespace Jrd

View File

@ -56,6 +56,9 @@ public:
virtual void raiseError(const Firebird::Arg::StatusVector& vector);
private:
void putValue(ULONG val);
void putBlrOffset();
DebugData debugData;
};

View File

@ -59,6 +59,27 @@ void DsqlCompilerScratch::dumpContextStack(const DsqlContextStack* stack)
#endif
void DsqlCompilerScratch::putBlrMarkers(ULONG marks)
{
appendUChar(blr_marks);
if (marks <= MAX_UCHAR)
{
appendUChar(1);
appendUChar(marks);
}
else if (marks <= MAX_USHORT)
{
appendUChar(2);
appendUShort(marks);
}
else
{
appendUChar(4);
appendULong(marks);
}
}
// Write out field data type.
// Taking special care to declare international text.
void DsqlCompilerScratch::putDtype(const TypeClause* field, bool useSubType)

View File

@ -169,6 +169,7 @@ public:
return statement;
}
void putBlrMarkers(ULONG marks);
void putDtype(const TypeClause* field, bool useSubType);
void putType(const TypeClause* type, bool useSubType);
void putLocalVariables(CompoundStmtNode* parameters, USHORT locals);

View File

@ -1389,6 +1389,12 @@ public:
POST_TRIG = 2
};
// Marks used by EraseNode, ModifyNode and StoreNode
static const unsigned MARK_POSITIONED = 0x01; // Erase|Modify node is positioned at explicit cursor
static const unsigned MARK_MERGE = 0x02; // node is part of MERGE statement
// Marks used by ForNode
static const unsigned MARK_FOR_UPDATE = 0x04; // implicit cursor used in UPDATE\DELETE\MERGE statement
struct ExeState
{
ExeState(thread_db* tdbb, jrd_req* request, jrd_tra* transaction)

View File

@ -88,6 +88,7 @@ static void dsqlSetParameterName(DsqlCompilerScratch*, ExprNode*, const ValueExp
static void dsqlSetParametersName(DsqlCompilerScratch*, CompoundStmtNode*, const RecordSourceNode*);
static void cleanupRpb(thread_db* tdbb, record_param* rpb);
static void forceWriteLock(thread_db* tdbb, record_param* rpb, jrd_tra* transaction);
static void makeValidation(thread_db* tdbb, CompilerScratch* csb, StreamType stream,
Array<ValidateInfo>& validations);
static StmtNode* pass1ExpandView(thread_db* tdbb, CompilerScratch* csb, StreamType orgStream,
@ -96,12 +97,14 @@ static RelationSourceNode* pass1Update(thread_db* tdbb, CompilerScratch* csb, jr
const TrigVector* trigger, StreamType stream, StreamType updateStream, SecurityClass::flags_t priv,
jrd_rel* view, StreamType viewStream, StreamType viewUpdateStream);
static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array<ValidateInfo>& validations);
static ForNode* pass2FindForNode(StmtNode* node, StreamType stream);
static void postTriggerAccess(CompilerScratch* csb, jrd_rel* ownerRelation,
ExternalAccess::exa_act operation, jrd_rel* view);
static void preModifyEraseTriggers(thread_db* tdbb, TrigVector** trigs,
StmtNode::WhichTrigger whichTrig, record_param* rpb, record_param* rec, TriggerAction op);
static void preprocessAssignments(thread_db* tdbb, CompilerScratch* csb,
StreamType stream, CompoundStmtNode* compoundNode, const Nullable<OverrideClause>* insertOverride);
static void restartRequest(const jrd_req* request, jrd_tra* transaction);
static void validateExpressions(thread_db* tdbb, const Array<ValidateInfo>& validations);
} // namespace Jrd
@ -661,9 +664,12 @@ const StmtNode* BlockNode::execute(thread_db* tdbb, jrd_req* request, ExeState*
return parentStmt;
}
// Skip PSQL exception handlers when request restart is in progress
const bool skipHandlers = (transaction->tra_flags & TRA_ex_restart);
const StmtNode* temp = parentStmt;
if (handlers && handlers->statements.hasData())
if (handlers && handlers->statements.hasData() && !skipHandlers)
{
// First of all rollback failed work
if (!(transaction->tra_flags & TRA_system))
@ -2278,6 +2284,9 @@ DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch
EraseNode* node = FB_NEW_POOL(pool) EraseNode(pool);
node->stream = csb->csb_rpt[n].csb_stream;
if (csb->csb_blr_reader.peekByte() == blr_marks)
node->marks |= PAR_marks(csb);
return node;
}
@ -2292,6 +2301,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
// Process old context values.
dsqlScratch->context->push(node->dsqlContext);
@ -2315,7 +2325,10 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
RseNode* rse;
if (dsqlCursorName.hasData())
{
rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
}
else
{
rse = FB_NEW_POOL(dsqlScratch->getPool()) RseNode(dsqlScratch->getPool());
@ -2369,6 +2382,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, statement);
NODE_PRINT(printer, subStatement);
NODE_PRINT(printer, stream);
NODE_PRINT(printer, marks);
return "EraseNode";
}
@ -2379,42 +2393,25 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
const dsql_ctx* context;
if (dsqlContext)
{
context = dsqlContext;
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
dsqlScratch->appendUChar(blr_end);
}
else
{
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
}
}
else
{
context = dsqlRelation->dsqlContext;
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
dsqlScratch->appendUChar(blr_end);
}
else
{
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
}
if (statement)
{
dsqlScratch->appendUChar(blr_begin);
statement->genBlr(dsqlScratch);
}
dsqlScratch->appendUChar(blr_erase);
GEN_stuff_context(dsqlScratch, context);
if (marks)
dsqlScratch->putBlrMarkers(marks);
if (statement)
dsqlScratch->appendUChar(blr_end);
if (message)
dsqlScratch->appendUChar(blr_end);
}
@ -2555,6 +2552,9 @@ EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb)
}
}
if (!(marks & StmtNode::MARK_POSITIONED))
forNode = pass2FindForNode(parentStmt, stream);
impureOffset = CMP_impure(csb, sizeof(SLONG));
csb->csb_rpt[stream].csb_flags |= csb_update;
@ -2567,6 +2567,7 @@ const StmtNode* EraseNode::execute(thread_db* tdbb, jrd_req* request, ExeState*
if (request->req_operation == jrd_req::req_unwind)
retNode = parentStmt;
else if (request->req_operation == jrd_req::req_return && subStatement)
{
if (!exeState->topNode)
@ -2644,6 +2645,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
request->req_operation = jrd_req::req_return;
RLCK_reserve_relation(tdbb, transaction, relation, true);
if (forNode && forNode->isWriteLockMode(request))
{
forceWriteLock(tdbb, rpb, transaction);
return parentStmt;
}
// If the stream was sorted, the various fields in the rpb are probably junk.
// Just to make sure that everything is cool, refetch and release the record.
@ -2667,7 +2674,20 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
VirtualTable::erase(tdbb, rpb);
else if (!relation->rel_view_rse)
{
VIO_erase(tdbb, rpb, transaction);
// VIO_erase returns false if there is an update conflict in Read Consistency
// transaction. Before returning false it disables statement-level snapshot
// (via setting req_update_conflict flag) so re-fetch should see new data.
if (!VIO_erase(tdbb, rpb, transaction))
{
forceWriteLock(tdbb, rpb, transaction);
if (!forNode)
restartRequest(request, transaction);
forNode->setWriteLockMode(request);
return parentStmt;
}
REPL_erase(tdbb, rpb, transaction);
}
@ -3980,7 +4000,14 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
jrd_tra* const org_transaction = request->req_transaction;
fb_assert(tdbb->getTransaction() == org_transaction);
jrd_tra* const transaction = TRA_start(tdbb, org_transaction->tra_flags,
ULONG transaction_flags = org_transaction->tra_flags;
// Replace Read Consistency by Concurrecy isolation mode
if (transaction_flags & TRA_read_consistency)
transaction_flags &= ~(TRA_read_committed | TRA_read_consistency);
jrd_tra* const transaction = TRA_start(tdbb, transaction_flags,
org_transaction->tra_lock_timeout,
org_transaction);
@ -4005,12 +4032,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
const Savepoint* const savepoint = transaction->startSavepoint();
impure->savNumber = savepoint->getNumber();
if ((transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency))
{
TRA_setup_request_snapshot(tdbb, request, true);
}
return action;
}
@ -4022,16 +4043,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
fb_assert(transaction->tra_number == impure->traNumber);
if (request->req_operation == jrd_req::req_return ||
request->req_operation == jrd_req::req_unwind)
{
if ((transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency))
{
TRA_release_request_snapshot(tdbb, request);
}
}
switch (request->req_operation)
{
case jrd_req::req_return:
@ -4827,6 +4838,9 @@ DmlNode* ForNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb,
{
ForNode* node = FB_NEW_POOL(pool) ForNode(pool);
if (csb->csb_blr_reader.peekByte() == blr_marks)
node->forUpdate = (PAR_marks(csb) & StmtNode::MARK_FOR_UPDATE) != 0;
if (csb->csb_blr_reader.peekByte() == (UCHAR) blr_stall)
node->stall = PAR_parse_stmt(tdbb, csb);
@ -4921,6 +4935,8 @@ string ForNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, statement);
NODE_PRINT(printer, cursor);
NODE_PRINT(printer, parBlrBeginCnt);
NODE_PRINT(printer, forUpdate);
NODE_PRINT(printer, withLock);
return "ForNode";
}
@ -4939,6 +4955,9 @@ void ForNode::genBlr(DsqlCompilerScratch* dsqlScratch)
dsqlScratch->appendUChar(blr_for);
if (forUpdate)
dsqlScratch->putBlrMarkers(StmtNode::MARK_FOR_UPDATE);
if (!statement || dsqlForceSingular)
dsqlScratch->appendUChar(blr_singular);
@ -5004,7 +5023,10 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb)
// as implicit cursors are always positioned in a valid record, and the name is
// only used to raise isc_cursor_not_positioned.
impureOffset = CMP_impure(csb, sizeof(SavNumber));
if (rse->flags & RseNode::FLAG_WRITELOCK)
withLock = true;
impureOffset = CMP_impure(csb, sizeof(Impure));
return this;
}
@ -5012,17 +5034,21 @@ StmtNode* ForNode::pass2(thread_db* tdbb, CompilerScratch* csb)
const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*exeState*/) const
{
jrd_tra* transaction = request->req_transaction;
Impure* impure = request->getImpure<Impure>(impureOffset);
switch (request->req_operation)
{
case jrd_req::req_evaluate:
*request->getImpure<SavNumber>(impureOffset) = 0;
// initialize impure values
impure->savepoint = 0;
impure->writeLockMode = false;
if (!(transaction->tra_flags & TRA_system) &&
transaction->tra_save_point &&
transaction->tra_save_point->hasChanges())
{
const Savepoint* const savepoint = transaction->startSavepoint();
*request->getImpure<SavNumber>(impureOffset) = savepoint->getNumber();
impure->savepoint = savepoint->getNumber();
}
cursor->open(tdbb);
request->req_records_affected.clear();
@ -5034,11 +5060,32 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
// fall into
case jrd_req::req_sync:
if (cursor->fetchNext(tdbb))
{
request->req_operation = jrd_req::req_evaluate;
return statement;
const bool fetched = cursor->fetchNext(tdbb);
if (withLock)
{
const jrd_req* top_request = request->req_snapshot.m_owner;
if ((top_request) && (top_request->req_flags & req_update_conflict))
impure->writeLockMode = true;
}
if (fetched)
{
if (impure->writeLockMode && withLock)
{
// Skip statement execution and fetch (and try to lock) next record.
request->req_operation = jrd_req::req_sync;
return this;
}
request->req_operation = jrd_req::req_evaluate;
return statement;
}
}
if (impure->writeLockMode)
restartRequest(request, transaction);
request->req_operation = jrd_req::req_return;
// fall into
@ -5059,7 +5106,7 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
default:
{
const SavNumber savNumber = *request->getImpure<SavNumber>(impureOffset);
const SavNumber savNumber = impure->savepoint;
if (savNumber)
{
@ -5083,6 +5130,21 @@ const StmtNode* ForNode::execute(thread_db* tdbb, jrd_req* request, ExeState* /*
}
bool ForNode::isWriteLockMode(jrd_req* request) const
{
const Impure* impure = request->getImpure<Impure>(impureOffset);
return impure->writeLockMode;
}
void ForNode::setWriteLockMode(jrd_req* request) const
{
Impure* impure = request->getImpure<Impure>(impureOffset);
fb_assert(!impure->writeLockMode);
impure->writeLockMode = true;
}
//--------------------
@ -5474,6 +5536,8 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
if (returning)
forNode->dsqlForceSingular = true;
forNode->forUpdate = true;
// Get the already processed relations.
RseNode* processedRse = nodeAs<RseNode>(forNode->rse->dsqlStreams->items[0]);
source = processedRse->dsqlStreams->items[0];
@ -5511,6 +5575,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
// Build the MODIFY node.
ModifyNode* modify = FB_NEW_POOL(pool) ModifyNode(pool);
modify->marks |= StmtNode::MARK_MERGE;
thisIf->trueAction = modify;
dsql_ctx* const oldContext = dsqlGetContext(target);
@ -5599,6 +5664,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
{
// Build the DELETE node.
EraseNode* erase = FB_NEW_POOL(pool) EraseNode(pool);
erase->marks |= StmtNode::MARK_MERGE;
thisIf->trueAction = erase;
dsql_ctx* context = dsqlGetContext(target);
@ -5663,6 +5729,7 @@ StmtNode* MergeNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
// Build the INSERT node.
StoreNode* store = FB_NEW_POOL(pool) StoreNode(pool);
// TODO: store->marks |= StmtNode::MARK_MERGE;
store->dsqlRelation = relation;
store->dsqlFields = notMatched->fields;
store->dsqlValues = notMatched->values;
@ -5956,6 +6023,9 @@ DmlNode* ModifyNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* c
node->orgStream = orgStream;
node->newStream = newStream;
if (csb->csb_blr_reader.peekByte() == blr_marks)
node->marks |= PAR_marks(csb);
AutoSetRestore<StmtNode*> autoCurrentDMLNode(&csb->csb_currentDMLNode, node);
node->statement = PAR_parse_stmt(tdbb, csb);
@ -5997,6 +6067,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
node->dsqlContext = dsqlPassCursorContext(dsqlScratch, dsqlCursorName, relation);
node->marks |= StmtNode::MARK_POSITIONED;
// Process old context values.
dsqlScratch->context->push(node->dsqlContext);
@ -6077,6 +6148,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
{
rse = dsqlPassCursorReference(dsqlScratch, dsqlCursorName, relation);
old_context = rse->dsqlStreams->items[0]->dsqlContext;
node->marks |= StmtNode::MARK_POSITIONED;
}
else
{
@ -6177,6 +6249,7 @@ string ModifyNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, mapView);
NODE_PRINT(printer, orgStream);
NODE_PRINT(printer, newStream);
NODE_PRINT(printer, marks);
return "ModifyNode";
}
@ -6202,6 +6275,10 @@ void ModifyNode::genBlr(DsqlCompilerScratch* dsqlScratch)
GEN_stuff_context(dsqlScratch, context);
context = dsqlRelation->dsqlContext;
GEN_stuff_context(dsqlScratch, context);
if (marks)
dsqlScratch->putBlrMarkers(marks);
statement->genBlr(dsqlScratch);
if (statement2)
@ -6381,6 +6458,9 @@ ModifyNode* ModifyNode::pass2(thread_db* tdbb, CompilerScratch* csb)
SBM_SET(tdbb->getDefaultPool(), &csb->csb_rpt[orgStream].csb_fields, id);
}
if (!(marks & StmtNode::MARK_POSITIONED))
forNode = pass2FindForNode(parentStmt, orgStream);
impureOffset = CMP_impure(csb, sizeof(impure_state));
return this;
@ -6450,7 +6530,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
{
case jrd_req::req_evaluate:
request->req_records_affected.bumpModified(false);
break;
if (impure->sta_state == 0 && forNode && forNode->isWriteLockMode(request))
request->req_operation = jrd_req::req_return;
// fall thru
else
break;
case jrd_req::req_return:
if (impure->sta_state == 1)
@ -6465,6 +6550,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
if (impure->sta_state == 0)
{
if (forNode && forNode->isWriteLockMode(request))
{
forceWriteLock(tdbb, orgRpb, transaction);
return parentStmt;
}
// CVC: This call made here to clear the record in each NULL field and
// varchar field whose tail may contain garbage.
cleanupRpb(tdbb, newRpb);
@ -6483,7 +6574,20 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
VirtualTable::modify(tdbb, orgRpb, newRpb);
else if (!relation->rel_view_rse)
{
VIO_modify(tdbb, orgRpb, newRpb, transaction);
// VIO_modify returns false if there is an update conflict in Read Consistency
// transaction. Before returning false it disables statement-level snapshot
// (via setting req_update_conflict flag) so re-fetch should see new data.
if (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
{
forceWriteLock(tdbb, orgRpb, transaction);
if (!forNode)
restartRequest(request, transaction);
forNode->setWriteLockMode(request);
return parentStmt;
}
IDX_modify(tdbb, orgRpb, newRpb, transaction);
REPL_modify(tdbb, orgRpb, newRpb, transaction);
}
@ -8833,6 +8937,7 @@ static const dsql_msg* dsqlGenDmlHeader(DsqlCompilerScratch* dsqlScratch, RseNod
if (dsqlRse)
{
dsqlScratch->appendUChar(blr_for);
dsqlScratch->putBlrMarkers(StmtNode::MARK_FOR_UPDATE);
GEN_expr(dsqlScratch, dsqlRse);
}
@ -9514,6 +9619,20 @@ static void cleanupRpb(thread_db* tdbb, record_param* rpb)
}
}
// Try to set write lock on record until success or record exists
static void forceWriteLock(thread_db * tdbb, record_param * rpb, jrd_tra * transaction)
{
while (VIO_refetch_record(tdbb, rpb, transaction, true, true))
{
rpb->rpb_runtime_flags &= ~RPB_refetch;
// VIO_writelock returns false if record has been deleted or modified
// by someone else.
if (VIO_writelock(tdbb, rpb, transaction))
break;
}
}
// Build a validation list for a relation, if appropriate.
static void makeValidation(thread_db* tdbb, CompilerScratch* csb, StreamType stream,
Array<ValidateInfo>& validations)
@ -9717,6 +9836,23 @@ static void pass1Validations(thread_db* tdbb, CompilerScratch* csb, Array<Valida
}
}
ForNode* pass2FindForNode(StmtNode* node, StreamType stream)
{
// lookup for parent ForNode
while (node && !nodeIs<ForNode>(node))
node = node->parentStmt;
ForNode* forNode = nodeAs<ForNode>(node);
if (forNode && forNode->rse->containsStream(stream))
{
//fb_assert(forNode->forUpdate == true);
if (forNode->forUpdate)
return forNode;
}
return nullptr;
};
// Inherit access to triggers to be fired.
//
// When we detect that a trigger could be fired by a request,
@ -9875,6 +10011,19 @@ static void preprocessAssignments(thread_db* tdbb, CompilerScratch* csb,
}
}
static void restartRequest(const jrd_req* request, jrd_tra* transaction)
{
const jrd_req* top_request = request->req_snapshot.m_owner;
fb_assert(top_request);
fb_assert(top_request->req_flags & req_update_conflict);
transaction->tra_flags |= TRA_ex_restart;
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(top_request->req_conflict_txn));
}
// Execute a list of validation expressions.
static void validateExpressions(thread_db* tdbb, const Array<ValidateInfo>& validations)
{

View File

@ -36,6 +36,7 @@ namespace Jrd {
class CompoundStmtNode;
class ExecBlockNode;
class ForNode;
class PlanNode;
class RelationSourceNode;
class SelectNode;
@ -565,7 +566,8 @@ public:
dsqlContext(NULL),
statement(NULL),
subStatement(NULL),
stream(0)
stream(0),
marks(0)
{
}
@ -596,6 +598,8 @@ public:
NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement;
StreamType stream;
NestConst<ForNode> forNode; // parent implicit cursor, if present
unsigned marks; // see StmtNode::IUD_MARK_xxx
};
@ -914,7 +918,9 @@ public:
rse(NULL),
statement(NULL),
cursor(NULL),
parBlrBeginCnt(0)
parBlrBeginCnt(0),
forUpdate(false),
withLock(false)
{
}
@ -928,7 +934,16 @@ public:
virtual StmtNode* pass2(thread_db* tdbb, CompilerScratch* csb);
virtual const StmtNode* execute(thread_db* tdbb, jrd_req* request, ExeState* exeState) const;
bool isWriteLockMode(jrd_req* request) const;
void setWriteLockMode(jrd_req* request) const;
public:
struct Impure
{
SavNumber savepoint;
bool writeLockMode; // true - driven statement (UPDATE\DELETE\SELECT WITH LOCK) works in "write lock" mode, false - normal mode
};
NestConst<SelectNode> dsqlSelect;
NestConst<ValueListNode> dsqlInto;
DeclareCursorNode* dsqlCursor;
@ -940,6 +955,8 @@ public:
NestConst<StmtNode> statement;
NestConst<Cursor> cursor;
int parBlrBeginCnt;
bool forUpdate; // part of UPDATE\DELETE\MERGE statement
bool withLock; // part of SELECT ... WITH LOCK statement
};
@ -1154,7 +1171,8 @@ public:
validations(pool),
mapView(NULL),
orgStream(0),
newStream(0)
newStream(0),
marks(0)
{
}
@ -1191,6 +1209,8 @@ public:
NestConst<StmtNode> mapView;
StreamType orgStream;
StreamType newStream;
NestConst<ForNode> forNode; // parent implicit cursor, if present
unsigned marks; // see StmtNode::IUD_MARK_xxx
};

View File

@ -286,7 +286,10 @@ bool DsqlDmlRequest::fetch(thread_db* tdbb, UCHAR* msgBuffer)
tdbb->checkCancelState(true);
UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number];
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer);
if (prefetchedFirstRow)
prefetchedFirstRow = false;
else
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer);
const dsql_par* const eof = statement->getEof();
const USHORT* eofPtr = eof ? (USHORT*) (dsqlMsgBuffer + (IPTR) eof->par_desc.dsc_address) : NULL;
@ -677,33 +680,14 @@ void DsqlDmlRequest::dsqlPass(thread_db* tdbb, DsqlCompilerScratch* scratch, boo
*destroyScratchPool = true;
}
// Execute a dynamic SQL statement.
void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
// Execute a dynamic SQL statement
void DsqlDmlRequest::doExecute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton)
{
if (!req_request)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) <<
Arg::Gds(isc_unprepared_stmt));
}
// If there is no data required, just start the request
prefetchedFirstRow = false;
const dsql_msg* message = statement->getSendMsg();
if (message)
mapInOut(tdbb, false, message, inMetadata, NULL, inMsg);
// we need to mapInOut() before tracing of execution start to let trace
// manager know statement parameters values
TraceDSQLExecute trace(req_dbb->dbb_attachment, this);
// Setup and start timeout timer
const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton;
setupTimer(tdbb);
thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor);
if (!message)
JRD_start(tdbb, req_request, req_transaction);
@ -805,6 +789,18 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
status_exception::raise(&localStatus);
}
}
else
{
// Prefetch first row of a query
if (reqTypeWithCursor(statement->getType()))
{
dsql_msg* message = (dsql_msg*) statement->getReceiveMsg();
UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number];
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer);
prefetchedFirstRow = true;
}
}
switch (statement->getType())
{
@ -826,6 +822,97 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
}
break;
}
}
// Execute a dynamic SQL statement with tracing, restart and timeout handler
void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton)
{
if (!req_request)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) <<
Arg::Gds(isc_unprepared_stmt));
}
// If there is no data required, just start the request
const dsql_msg* message = statement->getSendMsg();
if (message)
mapInOut(tdbb, false, message, inMetadata, NULL, inMsg);
// we need to mapInOut() before tracing of execution start to let trace
// manager know statement parameters values
TraceDSQLExecute trace(req_dbb->dbb_attachment, this);
// Setup and start timeout timer
const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton;
setupTimer(tdbb);
thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor);
if (req_transaction && (req_transaction->tra_flags & TRA_read_consistency) &&
statement->getType() != DsqlCompiledStatement::TYPE_SAVEPOINT)
{
AutoSavePoint savePoint(tdbb, req_transaction);
req_request->req_flags &= ~req_update_conflict;
int numTries = 0;
while (true)
{
AutoSetRestoreFlag<ULONG> restartReady(&req_request->req_flags, req_restart_ready, true);
try
{
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
}
catch (const status_exception&)
{
if (!(req_transaction->tra_flags & TRA_ex_restart))
{
req_request->req_flags &= ~req_update_conflict;
throw;
}
}
if (!(req_request->req_flags & req_update_conflict))
{
fb_assert((req_transaction->tra_flags & TRA_ex_restart) == 0);
req_transaction->tra_flags &= ~TRA_ex_restart;
#ifdef DEV_BUILD
if (numTries > 0)
{
string s;
s.printf("restarts = %d", numTries);
ERRD_post_warning(Arg::Warning(isc_random) << Arg::Str(s));
}
#endif
break;
}
fb_assert((req_transaction->tra_flags & TRA_ex_restart) != 0);
req_request->req_flags &= ~req_update_conflict;
req_transaction->tra_flags &= ~TRA_ex_restart;
fb_utils::init_status(tdbb->tdbb_status_vector);
if (numTries >= 10)
{
gds__log("Update conflict: unable to get a stable set of rows in the source tables");
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-913) <<
Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(req_request->req_conflict_txn));
}
req_transaction->rollbackSavepoint(tdbb, true);
req_transaction->startSavepoint(tdbb);
numTries++;
}
savePoint.release(); // everything is ok
} else {
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
}
trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS);
}
@ -2026,7 +2113,15 @@ static void sql_info(thread_db* tdbb,
{
const bool detailed = (item == isc_info_sql_explain_plan);
string plan = OPT_get_plan(tdbb, request->req_request, detailed);
#ifdef DEV_BUILD
if (!detailed)
{
NodePrinter printer;
request->req_request->getStatement()->topNode->print(printer);
plan += "\n--------\n";
plan += printer.getText();
}
#endif
if (plan.hasData())
{
// 1-byte item + 2-byte length + isc_info_end/isc_info_truncated == 4

View File

@ -652,7 +652,8 @@ public:
explicit DsqlDmlRequest(MemoryPool& pool, StmtNode* aNode)
: dsql_req(pool),
node(aNode),
needDelayedFormat(false)
needDelayedFormat(false),
prefetchedFirstRow(false)
{
}
@ -671,9 +672,14 @@ public:
virtual void setDelayedFormat(thread_db* tdbb, Firebird::IMessageMetadata* metadata);
private:
void doExecute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton);
NestConst<StmtNode> node;
Firebird::RefPtr<Firebird::IMessageMetadata> delayedFormat;
bool needDelayedFormat;
bool prefetchedFirstRow;
};
class DsqlDdlRequest : public dsql_req

View File

@ -443,4 +443,6 @@
#define blr_at_local (unsigned char) 0
#define blr_at_zone (unsigned char) 1
#define blr_marks (unsigned char) 217 // mark some blr code with specific flags
#endif // FIREBIRD_IMPL_BLR_H

View File

@ -10,10 +10,10 @@
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Vlad Horsun
* The Original Code was created by Vlad Khorsun
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2006 Vlad Horsun <hvlad@users.sourceforge.net>
* Copyright (c) 2006 Vlad Khorsun <hvlad@users.sourceforge.net>
* and all contributors signed below.
*
* All Rights Reserved.

View File

@ -10,10 +10,10 @@
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Vlad Horsun
* The Original Code was created by Vlad Khorsun
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2006 Vlad Horsun <hvlad@users.sourceforge.net>
* Copyright (c) 2006 Vlad Khorsun <hvlad@users.sourceforge.net>
* and all contributors signed below.
*
* All Rights Reserved.

View File

@ -550,6 +550,8 @@ string RelationSourceNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, dsqlName);
NODE_PRINT(printer, alias);
NODE_PRINT(printer, context);
if (relation)
printer.print("rel_name", relation->rel_name);
return "RelationSourceNode";
}

View File

@ -234,7 +234,7 @@ void VerbAction::mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* next
release(transaction);
}
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks, VerbAction* preserveAction)
{
// Undo changes recorded for this verb action.
// After that, clear the verb action and prepare it for later reuse.
@ -258,7 +258,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
if (!DPM_get(tdbb, &rpb, LCK_read))
BUGCHECK(186); // msg 186 record disappeared
if (have_undo && !(rpb.rpb_flags & rpb_deleted))
if ((have_undo || preserveLocks) && !(rpb.rpb_flags & rpb_deleted))
VIO_data(tdbb, &rpb, transaction->tra_pool);
else
CCH_RELEASE(tdbb, &rpb.getWindow(tdbb));
@ -267,7 +267,50 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
BUGCHECK(185); // msg 185 wrong record version
if (!have_undo)
VIO_backout(tdbb, &rpb, transaction);
{
if (preserveLocks && rpb.rpb_b_page)
{
// Fetch previous record version and update in place current version with it
record_param temp = rpb;
temp.rpb_page = rpb.rpb_b_page;
temp.rpb_line = rpb.rpb_b_line;
temp.rpb_record = NULL;
if (temp.rpb_flags & rpb_delta)
fb_assert(temp.rpb_prior != NULL);
else
fb_assert(temp.rpb_prior == NULL);
if (!DPM_fetch(tdbb, &temp, LCK_read))
BUGCHECK(291); // msg 291 cannot find record back version
if (!(temp.rpb_flags & rpb_chained) || (temp.rpb_flags & (rpb_blob | rpb_fragment)))
ERR_bugcheck_msg("invalid back version");
VIO_data(tdbb, &temp, tdbb->getDefaultPool());
Record* const save_record = rpb.rpb_record;
if (rpb.rpb_flags & rpb_deleted)
rpb.rpb_record = NULL;
Record* const dead_record = rpb.rpb_record;
VIO_update_in_place(tdbb, transaction, &rpb, &temp);
if (dead_record)
{
rpb.rpb_record = NULL; // VIO_garbage_collect_idx will play with this record dirty tricks
VIO_garbage_collect_idx(tdbb, transaction, &rpb, dead_record);
}
rpb.rpb_record = save_record;
delete temp.rpb_record;
if (preserveAction)
RBM_SET(transaction->tra_pool, &preserveAction->vct_records, rpb.rpb_number.getValue());
}
else
VIO_backout(tdbb, &rpb, transaction);
}
else
{
AutoUndoRecord record(vct_undo->current().setupRecord(transaction));
@ -378,7 +421,7 @@ void Savepoint::cleanupTempData()
}
}
Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior)
Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior, bool preserveLocks)
{
// Undo changes made in this savepoint.
// Perform index and BLOB cleanup if needed.
@ -398,8 +441,12 @@ Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior)
while (m_actions)
{
VerbAction* const action = m_actions;
VerbAction* preserveAction = nullptr;
action->undo(tdbb, m_transaction);
if (preserveLocks && m_next)
preserveAction = m_next->createAction(action->vct_relation);
action->undo(tdbb, m_transaction, preserveLocks, preserveAction);
m_actions = action->vct_next;
action->vct_next = m_freeActions;

View File

@ -94,7 +94,8 @@ namespace Jrd
UndoItemTree* vct_undo; // Data for undo records
void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction);
void undo(thread_db* tdbb, jrd_tra* transaction);
void undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks,
VerbAction* preserveAction);
void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber,
VerbAction* nextAction, Record* goingRecord);
@ -231,7 +232,7 @@ namespace Jrd
void cleanupTempData();
Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL);
Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL, bool preserveLocks = false);
Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL);
static void destroy(Savepoint*& savepoint)

View File

@ -247,5 +247,6 @@ static const struct
{"local_timestamp", byte_line},
{"local_time", byte_line},
{"at", verb_byte_verb},
{"marks", byte_literal},
{0, 0}
};

View File

@ -877,7 +877,7 @@ void EXE_start(thread_db* tdbb, jrd_req* request, jrd_tra* transaction)
TRA_post_resources(tdbb, transaction, statement->resources);
TRA_attach_request(transaction, request);
request->req_flags &= req_in_use;
request->req_flags &= req_in_use | req_restart_ready;
request->req_flags |= req_active;
request->req_flags &= ~req_reserved;

View File

@ -8970,39 +8970,8 @@ void JRD_start(Jrd::thread_db* tdbb, jrd_req* request, jrd_tra* transaction)
* Get a record from the host program.
*
**************************************/
// Repeat execution to handle update conflicts, if any
int numTries = 0;
while (true)
{
try
{
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
break;
}
catch (const status_exception &ex)
{
const ISC_STATUS* v = ex.value();
if (// Update conflict error
v[0] == isc_arg_gds &&
v[1] == isc_update_conflict &&
// Read committed transaction with snapshots
(transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency) &&
// Snapshot has been assigned to the request -
// it was top-level request
!TRA_get_prior_request(tdbb))
{
if (++numTries < 10)
{
fb_utils::init_status(tdbb->tdbb_status_vector);
continue;
}
}
throw;
}
}
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
check_autocommit(tdbb, request);
@ -9091,40 +9060,9 @@ void JRD_start_and_send(thread_db* tdbb, jrd_req* request, jrd_tra* transaction,
* Get a record from the host program.
*
**************************************/
// Repeat execution to handle update conflicts, if any
int numTries = 0;
while (true)
{
try
{
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
EXE_send(tdbb, request, msg_type, msg_length, msg);
break;
}
catch (const status_exception &ex)
{
const ISC_STATUS* v = ex.value();
if (// Update conflict error
v[0] == isc_arg_gds &&
v[1] == isc_update_conflict &&
// Read committed transaction with snapshots
(transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency) &&
// Snapshot has been assigned to the request -
// it was top-level request
!TRA_get_prior_request(tdbb))
{
if (++numTries < 10)
{
fb_utils::init_status(tdbb->tdbb_status_vector);
continue;
}
}
throw;
}
}
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
EXE_send(tdbb, request, msg_type, msg_length, msg);
check_autocommit(tdbb, request);

View File

@ -693,6 +693,26 @@ CompoundStmtNode* PAR_make_list(thread_db* tdbb, StmtNodeStack& stack)
}
ULONG PAR_marks(Jrd::CompilerScratch* csb)
{
if (csb->csb_blr_reader.getByte() != blr_marks)
PAR_syntax_error(csb, "blr_marks");
switch (csb->csb_blr_reader.getByte())
{
case 1:
return csb->csb_blr_reader.getByte();
case 2:
return csb->csb_blr_reader.getWord();
case 4:
return csb->csb_blr_reader.getLong();
}
PAR_syntax_error(csb, "valid length for blr_marks value (1, 2, or 4)");
return 0;
}
CompilerScratch* PAR_parse(thread_db* tdbb, const UCHAR* blr, ULONG blr_length,
bool internal_flag, ULONG dbginfo_length, const UCHAR* dbginfo)
{
@ -1637,6 +1657,7 @@ void PAR_syntax_error(CompilerScratch* csb, const TEXT* string)
csb->csb_blr_reader.seekBackward(1);
// BLR syntax error: expected @1 at offset @2, encountered @3
PAR_error(csb, Arg::Gds(isc_syntaxerr) << Arg::Str(string) <<
Arg::Num(csb->csb_blr_reader.getOffset()) <<
Arg::Num(csb->csb_blr_reader.peekByte()));

View File

@ -61,6 +61,7 @@ SSHORT PAR_find_proc_field(const Jrd::jrd_prc*, const Firebird::MetaName&);
Jrd::ValueExprNode* PAR_gen_field(Jrd::thread_db* tdbb, StreamType stream, USHORT id, bool byId = false);
Jrd::ValueExprNode* PAR_make_field(Jrd::thread_db*, Jrd::CompilerScratch*, USHORT, const Firebird::MetaName&);
Jrd::CompoundStmtNode* PAR_make_list(Jrd::thread_db*, Jrd::StmtNodeStack&);
ULONG PAR_marks(Jrd::CompilerScratch*);
Jrd::CompilerScratch* PAR_parse(Jrd::thread_db*, const UCHAR* blr, ULONG blr_length,
bool internal_flag, ULONG = 0, const UCHAR* = NULL);

View File

@ -275,6 +275,7 @@ public:
SINT64 req_fetch_rowcount; // Total number of rows returned by this request
jrd_req* req_proc_caller; // Procedure's caller request
const ValueListNode* req_proc_inputs; // and its node with input parameters
TraNumber req_conflict_txn; // Transaction number for update conflict in read consistency mode
ULONG req_src_line;
ULONG req_src_column;
@ -397,6 +398,8 @@ const ULONG req_continue_loop = 0x100L; // PSQL continue statement
const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress
const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction
const ULONG req_reserved = 0x800L; // Request reserved for client
const ULONG req_update_conflict = 0x1000L; // We need to restart request due to update conflict
const ULONG req_restart_ready = 0x2000L; // Request is ready to restat in case of update conflict
// Index lock block

View File

@ -145,7 +145,7 @@ jrd_req* TRA_get_prior_request(thread_db* tdbb)
return org_request;
}
void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, bool no_prior)
void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request)
{
// This function is called whenever request is started in a transaction.
// Setup context to preserve read consistency in READ COMMITTED transactions.
@ -160,7 +160,7 @@ void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, boo
return;
// See if there is any request right above us in the call stack
jrd_req* org_request = no_prior ? nullptr : TRA_get_prior_request(tdbb);
jrd_req* org_request = TRA_get_prior_request(tdbb);
if (org_request && org_request->req_transaction == transaction)
{
@ -1617,8 +1617,11 @@ int TRA_snapshot_state(thread_db* tdbb, const jrd_tra* trans, TraNumber number,
// GC thread accesses data directly without any request
if (jrd_req* current_request = tdbb->getRequest())
{
// There is no request snapshot when we build expression index
if (jrd_req* snapshot_request = current_request->req_snapshot.m_owner)
// Notes:
// 1) There is no request snapshot when we build expression index
// 2) Disable read committed snapshot after we encountered update conflict
jrd_req* snapshot_request = current_request->req_snapshot.m_owner;
if (snapshot_request && !(snapshot_request->req_flags & req_update_conflict))
{
if (stateCn > snapshot_request->req_snapshot.m_number)
return tra_active;
@ -3849,7 +3852,7 @@ Savepoint* jrd_tra::startSavepoint(bool root)
return savepoint;
}
void jrd_tra::rollbackSavepoint(thread_db* tdbb)
void jrd_tra::rollbackSavepoint(thread_db* tdbb, bool preserveLocks)
/**************************************
*
* r o l l b a c k S a v e p o i n t
@ -3865,8 +3868,11 @@ void jrd_tra::rollbackSavepoint(thread_db* tdbb)
{
REPL_save_cleanup(tdbb, this, tra_save_point, true);
if (tra_flags & TRA_ex_restart)
preserveLocks = true;
Jrd::ContextPoolHolder context(tdbb, tra_pool);
tra_save_point = tra_save_point->rollback(tdbb);
tra_save_point = tra_save_point->rollback(tdbb, NULL, preserveLocks);
}
}

View File

@ -387,7 +387,7 @@ public:
Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number);
void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying);
Savepoint* startSavepoint(bool root = false);
void rollbackSavepoint(thread_db* tdbb);
void rollbackSavepoint(thread_db* tdbb, bool preserveLocks = false);
void rollbackToSavepoint(thread_db* tdbb, SavNumber number);
void rollforwardSavepoint(thread_db* tdbb);
DbCreatorsList* getDbCreatorsList();
@ -426,6 +426,7 @@ const ULONG TRA_no_auto_undo = 0x8000L; // don't start a savepoint in TRA_start
const ULONG TRA_precommitted = 0x10000L; // transaction committed at startup
const ULONG TRA_own_interface = 0x20000L; // tra_interface was created for internal needs
const ULONG TRA_read_consistency = 0x40000L; // ensure read consistency in this transaction
const ULONG TRA_ex_restart = 0x80000L; // Exception was raised to restart request
// flags derived from TPB, see also transaction_options() at tra.cpp
const ULONG TRA_OPTIONS_MASK = (TRA_degree3 | TRA_readonly | TRA_ignore_limbo | TRA_read_committed |

View File

@ -63,7 +63,7 @@ void TRA_update_counters(Jrd::thread_db*, Jrd::Database*);
int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait);
void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request);
void TRA_detach_request(Jrd::jrd_req* request);
void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request, bool no_prior = false);
void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request);
void TRA_release_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request);
Jrd::jrd_req* TRA_get_prior_request(Jrd::thread_db*);

View File

@ -1444,7 +1444,57 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool)
}
void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
static bool check_prepare_result(int prepare_result, jrd_tra* transaction, jrd_req* request, record_param* rpb)
{
/**************************************
*
* c h e c k _ p r e p a r e _ r e s u l t
*
**************************************
*
* Functional description
* Called by VIO_modify and VIO_erase. Raise update conflict error if not in
* read consistency transaction or lock error happens or if request is already
* in update conflict mode. In latter case set TRA_ex_restart flag to correctly
* handle request restart.
*
**************************************/
if (prepare_result == PREPARE_OK)
return true;
jrd_req* top_request = request->req_snapshot.m_owner;
const bool restart_ready = top_request &&
(top_request->req_flags & req_restart_ready);
// Second update conflict when request is already in update conflict mode
// means we have some (indirect) UPDATE\DELETE in WHERE clause of primary
// cursor. In this case all we can do is restart whole request immediately.
const bool secondary = top_request &&
(top_request->req_flags & req_update_conflict) &&
(prepare_result != PREPARE_LOCKERR);
if (!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR ||
secondary || !restart_ready)
{
if (secondary)
transaction->tra_flags |= TRA_ex_restart;
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(rpb->rpb_transaction_nr));
}
if (top_request)
{
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = rpb->rpb_transaction_nr;
}
return false;
}
bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{
/**************************************
*
@ -1500,7 +1550,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
// hvlad: what if record was created\modified by user tx also,
// i.e. if there is backversion ???
VIO_backout(tdbb, rpb, transaction);
return;
return true;
}
transaction->tra_flags |= TRA_write;
@ -1835,7 +1885,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
if (transaction->tra_save_point && transaction->tra_save_point->isChanging())
verb_post(tdbb, transaction, rpb, rpb->rpb_undo);
return;
return true;
}
const bool backVersion = (rpb->rpb_b_page != 0);
@ -1854,13 +1904,9 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{
// Update stub didn't find one page -- do a long, hard update
PageStack stack;
if (prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false))
{
// Cannot use Arg::Num here because transaction number is 64-bit unsigned integer
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(rpb->rpb_transaction_nr));
}
int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false);
if (!check_prepare_result(prepare_result, transaction, request, rpb))
return false;
// Old record was restored and re-fetched for write. Now replace it.
@ -1919,6 +1965,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{
notify_garbage_collector(tdbb, rpb, transaction->tra_number);
}
return true;
}
@ -2723,7 +2770,7 @@ void VIO_init(thread_db* tdbb)
}
}
void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction)
bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction)
{
/**************************************
*
@ -2789,7 +2836,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{
VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb);
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return;
return true;
}
check_gbak_cheating_insupd(tdbb, relation, "UPDATE");
@ -3142,20 +3189,16 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
}
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return;
return true;
}
const bool backVersion = (org_rpb->rpb_b_page != 0);
record_param temp;
PageStack stack;
if (prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, &temp, new_rpb,
stack, false))
{
// Cannot use Arg::Num here because transaction number is 64-bit unsigned integer
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(org_rpb->rpb_transaction_nr));
}
int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
&temp, new_rpb, stack, false);
if (!check_prepare_result(prepare_result, transaction, tdbb->getRequest(), org_rpb))
return false;
IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction);
@ -3204,6 +3247,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{
notify_garbage_collector(tdbb, org_rpb, transaction->tra_number);
}
return true;
}
@ -4004,6 +4048,22 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
{
case PREPARE_CONFLICT:
case PREPARE_DELETE:
if ((transaction->tra_flags & TRA_read_consistency))
{
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
if (top_request && !(top_request->req_flags & req_update_conflict))
{
if (!(top_request->req_flags & req_restart_ready))
{
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(org_rpb->rpb_transaction_nr));
}
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
}
}
org_rpb->rpb_runtime_flags |= RPB_refetch;
return false;
case PREPARE_LOCKERR:
@ -5607,7 +5667,7 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL);
if (writelock)
if (writelock || (transaction->tra_flags & TRA_read_consistency))
{
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_DELETE;
@ -5730,16 +5790,17 @@ static int prepare_update( thread_db* tdbb,
switch (state)
{
case tra_committed:
// We need to loop waiting in read committed with no read consistency transactions only
if (!(transaction->tra_flags & TRA_read_committed) ||
(transaction->tra_flags & TRA_read_consistency))
// For SNAPSHOT mode transactions raise error early
if (!(transaction->tra_flags & TRA_read_committed))
{
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
// Cannot use Arg::Num here because transaction number is 64-bit unsigned integer
ERR_post(Arg::Gds(isc_update_conflict) <<
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(update_conflict_trans));
}
return PREPARE_CONFLICT;
case tra_limbo:
if (!(transaction->tra_flags & TRA_ignore_limbo))

View File

@ -42,7 +42,7 @@ bool VIO_chase_record_version(Jrd::thread_db*, Jrd::record_param*,
Jrd::jrd_tra*, MemoryPool*, bool, bool);
void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*);
void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*);
void VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_fini(Jrd::thread_db*);
bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*);
@ -51,7 +51,7 @@ bool VIO_get_current(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
MemoryPool*, bool, bool&);
void VIO_init(Jrd::thread_db*);
bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool);
Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*);
bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool);