8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 04:43:03 +01:00

Preserve write locks during query restarts due to update conflicts

This commit is contained in:
nikolay.samofatov 2019-08-09 19:28:52 +03:00
parent c6247f8a26
commit 87e720b918
9 changed files with 113 additions and 54 deletions

View File

@ -2643,7 +2643,11 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
VirtualTable::erase(tdbb, rpb); VirtualTable::erase(tdbb, rpb);
else if (!relation->rel_view_rse) else if (!relation->rel_view_rse)
{ {
VIO_erase(tdbb, rpb, transaction); while (!VIO_erase(tdbb, rpb, transaction))
{
if (!VIO_refetch_record(tdbb, rpb, transaction, true, true))
return parentStmt;
}
REPL_erase(tdbb, rpb, transaction); REPL_erase(tdbb, rpb, transaction);
} }
@ -6452,7 +6456,11 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
VirtualTable::modify(tdbb, orgRpb, newRpb); VirtualTable::modify(tdbb, orgRpb, newRpb);
else if (!relation->rel_view_rse) else if (!relation->rel_view_rse)
{ {
VIO_modify(tdbb, orgRpb, newRpb, transaction); while (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
{
if (!VIO_refetch_record(tdbb, orgRpb, transaction, true, true))
return parentStmt;
}
IDX_modify(tdbb, orgRpb, newRpb, transaction); IDX_modify(tdbb, orgRpb, newRpb, transaction);
REPL_modify(tdbb, orgRpb, newRpb, transaction); REPL_modify(tdbb, orgRpb, newRpb, transaction);
} }

View File

@ -851,36 +851,26 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
setupTimer(tdbb); setupTimer(tdbb);
thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor); thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor);
int numTries = 0;
TraNumber prev_concurrent_tx = 0;
while (true)
{ {
try AutoSavePoint savePoint(tdbb, req_transaction);
int numTries = 0;
while (true)
{ {
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton); doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
break; if (!(req_request->req_flags & req_update_conflict))
} break;
catch (const status_exception &ex) req_request->req_flags &= ~req_update_conflict;
{ if (numTries >= 10) {
const ISC_STATUS* v = ex.value(); gds__log("Update conflict: unable to get a stable set of rows in the source tables");
if (// Update conflict error ERR_post(Arg::Gds(isc_deadlock) <<
v[0] == isc_arg_gds && Arg::Gds(isc_update_conflict) <<
v[1] == isc_update_conflict && Arg::Gds(isc_concurrent_transaction) << Arg::Num(req_request->req_conflict_txn));
// Read committed transaction with snapshots
(req_transaction->tra_flags & TRA_read_committed) &&
(req_transaction->tra_flags & TRA_read_consistency) &&
// Snapshot has been assigned to the request -
// it was top-level request
!TRA_get_prior_request(tdbb))
{
if (++numTries < 10)
{
fb_utils::init_status(tdbb->tdbb_status_vector);
continue;
}
} }
throw; req_transaction->rollbackSavepoint(tdbb, true);
req_transaction->startSavepoint(tdbb);
numTries++;
} }
savePoint.release(); // everything is ok
} }
trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS); trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS);

View File

@ -234,7 +234,7 @@ void VerbAction::mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* next
release(transaction); release(transaction);
} }
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction) void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
{ {
// Undo changes recorded for this verb action. // Undo changes recorded for this verb action.
// After that, clear the verb action and prepare it for later reuse. // After that, clear the verb action and prepare it for later reuse.
@ -258,7 +258,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
if (!DPM_get(tdbb, &rpb, LCK_read)) if (!DPM_get(tdbb, &rpb, LCK_read))
BUGCHECK(186); // msg 186 record disappeared BUGCHECK(186); // msg 186 record disappeared
if (have_undo && !(rpb.rpb_flags & rpb_deleted)) if ((have_undo || preserveLocks) && !(rpb.rpb_flags & rpb_deleted))
VIO_data(tdbb, &rpb, transaction->tra_pool); VIO_data(tdbb, &rpb, transaction->tra_pool);
else else
CCH_RELEASE(tdbb, &rpb.getWindow(tdbb)); CCH_RELEASE(tdbb, &rpb.getWindow(tdbb));
@ -267,7 +267,45 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
BUGCHECK(185); // msg 185 wrong record version BUGCHECK(185); // msg 185 wrong record version
if (!have_undo) if (!have_undo)
VIO_backout(tdbb, &rpb, transaction); {
if (preserveLocks) {
// Fetch previous record version and update in place current version with it
record_param temp = rpb;
temp.rpb_page = rpb.rpb_b_page;
temp.rpb_line = rpb.rpb_b_line;
temp.rpb_record = NULL;
if (temp.rpb_flags & rpb_delta)
fb_assert(temp.rpb_prior != NULL);
else
fb_assert(temp.rpb_prior == NULL);
if (!DPM_fetch(tdbb, &temp, LCK_read))
BUGCHECK(291); // msg 291 cannot find record back version
if (!(temp.rpb_flags & rpb_chained) || (temp.rpb_flags & (rpb_blob | rpb_fragment)))
ERR_bugcheck_msg("invalid back version");
VIO_data(tdbb, &temp, tdbb->getDefaultPool());
Record* const save_record = rpb.rpb_record;
if (rpb.rpb_flags & rpb_deleted)
rpb.rpb_record = NULL;
Record* const dead_record = rpb.rpb_record;
VIO_update_in_place(tdbb, transaction, &rpb, &temp);
if (dead_record)
{
rpb.rpb_record = NULL; // VIO_garbage_collect_idx will play with this record dirty tricks
VIO_garbage_collect_idx(tdbb, transaction, &rpb, dead_record);
}
rpb.rpb_record = save_record;
delete temp.rpb_record;
} else
VIO_backout(tdbb, &rpb, transaction);
}
else else
{ {
AutoUndoRecord record(vct_undo->current().setupRecord(transaction)); AutoUndoRecord record(vct_undo->current().setupRecord(transaction));
@ -378,7 +416,7 @@ void Savepoint::cleanupTempData()
} }
} }
Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior) Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior, bool preserveLocks)
{ {
// Undo changes made in this savepoint. // Undo changes made in this savepoint.
// Perform index and BLOB cleanup if needed. // Perform index and BLOB cleanup if needed.
@ -399,7 +437,7 @@ Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior)
{ {
VerbAction* const action = m_actions; VerbAction* const action = m_actions;
action->undo(tdbb, m_transaction); action->undo(tdbb, m_transaction, preserveLocks);
m_actions = action->vct_next; m_actions = action->vct_next;
action->vct_next = m_freeActions; action->vct_next = m_freeActions;

View File

@ -94,7 +94,7 @@ namespace Jrd
UndoItemTree* vct_undo; // Data for undo records UndoItemTree* vct_undo; // Data for undo records
void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction); void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction);
void undo(thread_db* tdbb, jrd_tra* transaction); void undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks);
void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber, void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber,
VerbAction* nextAction, Record* goingRecord); VerbAction* nextAction, Record* goingRecord);
@ -231,7 +231,7 @@ namespace Jrd
void cleanupTempData(); void cleanupTempData();
Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL); Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL, bool preserveLocks = false);
Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL); Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL);
static void destroy(Savepoint*& savepoint) static void destroy(Savepoint*& savepoint)

View File

@ -275,6 +275,7 @@ public:
SINT64 req_fetch_rowcount; // Total number of rows returned by this request SINT64 req_fetch_rowcount; // Total number of rows returned by this request
jrd_req* req_proc_caller; // Procedure's caller request jrd_req* req_proc_caller; // Procedure's caller request
const ValueListNode* req_proc_inputs; // and its node with input parameters const ValueListNode* req_proc_inputs; // and its node with input parameters
TraNumber req_conflict_txn; // Transaction number for update conflict in read consistency mode
ULONG req_src_line; ULONG req_src_line;
ULONG req_src_column; ULONG req_src_column;
@ -397,6 +398,7 @@ const ULONG req_continue_loop = 0x100L; // PSQL continue statement
const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress
const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction
const ULONG req_reserved = 0x800L; // Request reserved for client const ULONG req_reserved = 0x800L; // Request reserved for client
const ULONG req_update_conflict = 0x1000L; // We need to restart request due to update conflict
// Index lock block // Index lock block

View File

@ -1616,8 +1616,11 @@ int TRA_snapshot_state(thread_db* tdbb, const jrd_tra* trans, TraNumber number,
// GC thread accesses data directly without any request // GC thread accesses data directly without any request
if (jrd_req* current_request = tdbb->getRequest()) if (jrd_req* current_request = tdbb->getRequest())
{ {
// There is no request snapshot when we build expression index // Notes:
if (jrd_req* snapshot_request = current_request->req_snapshot.m_owner) // 1) There is no request snapshot when we build expression index
// 2) Disable read committed snapshot after we encountered update conflict
jrd_req* snapshot_request = current_request->req_snapshot.m_owner;
if (snapshot_request && !(snapshot_request->req_flags & req_update_conflict))
{ {
if (stateCn > snapshot_request->req_snapshot.m_number) if (stateCn > snapshot_request->req_snapshot.m_number)
return tra_active; return tra_active;
@ -3847,7 +3850,7 @@ Savepoint* jrd_tra::startSavepoint(bool root)
return savepoint; return savepoint;
} }
void jrd_tra::rollbackSavepoint(thread_db* tdbb) void jrd_tra::rollbackSavepoint(thread_db* tdbb, bool preserveLocks)
/************************************** /**************************************
* *
* r o l l b a c k S a v e p o i n t * r o l l b a c k S a v e p o i n t
@ -3864,7 +3867,7 @@ void jrd_tra::rollbackSavepoint(thread_db* tdbb)
REPL_save_cleanup(tdbb, this, tra_save_point, true); REPL_save_cleanup(tdbb, this, tra_save_point, true);
Jrd::ContextPoolHolder context(tdbb, tra_pool); Jrd::ContextPoolHolder context(tdbb, tra_pool);
tra_save_point = tra_save_point->rollback(tdbb); tra_save_point = tra_save_point->rollback(tdbb, NULL, preserveLocks);
} }
} }

View File

@ -386,7 +386,7 @@ public:
Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number); Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number);
void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying); void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying);
Savepoint* startSavepoint(bool root = false); Savepoint* startSavepoint(bool root = false);
void rollbackSavepoint(thread_db* tdbb); void rollbackSavepoint(thread_db* tdbb, bool preserveLocks = false);
void rollbackToSavepoint(thread_db* tdbb, SavNumber number); void rollbackToSavepoint(thread_db* tdbb, SavNumber number);
void rollforwardSavepoint(thread_db* tdbb); void rollforwardSavepoint(thread_db* tdbb);
DbCreatorsList* getDbCreatorsList(); DbCreatorsList* getDbCreatorsList();

View File

@ -1441,7 +1441,7 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool)
} }
void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
/************************************** /**************************************
* *
@ -1497,7 +1497,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
// hvlad: what if record was created\modified by user tx also, // hvlad: what if record was created\modified by user tx also,
// i.e. if there is backversion ??? // i.e. if there is backversion ???
VIO_backout(tdbb, rpb, transaction); VIO_backout(tdbb, rpb, transaction);
return; return true;
} }
transaction->tra_flags |= TRA_write; transaction->tra_flags |= TRA_write;
@ -1891,7 +1891,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
if (transaction->tra_save_point && transaction->tra_save_point->isChanging()) if (transaction->tra_save_point && transaction->tra_save_point->isChanging())
verb_post(tdbb, transaction, rpb, rpb->rpb_undo); verb_post(tdbb, transaction, rpb, rpb->rpb_undo);
return; return true;
} }
const bool backVersion = (rpb->rpb_b_page != 0); const bool backVersion = (rpb->rpb_b_page != 0);
@ -1910,12 +1910,20 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
// Update stub didn't find one page -- do a long, hard update // Update stub didn't find one page -- do a long, hard update
PageStack stack; PageStack stack;
if (prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false)) int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{ {
ERR_post(Arg::Gds(isc_deadlock) << ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) << Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr));
} }
if (prepare_result) {
jrd_req* top_request = request->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = rpb->rpb_transaction_nr;
return false;
}
// Old record was restored and re-fetched for write. Now replace it. // Old record was restored and re-fetched for write. Now replace it.
@ -1974,6 +1982,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
notify_garbage_collector(tdbb, rpb, transaction->tra_number); notify_garbage_collector(tdbb, rpb, transaction->tra_number);
} }
return true;
} }
@ -2769,7 +2778,7 @@ void VIO_init(thread_db* tdbb)
} }
} }
void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction) bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction)
{ {
/************************************** /**************************************
* *
@ -2835,7 +2844,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{ {
VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb); VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb);
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return; return true;
} }
check_gbak_cheating_insupd(tdbb, relation, "UPDATE"); check_gbak_cheating_insupd(tdbb, relation, "UPDATE");
@ -3198,19 +3207,27 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
} }
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return; return true;
} }
const bool backVersion = (org_rpb->rpb_b_page != 0); const bool backVersion = (org_rpb->rpb_b_page != 0);
record_param temp; record_param temp;
PageStack stack; PageStack stack;
if (prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, &temp, new_rpb, int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
stack, false)) &temp, new_rpb, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{ {
ERR_post(Arg::Gds(isc_deadlock) << ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) << Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr));
} }
if (prepare_result) {
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
return false;
}
IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction); IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction);
@ -3259,6 +3276,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{ {
notify_garbage_collector(tdbb, org_rpb, transaction->tra_number); notify_garbage_collector(tdbb, org_rpb, transaction->tra_number);
} }
return true;
} }
@ -5662,7 +5680,7 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL); delete_record(tdbb, temp, 0, NULL);
if (writelock) if (writelock || (transaction->tra_flags & TRA_read_consistency))
{ {
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_DELETE; return PREPARE_DELETE;
@ -5785,15 +5803,15 @@ static int prepare_update( thread_db* tdbb,
switch (state) switch (state)
{ {
case tra_committed: case tra_committed:
// We need to loop waiting in read committed with no read consistency transactions only // For SNAPSHOT mode transactions raise error early
if (!(transaction->tra_flags & TRA_read_committed) || if (!(transaction->tra_flags & TRA_read_committed))
(transaction->tra_flags & TRA_read_consistency))
{ {
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
ERR_post(Arg::Gds(isc_update_conflict) << ERR_post(Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(update_conflict_trans)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(update_conflict_trans));
} }
return PREPARE_CONFLICT;
case tra_limbo: case tra_limbo:
if (!(transaction->tra_flags & TRA_ignore_limbo)) if (!(transaction->tra_flags & TRA_ignore_limbo))

View File

@ -42,7 +42,7 @@ bool VIO_chase_record_version(Jrd::thread_db*, Jrd::record_param*,
Jrd::jrd_tra*, MemoryPool*, bool, bool); Jrd::jrd_tra*, MemoryPool*, bool, bool);
void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*); void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*);
void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*); void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*);
void VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_fini(Jrd::thread_db*); void VIO_fini(Jrd::thread_db*);
bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*); Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*);
@ -51,7 +51,7 @@ bool VIO_get_current(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
MemoryPool*, bool, bool&); MemoryPool*, bool, bool&);
void VIO_init(Jrd::thread_db*); void VIO_init(Jrd::thread_db*);
bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool); bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool);
Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*); Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*);
bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool); bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool);