8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:03:02 +01:00

Merge branch 'query_restarts' of https://github.com/samofatov/firebird into query_restarts

This commit is contained in:
hvlad 2019-08-26 20:11:36 +03:00
commit 043f7b2361
13 changed files with 222 additions and 142 deletions

View File

@ -10,7 +10,7 @@ need to raise an exception but do not want the database changes to be rolled-bac
If exceptions are raised inside the body of an autonomous transaction block, the changes are If exceptions are raised inside the body of an autonomous transaction block, the changes are
rolled-back. If the block runs till the end, the transaction is committed. rolled-back. If the block runs till the end, the transaction is committed.
The new transaction is initiated with the same isolation level of the existing one. The new transaction is initiated with snapshot isolation level and lock timeout of the existing one.
Should be used with caution to avoid deadlocks. Should be used with caution to avoid deadlocks.
Author: Author:

View File

@ -2643,7 +2643,19 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, jrd_req* request, WhichTrigger
VirtualTable::erase(tdbb, rpb); VirtualTable::erase(tdbb, rpb);
else if (!relation->rel_view_rse) else if (!relation->rel_view_rse)
{ {
VIO_erase(tdbb, rpb, transaction); // VIO_erase returns false if there is an update conflict in Read Consistency
// transaction. Before returning false it disables statement-level snapshot
// (via setting req_update_conflict flag) so re-fetch should see new data.
// Deleting new version blindly is generally unsafe, but is ok in this situation
// because all changes made by this request will certainly be undone and request
// will be restarted.
while (!VIO_erase(tdbb, rpb, transaction))
{
// VIO_refetch_record returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
if (!VIO_refetch_record(tdbb, rpb, transaction, true, true))
return parentStmt;
}
REPL_erase(tdbb, rpb, transaction); REPL_erase(tdbb, rpb, transaction);
} }
@ -3952,7 +3964,17 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
jrd_tra* const org_transaction = request->req_transaction; jrd_tra* const org_transaction = request->req_transaction;
fb_assert(tdbb->getTransaction() == org_transaction); fb_assert(tdbb->getTransaction() == org_transaction);
jrd_tra* const transaction = TRA_start(tdbb, org_transaction->tra_flags, // Use SNAPSHOT isolation mode in autonomous transactions by default
ULONG transaction_flags = 0;
// Simulate legacy behavior if Read Consistency is not used
if (!dbb->dbb_config->getReadConsistency() &&
!(org_transaction->tra_flags & TRA_read_consistency))
{
transaction_flags = org_transaction->tra_flags;
}
jrd_tra* const transaction = TRA_start(tdbb, transaction_flags,
org_transaction->tra_lock_timeout, org_transaction->tra_lock_timeout,
org_transaction); org_transaction);
@ -3977,12 +3999,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
const Savepoint* const savepoint = transaction->startSavepoint(); const Savepoint* const savepoint = transaction->startSavepoint();
impure->savNumber = savepoint->getNumber(); impure->savNumber = savepoint->getNumber();
if ((transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency))
{
TRA_setup_request_snapshot(tdbb, request, true);
}
return action; return action;
} }
@ -3994,16 +4010,6 @@ const StmtNode* InAutonomousTransactionNode::execute(thread_db* tdbb, jrd_req* r
fb_assert(transaction->tra_number == impure->traNumber); fb_assert(transaction->tra_number == impure->traNumber);
if (request->req_operation == jrd_req::req_return ||
request->req_operation == jrd_req::req_unwind)
{
if ((transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency))
{
TRA_release_request_snapshot(tdbb, request);
}
}
switch (request->req_operation) switch (request->req_operation)
{ {
case jrd_req::req_return: case jrd_req::req_return:
@ -6452,7 +6458,19 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, jrd_req* request, WhichTrigg
VirtualTable::modify(tdbb, orgRpb, newRpb); VirtualTable::modify(tdbb, orgRpb, newRpb);
else if (!relation->rel_view_rse) else if (!relation->rel_view_rse)
{ {
VIO_modify(tdbb, orgRpb, newRpb, transaction); // VIO_modify returns false if there is an update conflict in Read Consistency
// transaction. Before returning false it disables statement-level snapshot
// (via setting req_update_conflict flag) so re-fetch should see new data.
// Updating new version blindly is generally unsafe, but is ok in this situation
// because all changes made by this request will certainly be undone and request
// will be restarted.
while (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
{
// VIO_refetch_record returns false if record has been deleted by someone else.
// Gently ignore this situation and proceed further.
if (!VIO_refetch_record(tdbb, orgRpb, transaction, true, true))
return parentStmt;
}
IDX_modify(tdbb, orgRpb, newRpb, transaction); IDX_modify(tdbb, orgRpb, newRpb, transaction);
REPL_modify(tdbb, orgRpb, newRpb, transaction); REPL_modify(tdbb, orgRpb, newRpb, transaction);
} }

View File

@ -286,7 +286,10 @@ bool DsqlDmlRequest::fetch(thread_db* tdbb, UCHAR* msgBuffer)
tdbb->checkCancelState(true); tdbb->checkCancelState(true);
UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number]; UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number];
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer); if (prefetchedFirstRow)
prefetchedFirstRow = false;
else
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer);
const dsql_par* const eof = statement->getEof(); const dsql_par* const eof = statement->getEof();
const USHORT* eofPtr = eof ? (USHORT*) (dsqlMsgBuffer + (IPTR) eof->par_desc.dsc_address) : NULL; const USHORT* eofPtr = eof ? (USHORT*) (dsqlMsgBuffer + (IPTR) eof->par_desc.dsc_address) : NULL;
@ -677,33 +680,14 @@ void DsqlDmlRequest::dsqlPass(thread_db* tdbb, DsqlCompilerScratch* scratch, boo
*destroyScratchPool = true; *destroyScratchPool = true;
} }
// Execute a dynamic SQL statement. // Execute a dynamic SQL statement
void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle, void DsqlDmlRequest::doExecute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg, Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg, Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton) bool singleton)
{ {
if (!req_request) prefetchedFirstRow = false;
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) <<
Arg::Gds(isc_unprepared_stmt));
}
// If there is no data required, just start the request
const dsql_msg* message = statement->getSendMsg(); const dsql_msg* message = statement->getSendMsg();
if (message)
mapInOut(tdbb, false, message, inMetadata, NULL, inMsg);
// we need to mapInOut() before tracing of execution start to let trace
// manager know statement parameters values
TraceDSQLExecute trace(req_dbb->dbb_attachment, this);
// Setup and start timeout timer
const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton;
setupTimer(tdbb);
thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor);
if (!message) if (!message)
JRD_start(tdbb, req_request, req_transaction); JRD_start(tdbb, req_request, req_transaction);
@ -805,6 +789,17 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
status_exception::raise(&localStatus); status_exception::raise(&localStatus);
} }
} }
else
{
// Prefetch first row of a query
if (reqTypeWithCursor(statement->getType())) {
dsql_msg* message = (dsql_msg*) statement->getReceiveMsg();
UCHAR* dsqlMsgBuffer = req_msg_buffers[message->msg_buffer_number];
JRD_receive(tdbb, req_request, message->msg_number, message->msg_length, dsqlMsgBuffer);
prefetchedFirstRow = true;
}
}
switch (statement->getType()) switch (statement->getType())
{ {
@ -826,6 +821,62 @@ void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
} }
break; break;
} }
}
// Execute a dynamic SQL statement with tracing, restart and timeout handler
void DsqlDmlRequest::execute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton)
{
if (!req_request)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-504) <<
Arg::Gds(isc_unprepared_stmt));
}
// If there is no data required, just start the request
const dsql_msg* message = statement->getSendMsg();
if (message)
mapInOut(tdbb, false, message, inMetadata, NULL, inMsg);
// we need to mapInOut() before tracing of execution start to let trace
// manager know statement parameters values
TraceDSQLExecute trace(req_dbb->dbb_attachment, this);
// Setup and start timeout timer
const bool have_cursor = reqTypeWithCursor(statement->getType()) && !singleton;
setupTimer(tdbb);
thread_db::TimerGuard timerGuard(tdbb, req_timer, !have_cursor);
if (req_transaction && (req_transaction->tra_flags & TRA_read_consistency) &&
statement->getType() != DsqlCompiledStatement::TYPE_SAVEPOINT)
{
AutoSavePoint savePoint(tdbb, req_transaction);
int numTries = 0;
while (true)
{
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
if (!(req_request->req_flags & req_update_conflict))
break;
req_request->req_flags &= ~req_update_conflict;
if (numTries >= 10) {
gds__log("Update conflict: unable to get a stable set of rows in the source tables");
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-913) <<
Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(req_request->req_conflict_txn));
}
req_transaction->rollbackSavepoint(tdbb, true);
req_transaction->startSavepoint(tdbb);
numTries++;
}
savePoint.release(); // everything is ok
} else {
doExecute(tdbb, traHandle, inMetadata, inMsg, outMetadata, outMsg, singleton);
}
trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS); trace.finish(have_cursor, ITracePlugin::RESULT_SUCCESS);
} }

View File

@ -645,7 +645,8 @@ public:
explicit DsqlDmlRequest(MemoryPool& pool, StmtNode* aNode) explicit DsqlDmlRequest(MemoryPool& pool, StmtNode* aNode)
: dsql_req(pool), : dsql_req(pool),
node(aNode), node(aNode),
needDelayedFormat(false) needDelayedFormat(false),
prefetchedFirstRow(false)
{ {
} }
@ -664,9 +665,14 @@ public:
virtual void setDelayedFormat(thread_db* tdbb, Firebird::IMessageMetadata* metadata); virtual void setDelayedFormat(thread_db* tdbb, Firebird::IMessageMetadata* metadata);
private: private:
void doExecute(thread_db* tdbb, jrd_tra** traHandle,
Firebird::IMessageMetadata* inMetadata, const UCHAR* inMsg,
Firebird::IMessageMetadata* outMetadata, UCHAR* outMsg,
bool singleton);
NestConst<StmtNode> node; NestConst<StmtNode> node;
Firebird::RefPtr<Firebird::IMessageMetadata> delayedFormat; Firebird::RefPtr<Firebird::IMessageMetadata> delayedFormat;
bool needDelayedFormat; bool needDelayedFormat;
bool prefetchedFirstRow;
}; };
class DsqlDdlRequest : public dsql_req class DsqlDdlRequest : public dsql_req

View File

@ -234,7 +234,7 @@ void VerbAction::mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* next
release(transaction); release(transaction);
} }
void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction) void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks)
{ {
// Undo changes recorded for this verb action. // Undo changes recorded for this verb action.
// After that, clear the verb action and prepare it for later reuse. // After that, clear the verb action and prepare it for later reuse.
@ -258,7 +258,7 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
if (!DPM_get(tdbb, &rpb, LCK_read)) if (!DPM_get(tdbb, &rpb, LCK_read))
BUGCHECK(186); // msg 186 record disappeared BUGCHECK(186); // msg 186 record disappeared
if (have_undo && !(rpb.rpb_flags & rpb_deleted)) if ((have_undo || preserveLocks) && !(rpb.rpb_flags & rpb_deleted))
VIO_data(tdbb, &rpb, transaction->tra_pool); VIO_data(tdbb, &rpb, transaction->tra_pool);
else else
CCH_RELEASE(tdbb, &rpb.getWindow(tdbb)); CCH_RELEASE(tdbb, &rpb.getWindow(tdbb));
@ -267,7 +267,45 @@ void VerbAction::undo(thread_db* tdbb, jrd_tra* transaction)
BUGCHECK(185); // msg 185 wrong record version BUGCHECK(185); // msg 185 wrong record version
if (!have_undo) if (!have_undo)
VIO_backout(tdbb, &rpb, transaction); {
if (preserveLocks && rpb.rpb_b_page) {
// Fetch previous record version and update in place current version with it
record_param temp = rpb;
temp.rpb_page = rpb.rpb_b_page;
temp.rpb_line = rpb.rpb_b_line;
temp.rpb_record = NULL;
if (temp.rpb_flags & rpb_delta)
fb_assert(temp.rpb_prior != NULL);
else
fb_assert(temp.rpb_prior == NULL);
if (!DPM_fetch(tdbb, &temp, LCK_read))
BUGCHECK(291); // msg 291 cannot find record back version
if (!(temp.rpb_flags & rpb_chained) || (temp.rpb_flags & (rpb_blob | rpb_fragment)))
ERR_bugcheck_msg("invalid back version");
VIO_data(tdbb, &temp, tdbb->getDefaultPool());
Record* const save_record = rpb.rpb_record;
if (rpb.rpb_flags & rpb_deleted)
rpb.rpb_record = NULL;
Record* const dead_record = rpb.rpb_record;
VIO_update_in_place(tdbb, transaction, &rpb, &temp);
if (dead_record)
{
rpb.rpb_record = NULL; // VIO_garbage_collect_idx will play with this record dirty tricks
VIO_garbage_collect_idx(tdbb, transaction, &rpb, dead_record);
}
rpb.rpb_record = save_record;
delete temp.rpb_record;
} else
VIO_backout(tdbb, &rpb, transaction);
}
else else
{ {
AutoUndoRecord record(vct_undo->current().setupRecord(transaction)); AutoUndoRecord record(vct_undo->current().setupRecord(transaction));
@ -378,7 +416,7 @@ void Savepoint::cleanupTempData()
} }
} }
Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior) Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior, bool preserveLocks)
{ {
// Undo changes made in this savepoint. // Undo changes made in this savepoint.
// Perform index and BLOB cleanup if needed. // Perform index and BLOB cleanup if needed.
@ -399,7 +437,7 @@ Savepoint* Savepoint::rollback(thread_db* tdbb, Savepoint* prior)
{ {
VerbAction* const action = m_actions; VerbAction* const action = m_actions;
action->undo(tdbb, m_transaction); action->undo(tdbb, m_transaction, preserveLocks);
m_actions = action->vct_next; m_actions = action->vct_next;
action->vct_next = m_freeActions; action->vct_next = m_freeActions;

View File

@ -94,7 +94,7 @@ namespace Jrd
UndoItemTree* vct_undo; // Data for undo records UndoItemTree* vct_undo; // Data for undo records
void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction); void mergeTo(thread_db* tdbb, jrd_tra* transaction, VerbAction* nextAction);
void undo(thread_db* tdbb, jrd_tra* transaction); void undo(thread_db* tdbb, jrd_tra* transaction, bool preserveLocks);
void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber, void garbageCollectIdxLite(thread_db* tdbb, jrd_tra* transaction, SINT64 recordNumber,
VerbAction* nextAction, Record* goingRecord); VerbAction* nextAction, Record* goingRecord);
@ -231,7 +231,7 @@ namespace Jrd
void cleanupTempData(); void cleanupTempData();
Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL); Savepoint* rollback(thread_db* tdbb, Savepoint* prior = NULL, bool preserveLocks = false);
Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL); Savepoint* rollforward(thread_db* tdbb, Savepoint* prior = NULL);
static void destroy(Savepoint*& savepoint) static void destroy(Savepoint*& savepoint)

View File

@ -8880,39 +8880,8 @@ void JRD_start(Jrd::thread_db* tdbb, jrd_req* request, jrd_tra* transaction)
* Get a record from the host program. * Get a record from the host program.
* *
**************************************/ **************************************/
EXE_unwind(tdbb, request);
// Repeat execution to handle update conflicts, if any EXE_start(tdbb, request, transaction);
int numTries = 0;
while (true)
{
try
{
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
break;
}
catch (const status_exception &ex)
{
const ISC_STATUS* v = ex.value();
if (// Update conflict error
v[0] == isc_arg_gds &&
v[1] == isc_update_conflict &&
// Read committed transaction with snapshots
(transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency) &&
// Snapshot has been assigned to the request -
// it was top-level request
!TRA_get_prior_request(tdbb))
{
if (++numTries < 10)
{
fb_utils::init_status(tdbb->tdbb_status_vector);
continue;
}
}
throw;
}
}
check_autocommit(tdbb, request); check_autocommit(tdbb, request);
@ -9001,40 +8970,9 @@ void JRD_start_and_send(thread_db* tdbb, jrd_req* request, jrd_tra* transaction,
* Get a record from the host program. * Get a record from the host program.
* *
**************************************/ **************************************/
EXE_unwind(tdbb, request);
// Repeat execution to handle update conflicts, if any EXE_start(tdbb, request, transaction);
int numTries = 0; EXE_send(tdbb, request, msg_type, msg_length, msg);
while (true)
{
try
{
EXE_unwind(tdbb, request);
EXE_start(tdbb, request, transaction);
EXE_send(tdbb, request, msg_type, msg_length, msg);
break;
}
catch (const status_exception &ex)
{
const ISC_STATUS* v = ex.value();
if (// Update conflict error
v[0] == isc_arg_gds &&
v[1] == isc_update_conflict &&
// Read committed transaction with snapshots
(transaction->tra_flags & TRA_read_committed) &&
(transaction->tra_flags & TRA_read_consistency) &&
// Snapshot has been assigned to the request -
// it was top-level request
!TRA_get_prior_request(tdbb))
{
if (++numTries < 10)
{
fb_utils::init_status(tdbb->tdbb_status_vector);
continue;
}
}
throw;
}
}
check_autocommit(tdbb, request); check_autocommit(tdbb, request);

View File

@ -275,6 +275,7 @@ public:
SINT64 req_fetch_rowcount; // Total number of rows returned by this request SINT64 req_fetch_rowcount; // Total number of rows returned by this request
jrd_req* req_proc_caller; // Procedure's caller request jrd_req* req_proc_caller; // Procedure's caller request
const ValueListNode* req_proc_inputs; // and its node with input parameters const ValueListNode* req_proc_inputs; // and its node with input parameters
TraNumber req_conflict_txn; // Transaction number for update conflict in read consistency mode
ULONG req_src_line; ULONG req_src_line;
ULONG req_src_column; ULONG req_src_column;
@ -397,6 +398,7 @@ const ULONG req_continue_loop = 0x100L; // PSQL continue statement
const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress const ULONG req_proc_fetch = 0x200L; // Fetch from procedure in progress
const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction const ULONG req_same_tx_upd = 0x400L; // record was updated by same transaction
const ULONG req_reserved = 0x800L; // Request reserved for client const ULONG req_reserved = 0x800L; // Request reserved for client
const ULONG req_update_conflict = 0x1000L; // We need to restart request due to update conflict
// Index lock block // Index lock block

View File

@ -145,7 +145,7 @@ jrd_req* TRA_get_prior_request(thread_db* tdbb)
return org_request; return org_request;
} }
void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, bool no_prior) void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request)
{ {
// This function is called whenever request is started in a transaction. // This function is called whenever request is started in a transaction.
// Setup context to preserve read consistency in READ COMMITTED transactions. // Setup context to preserve read consistency in READ COMMITTED transactions.
@ -160,7 +160,7 @@ void TRA_setup_request_snapshot(Jrd::thread_db* tdbb, Jrd::jrd_req* request, boo
return; return;
// See if there is any request right above us in the call stack // See if there is any request right above us in the call stack
jrd_req* org_request = no_prior ? nullptr : TRA_get_prior_request(tdbb); jrd_req* org_request = TRA_get_prior_request(tdbb);
if (org_request && org_request->req_transaction == transaction) if (org_request && org_request->req_transaction == transaction)
{ {
@ -1616,8 +1616,11 @@ int TRA_snapshot_state(thread_db* tdbb, const jrd_tra* trans, TraNumber number,
// GC thread accesses data directly without any request // GC thread accesses data directly without any request
if (jrd_req* current_request = tdbb->getRequest()) if (jrd_req* current_request = tdbb->getRequest())
{ {
// There is no request snapshot when we build expression index // Notes:
if (jrd_req* snapshot_request = current_request->req_snapshot.m_owner) // 1) There is no request snapshot when we build expression index
// 2) Disable read committed snapshot after we encountered update conflict
jrd_req* snapshot_request = current_request->req_snapshot.m_owner;
if (snapshot_request && !(snapshot_request->req_flags & req_update_conflict))
{ {
if (stateCn > snapshot_request->req_snapshot.m_number) if (stateCn > snapshot_request->req_snapshot.m_number)
return tra_active; return tra_active;
@ -3847,7 +3850,7 @@ Savepoint* jrd_tra::startSavepoint(bool root)
return savepoint; return savepoint;
} }
void jrd_tra::rollbackSavepoint(thread_db* tdbb) void jrd_tra::rollbackSavepoint(thread_db* tdbb, bool preserveLocks)
/************************************** /**************************************
* *
* r o l l b a c k S a v e p o i n t * r o l l b a c k S a v e p o i n t
@ -3864,7 +3867,7 @@ void jrd_tra::rollbackSavepoint(thread_db* tdbb)
REPL_save_cleanup(tdbb, this, tra_save_point, true); REPL_save_cleanup(tdbb, this, tra_save_point, true);
Jrd::ContextPoolHolder context(tdbb, tra_pool); Jrd::ContextPoolHolder context(tdbb, tra_pool);
tra_save_point = tra_save_point->rollback(tdbb); tra_save_point = tra_save_point->rollback(tdbb, NULL, preserveLocks);
} }
} }

View File

@ -386,7 +386,7 @@ public:
Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number); Record* findNextUndo(VerbAction* before_this, jrd_rel* relation, SINT64 number);
void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying); void listStayingUndo(jrd_rel* relation, SINT64 number, RecordStack &staying);
Savepoint* startSavepoint(bool root = false); Savepoint* startSavepoint(bool root = false);
void rollbackSavepoint(thread_db* tdbb); void rollbackSavepoint(thread_db* tdbb, bool preserveLocks = false);
void rollbackToSavepoint(thread_db* tdbb, SavNumber number); void rollbackToSavepoint(thread_db* tdbb, SavNumber number);
void rollforwardSavepoint(thread_db* tdbb); void rollforwardSavepoint(thread_db* tdbb);
DbCreatorsList* getDbCreatorsList(); DbCreatorsList* getDbCreatorsList();

View File

@ -63,7 +63,7 @@ void TRA_update_counters(Jrd::thread_db*, Jrd::Database*);
int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait); int TRA_wait(Jrd::thread_db* tdbb, Jrd::jrd_tra* trans, TraNumber number, Jrd::jrd_tra::wait_t wait);
void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request); void TRA_attach_request(Jrd::jrd_tra* transaction, Jrd::jrd_req* request);
void TRA_detach_request(Jrd::jrd_req* request); void TRA_detach_request(Jrd::jrd_req* request);
void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request, bool no_prior = false); void TRA_setup_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request);
void TRA_release_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request); void TRA_release_request_snapshot(Jrd::thread_db*, Jrd::jrd_req* request);
Jrd::jrd_req* TRA_get_prior_request(Jrd::thread_db*); Jrd::jrd_req* TRA_get_prior_request(Jrd::thread_db*);

View File

@ -1441,7 +1441,7 @@ void VIO_data(thread_db* tdbb, record_param* rpb, MemoryPool* pool)
} }
void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction) bool VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
/************************************** /**************************************
* *
@ -1497,7 +1497,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
// hvlad: what if record was created\modified by user tx also, // hvlad: what if record was created\modified by user tx also,
// i.e. if there is backversion ??? // i.e. if there is backversion ???
VIO_backout(tdbb, rpb, transaction); VIO_backout(tdbb, rpb, transaction);
return; return true;
} }
transaction->tra_flags |= TRA_write; transaction->tra_flags |= TRA_write;
@ -1891,7 +1891,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
if (transaction->tra_save_point && transaction->tra_save_point->isChanging()) if (transaction->tra_save_point && transaction->tra_save_point->isChanging())
verb_post(tdbb, transaction, rpb, rpb->rpb_undo); verb_post(tdbb, transaction, rpb, rpb->rpb_undo);
return; return true;
} }
const bool backVersion = (rpb->rpb_b_page != 0); const bool backVersion = (rpb->rpb_b_page != 0);
@ -1910,12 +1910,20 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
// Update stub didn't find one page -- do a long, hard update // Update stub didn't find one page -- do a long, hard update
PageStack stack; PageStack stack;
if (prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false)) int prepare_result = prepare_update(tdbb, transaction, tid_fetch, rpb, &temp, 0, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{ {
ERR_post(Arg::Gds(isc_deadlock) << ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) << Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(rpb->rpb_transaction_nr));
} }
if (prepare_result) {
jrd_req* top_request = request->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = rpb->rpb_transaction_nr;
return false;
}
// Old record was restored and re-fetched for write. Now replace it. // Old record was restored and re-fetched for write. Now replace it.
@ -1974,6 +1982,7 @@ void VIO_erase(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
{ {
notify_garbage_collector(tdbb, rpb, transaction->tra_number); notify_garbage_collector(tdbb, rpb, transaction->tra_number);
} }
return true;
} }
@ -2769,7 +2778,7 @@ void VIO_init(thread_db* tdbb)
} }
} }
void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction) bool VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, jrd_tra* transaction)
{ {
/************************************** /**************************************
* *
@ -2835,7 +2844,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{ {
VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb); VIO_update_in_place(tdbb, transaction, org_rpb, new_rpb);
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return; return true;
} }
check_gbak_cheating_insupd(tdbb, relation, "UPDATE"); check_gbak_cheating_insupd(tdbb, relation, "UPDATE");
@ -3198,19 +3207,27 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
} }
tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_UPDATES, relation->rel_id);
return; return true;
} }
const bool backVersion = (org_rpb->rpb_b_page != 0); const bool backVersion = (org_rpb->rpb_b_page != 0);
record_param temp; record_param temp;
PageStack stack; PageStack stack;
if (prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb, &temp, new_rpb, int prepare_result = prepare_update(tdbb, transaction, org_rpb->rpb_transaction_nr, org_rpb,
stack, false)) &temp, new_rpb, stack, false);
if (prepare_result &&
(!(transaction->tra_flags & TRA_read_consistency) || prepare_result == PREPARE_LOCKERR))
{ {
ERR_post(Arg::Gds(isc_deadlock) << ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) << Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(org_rpb->rpb_transaction_nr));
} }
if (prepare_result) {
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
return false;
}
IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction); IDX_modify_flag_uk_modified(tdbb, org_rpb, new_rpb, transaction);
@ -3259,6 +3276,7 @@ void VIO_modify(thread_db* tdbb, record_param* org_rpb, record_param* new_rpb, j
{ {
notify_garbage_collector(tdbb, org_rpb, transaction->tra_number); notify_garbage_collector(tdbb, org_rpb, transaction->tra_number);
} }
return true;
} }
@ -4062,6 +4080,11 @@ bool VIO_writelock(thread_db* tdbb, record_param* org_rpb, jrd_tra* transaction)
{ {
case PREPARE_CONFLICT: case PREPARE_CONFLICT:
case PREPARE_DELETE: case PREPARE_DELETE:
if ((transaction->tra_flags & TRA_read_consistency)) {
jrd_req* top_request = tdbb->getRequest()->req_snapshot.m_owner;
top_request->req_flags |= req_update_conflict;
top_request->req_conflict_txn = org_rpb->rpb_transaction_nr;
}
org_rpb->rpb_runtime_flags |= RPB_refetch; org_rpb->rpb_runtime_flags |= RPB_refetch;
return false; return false;
case PREPARE_LOCKERR: case PREPARE_LOCKERR:
@ -5662,7 +5685,7 @@ static int prepare_update( thread_db* tdbb,
delete_record(tdbb, temp, 0, NULL); delete_record(tdbb, temp, 0, NULL);
if (writelock) if (writelock || (transaction->tra_flags & TRA_read_consistency))
{ {
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
return PREPARE_DELETE; return PREPARE_DELETE;
@ -5785,15 +5808,16 @@ static int prepare_update( thread_db* tdbb,
switch (state) switch (state)
{ {
case tra_committed: case tra_committed:
// We need to loop waiting in read committed with no read consistency transactions only // For SNAPSHOT mode transactions raise error early
if (!(transaction->tra_flags & TRA_read_committed) || if (!(transaction->tra_flags & TRA_read_committed))
(transaction->tra_flags & TRA_read_consistency))
{ {
tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id); tdbb->bumpRelStats(RuntimeStatistics::RECORD_CONFLICTS, relation->rel_id);
ERR_post(Arg::Gds(isc_update_conflict) << ERR_post(Arg::Gds(isc_deadlock) <<
Arg::Gds(isc_update_conflict) <<
Arg::Gds(isc_concurrent_transaction) << Arg::Num(update_conflict_trans)); Arg::Gds(isc_concurrent_transaction) << Arg::Num(update_conflict_trans));
} }
return PREPARE_CONFLICT;
case tra_limbo: case tra_limbo:
if (!(transaction->tra_flags & TRA_ignore_limbo)) if (!(transaction->tra_flags & TRA_ignore_limbo))

View File

@ -42,7 +42,7 @@ bool VIO_chase_record_version(Jrd::thread_db*, Jrd::record_param*,
Jrd::jrd_tra*, MemoryPool*, bool, bool); Jrd::jrd_tra*, MemoryPool*, bool, bool);
void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*); void VIO_copy_record(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*);
void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*); void VIO_data(Jrd::thread_db*, Jrd::record_param*, MemoryPool*);
void VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_erase(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_fini(Jrd::thread_db*); void VIO_fini(Jrd::thread_db*);
bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_garbage_collect(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*); Jrd::Record* VIO_gc_record(Jrd::thread_db*, Jrd::jrd_rel*);
@ -51,7 +51,7 @@ bool VIO_get_current(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*,
MemoryPool*, bool, bool&); MemoryPool*, bool, bool&);
void VIO_init(Jrd::thread_db*); void VIO_init(Jrd::thread_db*);
bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_writelock(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*);
void VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*); bool VIO_modify(Jrd::thread_db*, Jrd::record_param*, Jrd::record_param*, Jrd::jrd_tra*);
bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool); bool VIO_next_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, MemoryPool*, bool);
Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*); Jrd::Record* VIO_record(Jrd::thread_db*, Jrd::record_param*, const Jrd::Format*, MemoryPool*);
bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool); bool VIO_refetch_record(Jrd::thread_db*, Jrd::record_param*, Jrd::jrd_tra*, bool, bool);