8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 17:23:03 +01:00

Merge pull request #7872 from FirebirdSQL/work/gh-7810-2

Additional fixes for #7810 (Improve SKIP LOCKED implementation)
This commit is contained in:
Vlad Khorsun 2023-11-24 19:51:19 +02:00 committed by GitHub
commit 5b240c4916
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 20 deletions

View File

@ -46,9 +46,9 @@ DELETE FROM <sometable>
## Notes ## Notes
As it happens with subclauses `FIRST`/`SKIP`/`ROWS`/`OFFSET`/`FETCH` record lock If statement have both `SKIP LOCKED` and `SKIP`/`ROWS` subclauses, some locked rows
(and "skip locked" check) is done in between of skip (`SKIP`/`ROWS`/`OFFSET`/`FETCH`) and could be skipped before `SKIP`/`ROWS` subclause would account it, thus skipping more
limit (`FIRST`/`ROWS`/`OFFSET`/`FETCH`) checks. rows than specified in `SKIP`/`ROWS`.
## Examples ## Examples

View File

@ -178,11 +178,11 @@ namespace
m_savepoint.rollback(); m_savepoint.rollback();
} }
private:
// Prohibit unwanted creation/copying // Prohibit unwanted creation/copying
CondSavepointAndMarker(const CondSavepointAndMarker&) = delete; CondSavepointAndMarker(const CondSavepointAndMarker&) = delete;
CondSavepointAndMarker& operator=(const CondSavepointAndMarker&) = delete; CondSavepointAndMarker& operator=(const CondSavepointAndMarker&) = delete;
private:
AutoSavePoint m_savepoint; AutoSavePoint m_savepoint;
Savepoint::ChangeMarker m_marker; Savepoint::ChangeMarker m_marker;
}; };
@ -2273,9 +2273,9 @@ const StmtNode* DeclareVariableNode::execute(thread_db* tdbb, Request* request,
//-------------------- //--------------------
static RegisterNode<EraseNode> regEraseNode({blr_erase}); static RegisterNode<EraseNode> regEraseNode({blr_erase, blr_erase2});
DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch* csb, const UCHAR /*blrOp*/) DmlNode* EraseNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR blrOp)
{ {
const USHORT n = csb->csb_blr_reader.getByte(); const USHORT n = csb->csb_blr_reader.getByte();
@ -2288,6 +2288,9 @@ DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch
if (csb->csb_blr_reader.peekByte() == blr_marks) if (csb->csb_blr_reader.peekByte() == blr_marks)
node->marks |= PAR_marks(csb); node->marks |= PAR_marks(csb);
if (blrOp == blr_erase2)
node->returningStatement = PAR_parse_stmt(tdbb, csb);
return node; return node;
} }
@ -2385,6 +2388,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, dsqlSkipLocked); NODE_PRINT(printer, dsqlSkipLocked);
NODE_PRINT(printer, statement); NODE_PRINT(printer, statement);
NODE_PRINT(printer, subStatement); NODE_PRINT(printer, subStatement);
NODE_PRINT(printer, returningStatement);
NODE_PRINT(printer, stream); NODE_PRINT(printer, stream);
NODE_PRINT(printer, marks); NODE_PRINT(printer, marks);
@ -2397,11 +2401,14 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
{ {
std::optional<USHORT> tableNumber; std::optional<USHORT> tableNumber;
const bool skipLocked = dsqlRse && dsqlRse->hasSkipLocked();
if (dsqlReturning && !dsqlScratch->isPsql()) if (dsqlReturning && !dsqlScratch->isPsql())
{ {
if (dsqlCursorName.isEmpty()) if (dsqlCursorName.isEmpty())
{ {
dsqlScratch->appendUChar(blr_begin); if (!skipLocked)
dsqlScratch->appendUChar(blr_begin);
tableNumber = dsqlScratch->localTableNumber++; tableNumber = dsqlScratch->localTableNumber++;
dsqlGenReturningLocalTableDecl(dsqlScratch, tableNumber.value()); dsqlGenReturningLocalTableDecl(dsqlScratch, tableNumber.value());
@ -2422,12 +2429,13 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
const auto* context = dsqlContext ? dsqlContext : dsqlRelation->dsqlContext; const auto* context = dsqlContext ? dsqlContext : dsqlRelation->dsqlContext;
if (dsqlReturning) if (dsqlReturning && !skipLocked)
{ {
dsqlScratch->appendUChar(blr_begin); dsqlScratch->appendUChar(blr_begin);
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);
} }
dsqlScratch->appendUChar(blr_erase); dsqlScratch->appendUChar(dsqlReturning && skipLocked ? blr_erase2 : blr_erase);
GEN_stuff_context(dsqlScratch, context); GEN_stuff_context(dsqlScratch, context);
if (marks) if (marks)
@ -2435,15 +2443,17 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
if (dsqlReturning) if (dsqlReturning)
{ {
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber); if (!skipLocked)
dsqlScratch->appendUChar(blr_end);
dsqlScratch->appendUChar(blr_end); else
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);
if (!dsqlScratch->isPsql() && dsqlCursorName.isEmpty()) if (!dsqlScratch->isPsql() && dsqlCursorName.isEmpty())
{ {
dsqlGenReturningLocalTableCursor(dsqlScratch, dsqlReturning, tableNumber.value()); dsqlGenReturningLocalTableCursor(dsqlScratch, dsqlReturning, tableNumber.value());
dsqlScratch->appendUChar(blr_end); if (!skipLocked)
dsqlScratch->appendUChar(blr_end);
} }
} }
} }
@ -2455,6 +2465,9 @@ EraseNode* EraseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
doPass1(tdbb, csb, statement.getAddress()); doPass1(tdbb, csb, statement.getAddress());
doPass1(tdbb, csb, subStatement.getAddress()); doPass1(tdbb, csb, subStatement.getAddress());
AutoSetRestore<bool> autoReturningExpr(&csb->csb_returning_expr, true);
doPass1(tdbb, csb, returningStatement.getAddress());
return this; return this;
} }
@ -2571,6 +2584,7 @@ void EraseNode::pass1Erase(thread_db* tdbb, CompilerScratch* csb, EraseNode* nod
EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb) EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb)
{ {
doPass2(tdbb, csb, statement.getAddress(), this); doPass2(tdbb, csb, statement.getAddress(), this);
doPass2(tdbb, csb, returningStatement.getAddress(), this);
doPass2(tdbb, csb, subStatement.getAddress(), this); doPass2(tdbb, csb, subStatement.getAddress(), this);
const jrd_rel* const relation = csb->csb_rpt[stream].csb_relation; const jrd_rel* const relation = csb->csb_rpt[stream].csb_relation;
@ -2649,6 +2663,8 @@ const StmtNode* EraseNode::execute(thread_db* tdbb, Request* request, ExeState*
// Perform erase operation. // Perform erase operation.
const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger whichTrig) const const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger whichTrig) const
{ {
impure_state* impure = request->getImpure<impure_state>(impureOffset);
jrd_tra* transaction = request->req_transaction; jrd_tra* transaction = request->req_transaction;
record_param* rpb = &request->req_rpb[stream]; record_param* rpb = &request->req_rpb[stream];
jrd_rel* relation = rpb->rpb_relation; jrd_rel* relation = rpb->rpb_relation;
@ -2674,6 +2690,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
} }
case Request::req_return: case Request::req_return:
if (impure->sta_state == 1)
{
impure->sta_state = 0;
rpb->rpb_number.setValid(false);
return parentStmt;
}
break; break;
default: default:
@ -2735,9 +2757,9 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
if (!VIO_erase(tdbb, rpb, transaction)) if (!VIO_erase(tdbb, rpb, transaction))
{ {
// Record was not deleted, flow control should be passed to the // Record was not deleted, flow control should be passed to the parent
// parent ForNode. Note, If RETURNING clause was specified, then // ForNode. Note, If RETURNING clause was specified and SKIP LOCKED was
// parent node is CompoundStmtNode, not ForNode. If\when this // not, then parent node is CompoundStmtNode, not ForNode. If\when this
// will be changed, the code below should be changed accordingly. // will be changed, the code below should be changed accordingly.
if (skipLocked) if (skipLocked)
@ -2789,6 +2811,13 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
} }
} }
if (returningStatement)
{
impure->sta_state = 1;
request->req_operation = Request::req_evaluate;
return returningStatement;
}
rpb->rpb_number.setValid(false); rpb->rpb_number.setValid(false);
return parentStmt; return parentStmt;
@ -7774,7 +7803,7 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, Request* request, WhichTrigg
SavepointChangeMarker scMarker(transaction); SavepointChangeMarker scMarker(transaction);
// Prepare to undo changed by PRE-triggers if record is locked by another // Prepare to undo changes by PRE-triggers if record is locked by another
// transaction and update should be skipped. // transaction and update should be skipped.
const bool skipLocked = orgRpb->rpb_stream_flags & RPB_s_skipLocked; const bool skipLocked = orgRpb->rpb_stream_flags & RPB_s_skipLocked;
CondSavepointAndMarker spPreTriggers(tdbb, transaction, CondSavepointAndMarker spPreTriggers(tdbb, transaction,

View File

@ -552,6 +552,7 @@ public:
dsql_ctx* dsqlContext = nullptr; dsql_ctx* dsqlContext = nullptr;
NestConst<StmtNode> statement; NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement; NestConst<StmtNode> subStatement;
NestConst<StmtNode> returningStatement;
NestConst<ForNode> forNode; // parent implicit cursor, if present NestConst<ForNode> forNode; // parent implicit cursor, if present
StreamType stream = 0; StreamType stream = 0;
unsigned marks = 0; // see StmtNode::IUD_MARK_xxx unsigned marks = 0; // see StmtNode::IUD_MARK_xxx

View File

@ -309,8 +309,7 @@
#define blr_agg_list (unsigned char)170 #define blr_agg_list (unsigned char)170
#define blr_agg_list_distinct (unsigned char)171 #define blr_agg_list_distinct (unsigned char)171
#define blr_modify2 (unsigned char)172 #define blr_modify2 (unsigned char)172
#define blr_erase2 (unsigned char)173
// unused codes: 173
/* FB 1.0 specific BLR */ /* FB 1.0 specific BLR */

View File

@ -199,7 +199,7 @@ static const struct
{"agg_list", two}, // 170 {"agg_list", two}, // 170
{"agg_list_distinct", two}, {"agg_list_distinct", two},
{"modify2", modify2}, {"modify2", modify2},
{NULL, NULL}, {"erase2", erase2},
// New BLR in FB1 // New BLR in FB1
{"current_role", zero}, {"current_role", zero},
{"skip", one}, {"skip", one},

View File

@ -414,6 +414,7 @@ static const UCHAR
store3[] = { op_line, op_byte, op_line, op_verb, op_verb, op_verb, 0}, store3[] = { op_line, op_byte, op_line, op_verb, op_verb, op_verb, 0},
marks[] = { op_byte, op_literal, op_line, op_verb, 0}, marks[] = { op_byte, op_literal, op_line, op_verb, 0},
erase[] = { op_erase, 0}, erase[] = { op_erase, 0},
erase2[] = { op_erase, op_verb, 0},
local_table[] = { op_word, op_byte, op_literal, op_byte, op_line, 0}, local_table[] = { op_word, op_byte, op_literal, op_byte, op_line, 0},
outer_map[] = { op_outer_map, 0 }, outer_map[] = { op_outer_map, 0 },
in_list[] = { op_verb, op_line, op_word, op_line, op_args, 0}, in_list[] = { op_verb, op_line, op_word, op_line, op_args, 0},