8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:03:02 +01:00

SKIP LOCKED clause for SELECT WITH LOCK, UPDATE and DELETE (#7350)

* SKIP LOCKED clause for SELECT WITH LOCK, UPDATE and DELETE.

* Misc.

* Change as Dmitry suggested and fix Windows build.

* Change order of LOCK to between SKIP and FIRST.

* Avoid refetch with READ COMMITTED transaction as suggested by Dmitry.

* Revert changes to TRA_wait and use tra_probe in vio/prepare_update
when skipping locked.

* Add to CHANGELOG.
This commit is contained in:
Adriano dos Santos Fernandes 2022-10-28 07:09:01 -03:00 committed by GitHub
parent 9425992633
commit 5cc8a8f7fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 376 additions and 197 deletions

View File

@ -2,6 +2,10 @@
## New features
* [#7350](https://github.com/FirebirdSQL/firebird/pull/7350): SKIP LOCKED clause for SELECT WITH LOCK, UPDATE and DELETE
Reference(s): [/doc/sql.extensions/README.skip_locked.md](https://github.com/FirebirdSQL/firebird/raw/master/doc/sql.extensions/README.skip_locked.md)
Contributor(s): Adriano dos Santos Fernandes
* [#7216](https://github.com/FirebirdSQL/firebird/pull/7216): New built-in function `BLOB_APPEND`
Reference(s): [/doc/sql.extensions/README.blob_append.md](https://github.com/FirebirdSQL/firebird/raw/master/doc/sql.extensions/README.blob_append.md)
Contributor(s): Vlad Khorsun

View File

@ -0,0 +1,93 @@
# SKIP LOCKED clause (FB 5.0)
`SKIP LOCKED` clause can be used with `SELECT ... WITH LOCK`, `UPDATE` and `DELETE` statements.
It makes engine skip records locked by others transactions instead of wait on them or raise conflict errors.
This is very useful to implement work queues where one or more processes post work to a table and issue
an event while workers listen for events and read/delete items from the table. Using `SKIP LOCKED` multiple
workers can get exclusive work items from the table without conflicts.
## Syntax
```
SELECT
[FIRST ...]
[SKIP ...]
FROM <sometable>
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[{ ROWS ... } | {OFFSET ...} | {FETCH ...}]
[FOR UPDATE [OF ...]]
[WITH LOCK [SKIP LOCKED]]
```
```
UPDATE <sometable>
SET ...
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[ROWS ...]
[SKIP LOCKED]
[RETURNING ...]
```
```
DELETE FROM <sometable>
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[ROWS ...]
[SKIP LOCKED]
[RETURNING ...]
```
## Notes
As it happens with subclauses `FIRST`/`SKIP`/`ROWS`/`OFFSET`/`FETCH` record lock
(and "skip locked" check) is done in between of skip (`SKIP`/`ROWS`/`OFFSET`/`FETCH`) and
limit (`FIRST`/`ROWS`/`OFFSET`/`FETCH`) checks.
## Examples
### Prepare metadata
```
create table emails_queue (
subject varchar(60) not null,
text blob sub_type text not null
);
set term !;
create trigger emails_queue_ins after insert on emails_queue
as
begin
post_event('EMAILS_QUEUE');
end!
set term ;!
```
### Sender application or routine
```
insert into emails_queue (subject, text)
values ('E-mail subject', 'E-mail text...');
commit;
```
### Client application
Client application can listen to event `EMAILS_QUEUE` to actually send e-mails using this query:
```
delete from emails_queue
rows 10
skip locked
returning subject, text;
```
More than one instance of the application may be running, for example to load balance work.

View File

@ -290,6 +290,7 @@ static const TOK tokens[] =
{TOK_LOCALTIME, "LOCALTIME", false},
{TOK_LOCALTIMESTAMP, "LOCALTIMESTAMP", false},
{TOK_LOCK, "LOCK", true},
{TOK_LOCKED, "LOCKED", true},
{TOK_LOG, "LOG", true},
{TOK_LOG10, "LOG10", true},
{TOK_LONG, "LONG", false},

View File

@ -1117,7 +1117,7 @@ BoolExprNode* ComparativeBoolNode::createRseNode(DsqlCompilerScratch* dsqlScratc
const DsqlContextStack::iterator baseDT(dsqlScratch->derivedContext);
const DsqlContextStack::iterator baseUnion(dsqlScratch->unionContext);
RseNode* rse = PASS1_rse(dsqlScratch, select_expr, false);
RseNode* rse = PASS1_rse(dsqlScratch, select_expr, false, false);
rse->flags |= RseNode::FLAG_DSQL_COMPARATIVE;
// Create a conjunct to be injected.
@ -1480,7 +1480,7 @@ BoolExprNode* RseBoolNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
const DsqlContextStack::iterator base(*dsqlScratch->context);
RseBoolNode* node = FB_NEW_POOL(dsqlScratch->getPool()) RseBoolNode(dsqlScratch->getPool(), blrOp,
PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false));
PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false, false));
// Finish off by cleaning up contexts
dsqlScratch->context->clear(base);

View File

@ -8735,7 +8735,7 @@ void CreateAlterViewNode::execute(thread_db* tdbb, DsqlCompilerScratch* dsqlScra
dsqlScratch->resetContextStack();
++dsqlScratch->contextNumber;
RseNode* rse = PASS1_rse(dsqlScratch, selectExpr, false);
RseNode* rse = PASS1_rse(dsqlScratch, selectExpr, false, false);
dsqlScratch->getBlrData().clear();
dsqlScratch->appendUChar(dsqlScratch->isVersion4() ? blr_version4 : blr_version5);

View File

@ -11091,7 +11091,7 @@ ValueExprNode* SubQueryNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
const DsqlContextStack::iterator base(*dsqlScratch->context);
RseNode* rse = PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false);
RseNode* rse = PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false, false);
SubQueryNode* node = FB_NEW_POOL(dsqlScratch->getPool()) SubQueryNode(dsqlScratch->getPool(), blrOp, rse,
rse->dsqlSelectList->items[0], NullNode::instance());

View File

@ -1238,7 +1238,7 @@ DeclareCursorNode* DeclareCursorNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
dt->querySpec = dsqlSelect->dsqlExpr;
dt->alias = dsqlName.c_str();
rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock);
rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock, dsqlSelect->dsqlSkipLocked);
// Assign number and store in the dsqlScratch stack.
cursorNumber = dsqlScratch->cursorNumber++;
@ -2254,6 +2254,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
const auto node = FB_NEW_POOL(dsqlScratch->getPool()) EraseNode(dsqlScratch->getPool());
node->dsqlCursorName = dsqlCursorName;
node->dsqlSkipLocked = dsqlSkipLocked;
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
@ -2302,6 +2303,9 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
if (dsqlRows)
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);
if (dsqlSkipLocked)
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
}
if (dsqlReturning && dsqlScratch->isPsql())
@ -4917,7 +4921,7 @@ ForNode* ForNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
dt->querySpec = dsqlSelect->dsqlExpr;
dt->alias = dsqlCursor->dsqlName.c_str();
node->rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock);
node->rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock, dsqlSelect->dsqlSkipLocked);
dsqlCursor->rse = node->rse;
dsqlCursor->cursorNumber = dsqlScratch->cursorNumber++;
@ -6583,6 +6587,9 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
if (dsqlRows)
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);
if (dsqlSkipLocked)
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
}
node->dsqlReturning = dsqlProcessReturning(dsqlScratch,
@ -7508,7 +7515,7 @@ StmtNode* StoreNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch,
if (dsqlRse && dsqlScratch->isPsql() && dsqlReturning)
selExpr->dsqlFlags |= RecordSourceNode::DFLAG_SINGLETON;
RseNode* rse = PASS1_rse(dsqlScratch, selExpr, false);
RseNode* rse = PASS1_rse(dsqlScratch, selExpr, false, false);
node->dsqlRse = rse;
values = rse->dsqlSelectList;
needSavePoint = false;
@ -8188,7 +8195,7 @@ SelectNode* SelectNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
node->dsqlForUpdate = dsqlForUpdate;
const DsqlContextStack::iterator base(*dsqlScratch->context);
node->dsqlRse = PASS1_rse(dsqlScratch, dsqlExpr, dsqlWithLock);
node->dsqlRse = PASS1_rse(dsqlScratch, dsqlExpr, dsqlWithLock, dsqlSkipLocked);
dsqlScratch->context->clear(base);
if (dsqlForUpdate)
@ -10532,7 +10539,7 @@ static void forceWriteLock(thread_db * tdbb, record_param * rpb, jrd_tra * trans
// VIO_writelock returns false if record has been deleted or modified
// by someone else.
if (VIO_writelock(tdbb, rpb, transaction))
if (VIO_writelock(tdbb, rpb, transaction, false) == WriteLockResult::LOCKED)
break;
}
}

View File

@ -537,19 +537,7 @@ class EraseNode final : public TypedNode<StmtNode, StmtNode::TYPE_ERASE>
public:
explicit EraseNode(MemoryPool& pool)
: TypedNode<StmtNode, StmtNode::TYPE_ERASE>(pool),
dsqlRelation(NULL),
dsqlBoolean(NULL),
dsqlPlan(NULL),
dsqlOrder(NULL),
dsqlRows(NULL),
dsqlCursorName(pool),
dsqlReturning(NULL),
dsqlRse(NULL),
dsqlContext(NULL),
statement(NULL),
subStatement(NULL),
stream(0),
marks(0)
dsqlCursorName(pool)
{
}
@ -576,12 +564,13 @@ public:
MetaName dsqlCursorName;
NestConst<ReturningClause> dsqlReturning;
NestConst<RseNode> dsqlRse;
dsql_ctx* dsqlContext;
bool dsqlSkipLocked = false;
dsql_ctx* dsqlContext = nullptr;
NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement;
NestConst<ForNode> forNode; // parent implicit cursor, if present
StreamType stream;
unsigned marks; // see StmtNode::IUD_MARK_xxx
StreamType stream = 0;
unsigned marks = 0; // see StmtNode::IUD_MARK_xxx
};
@ -1186,6 +1175,7 @@ public:
NestConst<ReturningClause> dsqlReturning;
NestConst<RecordSourceNode> dsqlRse;
dsql_ctx* dsqlContext = nullptr;
bool dsqlSkipLocked = false;
NestConst<StmtNode> statement;
NestConst<StmtNode> statement2;
NestConst<StmtNode> subMod;
@ -1333,11 +1323,7 @@ class SelectNode final : public TypedNode<StmtNode, StmtNode::TYPE_SELECT>
public:
explicit SelectNode(MemoryPool& pool)
: TypedNode<StmtNode, StmtNode::TYPE_SELECT>(pool),
dsqlExpr(NULL),
dsqlRse(NULL),
statements(pool),
dsqlForUpdate(false),
dsqlWithLock(false)
statements(pool)
{
}
@ -1355,8 +1341,9 @@ public:
NestConst<SelectExprNode> dsqlExpr;
NestConst<RseNode> dsqlRse;
Firebird::Array<NestConst<StmtNode>> statements;
bool dsqlForUpdate;
bool dsqlWithLock;
bool dsqlForUpdate = false;
bool dsqlWithLock = false;
bool dsqlSkipLocked = false;
};

View File

@ -550,6 +550,9 @@ void GEN_rse(DsqlCompilerScratch* dsqlScratch, RseNode* rse)
if (rse->flags & RseNode::FLAG_WRITELOCK)
dsqlScratch->appendUChar(blr_writelock);
if (rse->flags & RseNode::FLAG_SKIP_LOCKED)
dsqlScratch->appendUChar(blr_skip_locked);
if (rse->dsqlFirst)
{
dsqlScratch->appendUChar(blr_first);

View File

@ -1 +1 @@
68 shift/reduce conflicts, 18 reduce/reduce conflicts.
68 shift/reduce conflicts, 19 reduce/reduce conflicts.

View File

@ -688,6 +688,7 @@ using namespace Firebird;
// tokens added for Firebird 5.0
%token <metaNamePtr> LOCKED
%token <metaNamePtr> TARGET
%token <metaNamePtr> TIMEZONE_NAME
%token <metaNamePtr> UNICODE_CHAR
@ -726,6 +727,7 @@ using namespace Firebird;
BaseNullable<bool> nullableBoolVal;
BaseNullable<Jrd::TriggerDefinition::SqlSecurity> nullableSqlSecurityVal;
BaseNullable<Jrd::OverrideClause> nullableOverrideClause;
struct { bool first; bool second; } boolPair;
bool boolVal;
int intVal;
unsigned uintVal;
@ -5766,7 +5768,8 @@ select
SelectNode* node = newNode<SelectNode>();
node->dsqlExpr = $1;
node->dsqlForUpdate = $2;
node->dsqlWithLock = $3;
node->dsqlWithLock = $3.first;
node->dsqlSkipLocked = $3.second;
$$ = node;
}
;
@ -5783,10 +5786,16 @@ for_update_list
| OF column_list { $$ = $2; }
;
%type <boolVal> lock_clause
%type <boolPair> lock_clause
lock_clause
: /* nothing */ { $$ = {false, false}; }
| WITH LOCK skip_locked_clause_opt { $$ = {true, $3}; }
;
%type <boolVal> skip_locked_clause_opt
skip_locked_clause_opt
: /* nothing */ { $$ = false; }
| WITH LOCK { $$ = true; }
| SKIP LOCKED { $$ = true; }
;
@ -6656,15 +6665,22 @@ delete
%type <stmtNode> delete_searched
delete_searched
: DELETE FROM table_name where_clause plan_clause order_clause_opt rows_clause_optional returning_clause
: DELETE FROM table_name
where_clause
plan_clause
order_clause_opt
rows_clause_optional
skip_locked_clause_opt
returning_clause
{
EraseNode* node = newNode<EraseNode>();
const auto node = newNode<EraseNode>();
node->dsqlRelation = $3;
node->dsqlBoolean = $4;
node->dsqlPlan = $5;
node->dsqlOrder = $6;
node->dsqlRows = $7;
node->dsqlReturning = $8;
node->dsqlSkipLocked = $8;
node->dsqlReturning = $9;
$$ = node;
}
;
@ -6692,8 +6708,14 @@ update
%type <stmtNode> update_searched
update_searched
: UPDATE table_name SET update_assignments(NOTRIAL(&$2->dsqlName)) where_clause plan_clause
order_clause_opt rows_clause_optional returning_clause
: UPDATE table_name
SET update_assignments(NOTRIAL(&$2->dsqlName))
where_clause
plan_clause
order_clause_opt
rows_clause_optional
skip_locked_clause_opt
returning_clause
{
ModifyNode* node = newNode<ModifyNode>();
node->dsqlRelation = $2;
@ -6702,7 +6724,8 @@ update_searched
node->dsqlPlan = $6;
node->dsqlOrder = $7;
node->dsqlRows = $8;
node->dsqlReturning = $9;
node->dsqlSkipLocked = $9;
node->dsqlReturning = $10;
$$ = node;
}
;
@ -9085,7 +9108,8 @@ non_reserved_word
| SERVERWIDE
| INCREMENT
| TRUSTED
| BASE64_DECODE // added in FB 4.0
// added in FB 4.0
| BASE64_DECODE
| BASE64_ENCODE
| BIND
| CLEAR
@ -9149,10 +9173,14 @@ non_reserved_word
| TOTALORDER
| TRAPS
| ZONE
| DEBUG // added in FB 4.0.1
// added in FB 4.0.1
| DEBUG
| PKCS_1_5
| BLOB_APPEND // added in FB 4.0.2
| TARGET // added in FB 5.0
// added in FB 4.0.2
| BLOB_APPEND
// added in FB 5.0
| LOCKED
| TARGET
| TIMEZONE_NAME
| UNICODE_CHAR
| UNICODE_VAL

View File

@ -180,10 +180,10 @@ using namespace Firebird;
static string pass1_alias_concat(const string&, const string&);
static ValueListNode* pass1_group_by_list(DsqlCompilerScratch*, ValueListNode*, ValueListNode*);
static ValueExprNode* pass1_make_derived_field(thread_db*, DsqlCompilerScratch*, ValueExprNode*);
static RseNode* pass1_rse(DsqlCompilerScratch*, RecordSourceNode*, ValueListNode*, RowsClause*, bool, USHORT);
static RseNode* pass1_rse_impl(DsqlCompilerScratch*, RecordSourceNode*, ValueListNode*, RowsClause*, bool, USHORT);
static RseNode* pass1_rse(DsqlCompilerScratch*, RecordSourceNode*, ValueListNode*, RowsClause*, bool, bool, USHORT);
static RseNode* pass1_rse_impl(DsqlCompilerScratch*, RecordSourceNode*, ValueListNode*, RowsClause*, bool, bool, USHORT);
static ValueListNode* pass1_sel_list(DsqlCompilerScratch*, ValueListNode*);
static RseNode* pass1_union(DsqlCompilerScratch*, UnionSourceNode*, ValueListNode*, RowsClause*, bool, USHORT);
static RseNode* pass1_union(DsqlCompilerScratch*, UnionSourceNode*, ValueListNode*, RowsClause*, bool, bool, USHORT);
static void pass1_union_auto_cast(DsqlCompilerScratch*, ExprNode*, const dsc&, FB_SIZE_T position);
static void remap_streams_to_parent_context(ExprNode*, dsql_ctx*);
@ -578,13 +578,13 @@ dsql_ctx* PASS1_make_context(DsqlCompilerScratch* dsqlScratch, RecordSourceNode*
// Compile a record selection expression, bumping up the statement scope level everytime an rse is
// seen. The scope level controls parsing of aliases.
RseNode* PASS1_rse(DsqlCompilerScratch* dsqlScratch, SelectExprNode* input, bool updateLock)
RseNode* PASS1_rse(DsqlCompilerScratch* dsqlScratch, SelectExprNode* input, bool updateLock, bool skipLocked)
{
DEV_BLKCHK(dsqlScratch, dsql_type_req);
DEV_BLKCHK(input, dsql_type_nod);
dsqlScratch->scopeLevel++;
RseNode* node = pass1_rse(dsqlScratch, input, NULL, NULL, updateLock, 0);
RseNode* node = pass1_rse(dsqlScratch, input, NULL, NULL, updateLock, skipLocked, 0);
dsqlScratch->scopeLevel--;
return node;
@ -975,7 +975,7 @@ void PASS1_expand_contexts(DsqlContextStack& contexts, dsql_ctx* context)
// Process derived table which is part of a from clause.
RseNode* PASS1_derived_table(DsqlCompilerScratch* dsqlScratch, SelectExprNode* input,
const char* cte_alias, bool updateLock)
const char* cte_alias, bool updateLock, bool skipLocked)
{
DEV_BLKCHK(dsqlScratch, dsql_type_req);
@ -1037,7 +1037,7 @@ RseNode* PASS1_derived_table(DsqlCompilerScratch* dsqlScratch, SelectExprNode* i
recursive_map_ctx = dsqlScratch->contextNumber++;
dsqlScratch->recursiveCtxId = dsqlScratch->contextNumber;
rse = pass1_union(dsqlScratch, unionQuery, NULL, NULL, false, 0);
rse = pass1_union(dsqlScratch, unionQuery, NULL, NULL, false, false, 0);
dsqlScratch->contextNumber = dsqlScratch->recursiveCtxId + 1;
// recursive union always has exactly 2 members
@ -1073,10 +1073,10 @@ RseNode* PASS1_derived_table(DsqlCompilerScratch* dsqlScratch, SelectExprNode* i
unionExpr->dsqlClauses = FB_NEW_POOL(pool) RecSourceListNode(pool, 1);
unionExpr->dsqlClauses->items[0] = input;
unionExpr->dsqlAll = true;
rse = pass1_union(dsqlScratch, unionExpr, NULL, NULL, false, 0);
rse = pass1_union(dsqlScratch, unionExpr, NULL, NULL, false, false, 0);
}
else
rse = PASS1_rse(dsqlScratch, input, updateLock);
rse = PASS1_rse(dsqlScratch, input, updateLock, skipLocked);
// Finish off by cleaning up contexts and put them into derivedContext
// so create view (ddl) can deal with it.
@ -1237,7 +1237,7 @@ RseNode* PASS1_derived_table(DsqlCompilerScratch* dsqlScratch, SelectExprNode* i
dsqlScratch->currCteAlias ? *dsqlScratch->currCteAlias : NULL;
dsqlScratch->resetCTEAlias(alias);
rse = PASS1_rse(dsqlScratch, input, updateLock);
rse = PASS1_rse(dsqlScratch, input, updateLock, skipLocked);
if (saveCteAlias)
dsqlScratch->resetCTEAlias(*saveCteAlias);
@ -1755,7 +1755,7 @@ static string pass1_alias_concat(const string& input1, const string& input2)
// Wrapper for pass1_rse_impl. Substitute recursive CTE alias (if needed) and call pass1_rse_impl.
static RseNode* pass1_rse(DsqlCompilerScratch* dsqlScratch, RecordSourceNode* input,
ValueListNode* order, RowsClause* rows, bool updateLock, USHORT flags)
ValueListNode* order, RowsClause* rows, bool updateLock, bool skipLocked, USHORT flags)
{
string save_alias;
RseNode* rseNode = nodeAs<RseNode>(input);
@ -1774,7 +1774,7 @@ static RseNode* pass1_rse(DsqlCompilerScratch* dsqlScratch, RecordSourceNode* in
dsqlScratch->scopeLevel = dsqlScratch->recursiveCtx->ctx_scope_level;
}
RseNode* ret = pass1_rse_impl(dsqlScratch, input, order, rows, updateLock, flags);
RseNode* ret = pass1_rse_impl(dsqlScratch, input, order, rows, updateLock, skipLocked, flags);
if (isRecursive)
dsqlScratch->recursiveCtx->ctx_alias = save_alias;
@ -1786,7 +1786,7 @@ static RseNode* pass1_rse(DsqlCompilerScratch* dsqlScratch, RecordSourceNode* in
// Compile a record selection expression. The input node may either be a "select_expression"
// or a "list" (an implicit union) or a "query specification".
static RseNode* pass1_rse_impl(DsqlCompilerScratch* dsqlScratch, RecordSourceNode* input,
ValueListNode* order, RowsClause* rows, bool updateLock, USHORT flags)
ValueListNode* order, RowsClause* rows, bool updateLock, bool skipLocked, USHORT flags)
{
DEV_BLKCHK(dsqlScratch, dsql_type_req);
@ -1802,7 +1802,7 @@ static RseNode* pass1_rse_impl(DsqlCompilerScratch* dsqlScratch, RecordSourceNod
dsqlScratch->addCTEs(withClause);
RseNode* ret = pass1_rse(dsqlScratch, selNode->querySpec, selNode->orderClause,
selNode->rowsClause, updateLock, selNode->dsqlFlags);
selNode->rowsClause, updateLock, skipLocked, selNode->dsqlFlags);
if (withClause)
{
@ -1822,7 +1822,7 @@ static RseNode* pass1_rse_impl(DsqlCompilerScratch* dsqlScratch, RecordSourceNod
else if (auto unionNode = nodeAs<UnionSourceNode>(input))
{
fb_assert(unionNode->dsqlClauses->items.hasData());
return pass1_union(dsqlScratch, unionNode, order, rows, updateLock, flags);
return pass1_union(dsqlScratch, unionNode, order, rows, updateLock, skipLocked, flags);
}
RseNode* inputRse = nodeAs<RseNode>(input);
@ -1836,6 +1836,9 @@ static RseNode* pass1_rse_impl(DsqlCompilerScratch* dsqlScratch, RecordSourceNod
if (updateLock)
rse->flags |= RseNode::FLAG_WRITELOCK;
if (skipLocked)
rse->flags |= RseNode::FLAG_SKIP_LOCKED;
rse->dsqlStreams = Node::doDsqlPass(dsqlScratch, inputRse->dsqlFrom, false);
RecSourceListNode* streamList = rse->dsqlStreams;
@ -2180,6 +2183,12 @@ static RseNode* pass1_rse_impl(DsqlCompilerScratch* dsqlScratch, RecordSourceNod
rse->flags &= ~RseNode::FLAG_WRITELOCK;
}
if (rse->flags & RseNode::FLAG_SKIP_LOCKED)
{
parentRse->flags |= RseNode::FLAG_SKIP_LOCKED;
rse->flags &= ~RseNode::FLAG_SKIP_LOCKED;
}
if (rse->dsqlFirst)
{
parentRse->dsqlFirst = rse->dsqlFirst;
@ -2410,7 +2419,7 @@ ValueListNode* PASS1_sort(DsqlCompilerScratch* dsqlScratch, ValueListNode* input
// Handle a UNION of substreams, generating a mapping of all the fields and adding an implicit
// PROJECT clause to ensure that all the records returned are unique.
static RseNode* pass1_union(DsqlCompilerScratch* dsqlScratch, UnionSourceNode* input,
ValueListNode* orderList, RowsClause* rows, bool updateLock, USHORT flags)
ValueListNode* orderList, RowsClause* rows, bool updateLock, bool skipLocked, USHORT flags)
{
DEV_BLKCHK(dsqlScratch, dsql_type_req);
@ -2466,12 +2475,15 @@ static RseNode* pass1_union(DsqlCompilerScratch* dsqlScratch, UnionSourceNode* i
++ptr, ++uptr)
{
dsqlScratch->scopeLevel++;
*uptr = pass1_rse(dsqlScratch, *ptr, NULL, NULL, updateLock, 0);
*uptr = pass1_rse(dsqlScratch, *ptr, NULL, NULL, updateLock, skipLocked, 0);
dsqlScratch->scopeLevel--;
if (updateLock)
nodeAs<RseNode>(*uptr)->flags &= ~RseNode::FLAG_WRITELOCK;
if (skipLocked)
nodeAs<RseNode>(*uptr)->flags &= ~RseNode::FLAG_SKIP_LOCKED;
while (*(dsqlScratch->context) != base)
dsqlScratch->unionContext.push(dsqlScratch->context->pop());
@ -2648,6 +2660,9 @@ static RseNode* pass1_union(DsqlCompilerScratch* dsqlScratch, UnionSourceNode* i
if (updateLock)
unionRse->flags |= RseNode::FLAG_WRITELOCK;
if (skipLocked)
unionRse->flags |= RseNode::FLAG_SKIP_LOCKED;
unionRse->dsqlFlags = flags;
return unionRse;

View File

@ -41,7 +41,7 @@ void PASS1_ambiguity_check(Jrd::DsqlCompilerScratch*, const Jrd::MetaName&, cons
void PASS1_check_unique_fields_names(Jrd::StrArray& names, const Jrd::CompoundStmtNode* fields);
Jrd::BoolExprNode* PASS1_compose(Jrd::BoolExprNode*, Jrd::BoolExprNode*, UCHAR);
Jrd::DeclareCursorNode* PASS1_cursor_name(Jrd::DsqlCompilerScratch*, const Jrd::MetaName&, USHORT, bool);
Jrd::RseNode* PASS1_derived_table(Jrd::DsqlCompilerScratch*, Jrd::SelectExprNode*, const char*, bool);
Jrd::RseNode* PASS1_derived_table(Jrd::DsqlCompilerScratch*, Jrd::SelectExprNode*, const char*, bool, bool);
void PASS1_expand_contexts(Jrd::DsqlContextStack& contexts, Jrd::dsql_ctx* context);
Jrd::ValueListNode* PASS1_expand_select_list(Jrd::DsqlCompilerScratch*, Jrd::ValueListNode*, Jrd::RecSourceListNode*);
void PASS1_expand_select_node(Jrd::DsqlCompilerScratch*, Jrd::ExprNode*, Jrd::ValueListNode*, bool);
@ -55,7 +55,7 @@ bool PASS1_node_match(Jrd::DsqlCompilerScratch*, const Jrd::ExprNode*, const Jrd
Jrd::DsqlMapNode* PASS1_post_map(Jrd::DsqlCompilerScratch*, Jrd::ValueExprNode*, Jrd::dsql_ctx*,
Jrd::WindowClause*);
Jrd::RecordSourceNode* PASS1_relation(Jrd::DsqlCompilerScratch*, Jrd::RecordSourceNode*);
Jrd::RseNode* PASS1_rse(Jrd::DsqlCompilerScratch*, Jrd::SelectExprNode*, bool);
Jrd::RseNode* PASS1_rse(Jrd::DsqlCompilerScratch*, Jrd::SelectExprNode*, bool, bool);
bool PASS1_set_parameter_type(Jrd::DsqlCompilerScratch*, Jrd::ValueExprNode*, std::function<void (dsc*)>, bool);
bool PASS1_set_parameter_type(Jrd::DsqlCompilerScratch*, Jrd::ValueExprNode*, NestConst<Jrd::ValueExprNode>, bool);
Jrd::ValueListNode* PASS1_sort(Jrd::DsqlCompilerScratch*, Jrd::ValueListNode*, Jrd::ValueListNode*);

View File

@ -322,6 +322,7 @@
#define blr_writelock (unsigned char)179
#define blr_nullslast (unsigned char)180
/* FB 2.0 specific BLR */
#define blr_lowcase (unsigned char)181
@ -444,6 +445,8 @@
#define blr_marks (unsigned char) 217 // mark some blr code with specific flags
// FB 5.0 specific BLR
#define blr_dcl_local_table (unsigned char) 218
// subcodes of blr_dcl_local_table
@ -459,4 +462,6 @@
// json functions (reserved)
#define blr_json_function (unsigned char) 222
#define blr_skip_locked (unsigned char) 223
#endif // FIREBIRD_IMPL_BLR_H

View File

@ -3445,7 +3445,7 @@ string SelectExprNode::internalPrint(NodePrinter& printer) const
RseNode* SelectExprNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
{
fb_assert(dsqlFlags & DFLAG_DERIVED);
return PASS1_derived_table(dsqlScratch, this, NULL, false);
return PASS1_derived_table(dsqlScratch, this, NULL, false, false);
}
@ -3514,7 +3514,7 @@ static RecordSourceNode* dsqlPassRelProc(DsqlCompilerScratch* dsqlScratch, Recor
dsqlScratch->currCtes.push(cte);
RseNode* derivedNode = PASS1_derived_table(dsqlScratch,
cte, (isRecursive ? relAlias.c_str() : NULL), false);
cte, (isRecursive ? relAlias.c_str() : NULL), false, false);
if (!isRecursive)
cte->alias = saveCteName;

View File

@ -724,6 +724,7 @@ public:
static const USHORT FLAG_DSQL_COMPARATIVE = 0x10; // transformed from DSQL ComparativeBoolNode
static const USHORT FLAG_OPT_FIRST_ROWS = 0x20; // optimize retrieval for first rows
static const USHORT FLAG_LATERAL = 0x40; // lateral derived table
static const USHORT FLAG_SKIP_LOCKED = 0x80; // skip locked
explicit RseNode(MemoryPool& pool)
: TypedNode<RecordSourceNode, RecordSourceNode::TYPE_RSE>(pool),

View File

@ -253,5 +253,6 @@ static const struct
{"local_table_truncate", one_word},
{"local_table_id", local_table},
{"outer_map", outer_map},
{"skip_locked", zero},
{0, 0}
};

View File

@ -1027,20 +1027,13 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
if (invariantBoolean)
rsb = FB_NEW_POOL(getPool()) PreFilteredStream(csb, rsb, invariantBoolean);
// Handle first and/or skip. The skip MUST (if present)
// appear in the rsb list AFTER the first. Since the gen_first and gen_skip
// functions add their nodes at the beginning of the rsb list we MUST call
// gen_skip before gen_first.
// Handle SKIP, WITH LOCK and FIRST.
// The SKIP must (if present) appear in the rsb list deeper than FIRST.
// WITH LOCK must appear between them to work correct with SKIP LOCKED.
if (rse->rse_skip)
rsb = FB_NEW_POOL(getPool()) SkipRowsStream(csb, rsb, rse->rse_skip);
if (rse->rse_first)
rsb = FB_NEW_POOL(getPool()) FirstRowsStream(csb, rsb, rse->rse_first);
if (rse->flags & RseNode::FLAG_SINGULAR)
rsb = FB_NEW_POOL(getPool()) SingularStream(csb, rsb);
if (rse->flags & RseNode::FLAG_WRITELOCK)
{
for (const auto compileStream : compileStreams)
@ -1055,9 +1048,15 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
SCL_update, obj_relations, tail->csb_relation->rel_name);
}
rsb = FB_NEW_POOL(getPool()) LockedStream(csb, rsb);
rsb = FB_NEW_POOL(getPool()) LockedStream(csb, rsb, (rse->flags & RseNode::FLAG_SKIP_LOCKED));
}
if (rse->rse_first)
rsb = FB_NEW_POOL(getPool()) FirstRowsStream(csb, rsb, rse->rse_first);
if (rse->flags & RseNode::FLAG_SINGULAR)
rsb = FB_NEW_POOL(getPool()) SingularStream(csb, rsb);
if (rse->flags & RseNode::FLAG_SCROLLABLE)
rsb = FB_NEW_POOL(getPool()) BufferedStream(csb, rsb);

View File

@ -1403,6 +1403,17 @@ RseNode* PAR_rse(thread_db* tdbb, CompilerScratch* csb, SSHORT rse_op)
rse->flags |= RseNode::FLAG_WRITELOCK;
break;
case blr_skip_locked:
if (!(rse->flags & RseNode::FLAG_WRITELOCK))
{
PAR_error(csb,
Arg::Gds(isc_random) <<
"blr_skip_locked cannot be used without previous blr_writelock",
false);
}
rse->flags |= RseNode::FLAG_SKIP_LOCKED;
break;
default:
if (op == (UCHAR) blr_end)
{

View File

@ -109,10 +109,9 @@ bool BaseAggWinStream<ThisType, NextType>::refetchRecord(thread_db* tdbb) const
}
template <typename ThisType, typename NextType>
bool BaseAggWinStream<ThisType, NextType>::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult BaseAggWinStream<ThisType, NextType>::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
template <typename ThisType, typename NextType>

View File

@ -309,9 +309,9 @@ bool BufferedStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool BufferedStream::lockRecord(thread_db* tdbb) const
WriteLockResult BufferedStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void BufferedStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -103,15 +103,15 @@ bool ConditionalStream::refetchRecord(thread_db* tdbb) const
return impure->irsb_next->refetchRecord(tdbb);
}
bool ConditionalStream::lockRecord(thread_db* tdbb) const
WriteLockResult ConditionalStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
Request* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure);
if (!(impure->irsb_flags & irsb_open))
return false;
return WriteLockResult::CONFLICTED;
return impure->irsb_next->lockRecord(tdbb);
return impure->irsb_next->lockRecord(tdbb, skipLocked);
}
void ConditionalStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -108,12 +108,11 @@ bool ExternalTableScan::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool ExternalTableScan::lockRecord(thread_db* tdbb) const
WriteLockResult ExternalTableScan::lockRecord(thread_db* tdbb, bool skipLocked) const
{
SET_TDBB(tdbb);
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void ExternalTableScan::getChildren(Array<const RecordSource*>& children) const

View File

@ -111,9 +111,9 @@ bool FilteredStream::refetchRecord(thread_db* tdbb) const
m_boolean->execute(tdbb, request);
}
bool FilteredStream::lockRecord(thread_db* tdbb) const
WriteLockResult FilteredStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void FilteredStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -115,9 +115,9 @@ bool FirstRowsStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool FirstRowsStream::lockRecord(thread_db* tdbb) const
WriteLockResult FirstRowsStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void FirstRowsStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -105,12 +105,11 @@ bool FullOuterJoin::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool FullOuterJoin::lockRecord(thread_db* tdbb) const
WriteLockResult FullOuterJoin::lockRecord(thread_db* tdbb, bool skipLocked) const
{
SET_TDBB(tdbb);
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void FullOuterJoin::getChildren(Array<const RecordSource*>& children) const

View File

@ -445,10 +445,9 @@ bool HashJoin::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool HashJoin::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult HashJoin::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void HashJoin::getChildren(Array<const RecordSource*>& children) const

View File

@ -108,10 +108,9 @@ bool LocalTableStream::refetchRecord(thread_db* tdbb) const
return true;
}
bool LocalTableStream::lockRecord(thread_db* tdbb) const
WriteLockResult LocalTableStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void LocalTableStream::print(thread_db* tdbb, string& plan, bool detailed, unsigned level, bool recurse) const

View File

@ -24,6 +24,7 @@
#include "../jrd/jrd.h"
#include "../jrd/req.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/vio_proto.h"
#include "RecordSource.h"
@ -34,9 +35,10 @@ using namespace Jrd;
// Data access: stream locked for write
// ------------------------------------
LockedStream::LockedStream(CompilerScratch* csb, RecordSource* next)
LockedStream::LockedStream(CompilerScratch* csb, RecordSource* next, bool skipLocked)
: RecordSource(csb),
m_next(next)
m_next(next),
m_skipLocked(skipLocked)
{
fb_assert(m_next);
@ -84,9 +86,14 @@ bool LockedStream::internalGetRecord(thread_db* tdbb) const
{
do {
// Attempt to lock the record
if (m_next->lockRecord(tdbb))
const auto lockResult = m_next->lockRecord(tdbb, m_skipLocked);
if (lockResult == WriteLockResult::LOCKED)
return true; // locked
if (lockResult == WriteLockResult::SKIPPED)
break; // skip locked record
// Refetch the record and ensure it still fulfils the search condition
} while (m_next->refetchRecord(tdbb));
}
@ -99,9 +106,9 @@ bool LockedStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool LockedStream::lockRecord(thread_db* tdbb) const
WriteLockResult LockedStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void LockedStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -337,10 +337,9 @@ bool MergeJoin::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool MergeJoin::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult MergeJoin::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void MergeJoin::getChildren(Array<const RecordSource*>& children) const

View File

@ -203,10 +203,9 @@ bool NestedLoopJoin::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool NestedLoopJoin::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult NestedLoopJoin::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void NestedLoopJoin::getChildren(Array<const RecordSource*>& children) const

View File

@ -243,10 +243,9 @@ bool ProcedureScan::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool ProcedureScan::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult ProcedureScan::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void ProcedureScan::getChildren(Array<const RecordSource*>& children) const

View File

@ -26,6 +26,7 @@
#include "../jrd/intl.h"
#include "../jrd/req.h"
#include "../jrd/ProfilerManager.h"
#include "../jrd/tra.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/dpm_proto.h"
#include "../jrd/err_proto.h"
@ -287,7 +288,7 @@ bool RecordStream::refetchRecord(thread_db* tdbb) const
return false;
}
bool RecordStream::lockRecord(thread_db* tdbb) const
WriteLockResult RecordStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
Request* const request = tdbb->getRequest();
jrd_tra* const transaction = request->req_transaction;
@ -299,7 +300,7 @@ bool RecordStream::lockRecord(thread_db* tdbb) const
RLCK_reserve_relation(tdbb, transaction, relation, true);
return VIO_writelock(tdbb, rpb, transaction);
return VIO_writelock(tdbb, rpb, transaction, skipLocked);
}
void RecordStream::markRecursive()

View File

@ -31,6 +31,7 @@
#include "../jrd/RecordBuffer.h"
#include "firebird/impl/inf_pub.h"
#include "../jrd/evl_proto.h"
#include "../jrd/vio_proto.h"
namespace Jrd
{
@ -60,7 +61,7 @@ namespace Jrd
virtual void close(thread_db* tdbb) const = 0;
virtual bool refetchRecord(thread_db* tdbb) const = 0;
virtual bool lockRecord(thread_db* tdbb) const = 0;
virtual WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const = 0;
virtual void getChildren(Firebird::Array<const RecordSource*>& children) const = 0;
@ -151,7 +152,7 @@ namespace Jrd
RecordStream(CompilerScratch* csb, StreamType stream, const Format* format = NULL);
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void markRecursive() override;
void invalidateRecords(Request* request) const override;
@ -305,7 +306,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -330,7 +331,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -366,7 +367,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -399,7 +400,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -426,12 +427,12 @@ namespace Jrd
class LockedStream : public RecordSource
{
public:
LockedStream(CompilerScratch* csb, RecordSource* next);
LockedStream(CompilerScratch* csb, RecordSource* next, bool skipLocked);
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -450,6 +451,7 @@ namespace Jrd
private:
NestConst<RecordSource> m_next;
const bool m_skipLocked;
};
class FirstRowsStream : public RecordSource
@ -465,7 +467,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -505,7 +507,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -541,7 +543,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -662,7 +664,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -809,7 +811,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void markRecursive() override;
void invalidateRecords(Request* request) const override;
@ -991,7 +993,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1059,7 +1061,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1103,7 +1105,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1136,7 +1138,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1190,7 +1192,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1256,7 +1258,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1294,7 +1296,7 @@ namespace Jrd
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void print(thread_db* tdbb, Firebird::string& plan,
bool detailed, unsigned level, bool recurse) const override;
@ -1322,7 +1324,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1367,7 +1369,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -1409,7 +1411,7 @@ namespace Jrd
void close(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;

View File

@ -233,10 +233,9 @@ bool RecursiveStream::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool RecursiveStream::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult RecursiveStream::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void RecursiveStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -141,9 +141,9 @@ bool SingularStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool SingularStream::lockRecord(thread_db* tdbb) const
WriteLockResult SingularStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void SingularStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -111,9 +111,9 @@ bool SkipRowsStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool SkipRowsStream::lockRecord(thread_db* tdbb) const
WriteLockResult SkipRowsStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void SkipRowsStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -116,9 +116,9 @@ bool SortedStream::refetchRecord(thread_db* tdbb) const
return m_next->refetchRecord(tdbb);
}
bool SortedStream::lockRecord(thread_db* tdbb) const
WriteLockResult SortedStream::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void SortedStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -151,15 +151,15 @@ bool Union::refetchRecord(thread_db* tdbb) const
return m_args[impure->irsb_count]->refetchRecord(tdbb);
}
bool Union::lockRecord(thread_db* tdbb) const
WriteLockResult Union::lockRecord(thread_db* tdbb, bool skipLocked) const
{
Request* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure);
if (impure->irsb_count >= m_args.getCount())
return false;
return WriteLockResult::CONFLICTED;
return m_args[impure->irsb_count]->lockRecord(tdbb);
return m_args[impure->irsb_count]->lockRecord(tdbb, skipLocked);
}
void Union::getChildren(Array<const RecordSource*>& children) const

View File

@ -104,10 +104,9 @@ bool VirtualTableScan::refetchRecord(thread_db* /*tdbb*/) const
return true;
}
bool VirtualTableScan::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult VirtualTableScan::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void VirtualTableScan::getChildren(Array<const RecordSource*>& children) const

View File

@ -26,6 +26,7 @@
#include "../jrd/evl_proto.h"
#include "../jrd/exe_proto.h"
#include "../jrd/par_proto.h"
#include "../jrd/vio_proto.h"
#include "../jrd/optimizer/Optimizer.h"
#include "RecordSource.h"
@ -56,7 +57,7 @@ namespace
bool internalGetRecord(thread_db* tdbb) const override;
bool refetchRecord(thread_db* tdbb) const override;
bool lockRecord(thread_db* tdbb) const override;
WriteLockResult lockRecord(thread_db* tdbb, bool skipLocked) const override;
void getChildren(Firebird::Array<const RecordSource*>& children) const override;
@ -141,9 +142,9 @@ namespace
return m_next->refetchRecord(tdbb);
}
bool BufferedStreamWindow::lockRecord(thread_db* tdbb) const
WriteLockResult BufferedStreamWindow::lockRecord(thread_db* tdbb, bool skipLocked) const
{
return m_next->lockRecord(tdbb);
return m_next->lockRecord(tdbb, skipLocked);
}
void BufferedStreamWindow::getChildren(Array<const RecordSource*>& children) const
@ -392,10 +393,9 @@ bool WindowedStream::refetchRecord(thread_db* tdbb) const
return m_joinedStream->refetchRecord(tdbb);
}
bool WindowedStream::lockRecord(thread_db* /*tdbb*/) const
WriteLockResult WindowedStream::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
{
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
return false; // compiler silencer
}
void WindowedStream::getChildren(Array<const RecordSource*>& children) const

View File

@ -156,13 +156,17 @@ static void list_staying_fast(thread_db*, record_param*, RecordStack&, record_pa
static void notify_garbage_collector(thread_db* tdbb, record_param* rpb,
TraNumber tranid = MAX_TRA_NUMBER);
const int PREPARE_OK = 0;
const int PREPARE_CONFLICT = 1;
const int PREPARE_DELETE = 2;
const int PREPARE_LOCKERR = 3;
enum class PrepareResult
{
SUCCESS,
CONFLICT,
DELETED,
SKIP_LOCKED,
LOCK_ERROR
};
static int prepare_update(thread_db*, jrd_tra*, TraNumber commit_tid_read, record_param*,
record_param*, record_param*, PageStack&, bool);
static PrepareResult prepare_update(thread_db*, jrd_tra*, TraNumber commit_tid_read, record_param*,
record_param*, record_param*, PageStack&, TriState writeLockSkipLocked = {});
static void protect_system_table_insert(thread_db* tdbb, const Request* req, const jrd_rel* relation,
bool force_flag = false);
@ -1877,7 +1881,8 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool)
}
static bool check_prepare_result(int prepare_result, jrd_tra* transaction, Request* request, record_param* rpb)
static bool check_prepare_result(PrepareResult prepare_result, jrd_tra* transaction,
Request* request, record_param* rpb)
{
/**************************************
*
@ -1892,7 +1897,9 @@ static bool check_prepare_result(int prepare_result, jrd_tra* transaction, Reque
* handle request restart.
*
**************************************/
if (prepare_result == PREPARE_OK)
fb_assert(prepare_result != PrepareResult::SKIP_LOCKED);
if (prepare_result == PrepareResult::SUCCESS)
return true;
Request* top_request = request->req_snapshot.m_owner;
@ -1905,9 +1912,9 @@ static bool check_prepare_result(int prepare_result, jrd_tra* transaction, Reque
// cursor. In this case all we can do is restart whole request immediately.
const bool secondary = top_request &&
(top_request->req_flags & req_update_conflict) &&
(prepare_result != PREPARE_LOCKERR);
(prepare_result != PrepareResult::LOCK_ERROR);
if (!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR ||
if (!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PrepareResult::LOCK_ERROR ||
secondary || !restart_ready)
{
if (secondary)
@ -2351,7 +2358,7 @@ bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{
// Update stub didn't find one page -- do a long, hard update
PageStack stack;
int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false);
const auto prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack);
if (!check_prepare_result(prepare_result, transaction, request, rpb))
return false;
@ -3649,8 +3656,8 @@ bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
const bool backVersion = (org_rpb->rpb_b_page != 0);
record_param temp;
PageStack stack;
int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
&temp, new_rpb, stack, false);
const auto prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
&temp, new_rpb, stack);
if (!check_prepare_result(prepare_result, transaction, tdbb->getRequest(), org_rpb))
return false;
@ -4436,7 +4443,7 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
}
bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
WriteLockResult VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction, bool skipLocked)
{
/**************************************
*
@ -4467,13 +4474,13 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
if (transaction->tra_flags & TRA_system)
{
// Explicit locks are not needed in system transactions
return true;
return WriteLockResult::LOCKED;
}
if (org_rpb->rpb_runtime_flags & (RPB_refetch | RPB_undo_read))
{
if (!VIO_refetch_record(tdbb, org_rpb, transaction, true, true))
return false;
return WriteLockResult::CONFLICTED;
org_rpb->rpb_runtime_flags &= ~RPB_refetch;
fb_assert(!(org_rpb->rpb_runtime_flags & RPB_undo_read));
@ -4482,7 +4489,7 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
if (org_rpb->rpb_transaction_nr == transaction->tra_number)
{
// We already own this record, thus no writelock is required
return true;
return WriteLockResult::LOCKED;
}
transaction->tra_flags |= TRA_write;
@ -4528,10 +4535,14 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
record_param temp;
PageStack stack;
switch (prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, &temp, &new_rpb,
stack, true))
stack, TriState(skipLocked)))
{
case PREPARE_CONFLICT:
case PREPARE_DELETE:
case PrepareResult::DELETED:
if (skipLocked && (transaction->tra_flags & TRA_read_committed))
return WriteLockResult::SKIPPED;
// fall thru
case PrepareResult::CONFLICT:
if ((transaction->tra_flags & TRA_read_consistency))
{
Request* top_request = tdbb->getRequest()->req_snapshot.m_owner;
@ -4549,8 +4560,15 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
}
}
org_rpb->rpb_runtime_flags |= RPB_refetch;
return false;
case PREPARE_LOCKERR:
return WriteLockResult::CONFLICTED;
case PrepareResult::SKIP_LOCKED:
fb_assert(skipLocked);
if (skipLocked)
return WriteLockResult::SKIPPED;
// fall thru
case PrepareResult::LOCK_ERROR:
// We got some kind of locking error (deadlock, timeout or lock_conflict)
// Error details should be stuffed into status vector at this point
// hvlad: we have no details as TRA_wait has already cleared the status vector
@ -4602,7 +4620,7 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
notify_garbage_collector(tdbb, org_rpb, transaction->tra_number);
}
return true;
return WriteLockResult::LOCKED;
}
@ -5963,14 +5981,8 @@ static void notify_garbage_collector(thread_db* tdbb, record_param* rpb, TraNumb
}
static int prepare_update( thread_db* tdbb,
jrd_tra* transaction,
TraNumber commit_tid_read,
record_param* rpb,
record_param* temp,
record_param* new_rpb,
PageStack& stack,
bool writelock)
static PrepareResult prepare_update(thread_db* tdbb, jrd_tra* transaction, TraNumber commit_tid_read,
record_param* rpb, record_param* temp, record_param* new_rpb, PageStack& stack, TriState writeLockSkipLocked)
{
/**************************************
*
@ -6117,7 +6129,7 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL);
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_DELETE;
return PrepareResult::DELETED;
}
}
@ -6159,10 +6171,10 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL);
if (writelock || (transaction->tra_flags & TRA_read_consistency))
if (writeLockSkipLocked.isAssigned() || (transaction->tra_flags & TRA_read_consistency))
{
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_DELETE;
return PrepareResult::DELETED;
}
IBERROR(188); // msg 188 cannot update erased record
@ -6183,7 +6195,7 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL);
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_CONFLICT;
return PrepareResult::CONFLICT;
}
/*
@ -6240,7 +6252,7 @@ static int prepare_update( thread_db* tdbb,
const USHORT pageSpaceID = temp->getWindow(tdbb).win_page.getPageSpaceID();
stack.push(PageNumber(pageSpaceID, temp->rpb_page));
}
return PREPARE_OK;
return PrepareResult::SUCCESS;
case tra_active:
case tra_limbo:
@ -6252,10 +6264,12 @@ static int prepare_update( thread_db* tdbb,
#endif
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
// Wait as long as it takes for an active transaction which has modified
// the record.
// Wait as long as it takes (if not skipping locks) for an active
// transaction which has modified the record.
state = wait(tdbb, transaction, rpb);
state = writeLockSkipLocked == true ?
TRA_wait(tdbb, transaction, rpb->rpb_transaction_nr, jrd_tra::tra_probe) :
wait(tdbb, transaction, rpb);
if (state == tra_committed)
state = check_precommitted(transaction, rpb);
@ -6287,12 +6301,15 @@ static int prepare_update( thread_db* tdbb,
{
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
if (writeLockSkipLocked == true)
return PrepareResult::SKIP_LOCKED;
// Cannot use Arg::Num here because transaction number is 64-bit unsigned integer
ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Int64(update_conflict_trans));
}
return PREPARE_CONFLICT;
return PrepareResult::CONFLICT;
case tra_limbo:
if (!(transaction->tra_flags & TRA_ignore_limbo))
@ -6303,7 +6320,7 @@ static int prepare_update( thread_db* tdbb,
// fall thru
case tra_active:
return PREPARE_LOCKERR;
return writeLockSkipLocked == true ? PrepareResult::SKIP_LOCKED : PrepareResult::LOCK_ERROR;
case tra_dead:
break;
@ -6327,7 +6344,7 @@ static int prepare_update( thread_db* tdbb,
VIO_backout(tdbb, rpb, transaction);
}
return PREPARE_OK;
return PrepareResult::SUCCESS;
}

View File

@ -43,6 +43,13 @@ namespace Jrd
DPM_next_data_page, // one data page only
DPM_next_pointer_page // data pages from one pointer page
};
enum class WriteLockResult
{
LOCKED,
CONFLICTED,
SKIPPED
};
}
void VIO_backout(Jrd::thread_db*, Jrd::record_param*, const Jrd::jrd_tra*);
@ -58,7 +65,7 @@ bool VIO_get(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*);
bool VIO_get_current(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
MemoryPool*, bool, bool&);
void VIO_init(Jrd::thread_db*);
bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
Jrd::WriteLockResult VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool skipLocked);
bool VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, Jrd::FindNextRecordScope);
Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*);