8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 08:03:04 +01:00

Make locks unconditionally granted in the fair order. The legacy behavior is unsafe in the highly loaded systems.

This commit is contained in:
dimitr 2011-12-23 17:50:44 +00:00
parent 953d84de0e
commit e719c87753
5 changed files with 37 additions and 84 deletions

View File

@ -122,7 +122,6 @@ const Config::ConfigEntry Config::entries[MAX_CONFIG_KEY] =
{TYPE_INTEGER, "ConnectionTimeout", (ConfigValue) 180}, // seconds
{TYPE_INTEGER, "DummyPacketInterval", (ConfigValue) 0}, // seconds
{TYPE_INTEGER, "LockMemSize", (ConfigValue) 1048576}, // bytes
{TYPE_BOOLEAN, "LockGrantOrder", (ConfigValue) true},
{TYPE_INTEGER, "LockHashSlots", (ConfigValue) 1009}, // slots
{TYPE_INTEGER, "LockAcquireSpins", (ConfigValue) 0},
{TYPE_INTEGER, "EventMemSize", (ConfigValue) 65536}, // bytes
@ -441,11 +440,6 @@ int Config::getLockMemSize() const
return get<int>(KEY_LOCK_MEM_SIZE);
}
bool Config::getLockGrantOrder() const
{
return get<bool>(KEY_LOCK_GRANT_ORDER);
}
int Config::getLockHashSlots() const
{
return get<int>(KEY_LOCK_HASH_SLOTS);

View File

@ -95,7 +95,6 @@ public:
KEY_CONNECTION_TIMEOUT,
KEY_DUMMY_PACKET_INTERVAL,
KEY_LOCK_MEM_SIZE,
KEY_LOCK_GRANT_ORDER,
KEY_LOCK_HASH_SLOTS,
KEY_LOCK_ACQUIRE_SPINS,
KEY_EVENT_MEM_SIZE,
@ -241,9 +240,6 @@ public:
// Lock manager memory size
int getLockMemSize() const;
// Lock manager grant order
bool getLockGrantOrder() const;
// Lock manager hash slots
int getLockHashSlots() const;

View File

@ -1985,7 +1985,7 @@ lrq* LockManager::deadlock_walk(lrq* request, bool* maybe_deadlock)
{
lrq* block = (lrq*) ((UCHAR*) lock_srq - OFFSET(lrq*, lrq_lbl_requests));
if (!lockOrdering() || conversion)
if (conversion)
{
// Don't pursue our own lock-request again
@ -2012,7 +2012,9 @@ lrq* LockManager::deadlock_walk(lrq* request, bool* maybe_deadlock)
// Since lock ordering is in effect, granted locks and waiting
// requests that arrived before our request could block us
if (compatibility[request->lrq_requested][MAX(block->lrq_state, block->lrq_requested)])
const UCHAR max_state = MAX(block->lrq_state, block->lrq_requested);
if (compatibility[request->lrq_requested][max_state])
{
continue;
}
@ -2268,7 +2270,7 @@ bool LockManager::grant_or_que(Attachment* attachment, lrq* request, lbl* lock,
if (compatibility[request->lrq_requested][lock->lbl_state])
{
if (!lockOrdering() || request->lrq_requested == LCK_null || lock->lbl_pending_lrq_count == 0)
if (request->lrq_requested == LCK_null || lock->lbl_pending_lrq_count == 0)
{
grant(request, lock);
post_pending(lock);
@ -2414,9 +2416,6 @@ bool LockManager::initialize(bool initializeMemory)
// Set lock_ordering flag for the first time
if (m_config->getLockGrantOrder())
sh_mem_header->lhb_flags |= LHB_lock_ordering;
const ULONG length = sizeof(lhb) + (sh_mem_header->lhb_hash_slots * sizeof(sh_mem_header->lhb_hash[0]));
sh_mem_header->lhb_length = sh_mem_length_mapped;
sh_mem_header->lhb_used = FB_ALIGN(length, FB_ALIGNMENT);
@ -2862,11 +2861,8 @@ void LockManager::post_pending(lbl* lock)
++lock->lbl_counts[request->lrq_state];
own* owner = (own*) SRQ_ABS_PTR(request->lrq_owner);
post_wakeup(owner);
if (lockOrdering())
{
CHECK(lock->lbl_pending_lrq_count >= pending_counter);
break;
}
CHECK(lock->lbl_pending_lrq_count >= pending_counter);
break;
}
}
else if (compatibility[request->lrq_requested][lock->lbl_state])
@ -2878,11 +2874,8 @@ void LockManager::post_pending(lbl* lock)
#endif
own* owner = (own*) SRQ_ABS_PTR(request->lrq_owner);
post_wakeup(owner);
if (lockOrdering())
{
CHECK(lock->lbl_pending_lrq_count >= pending_counter);
break;
}
CHECK(lock->lbl_pending_lrq_count >= pending_counter);
break;
}
}
@ -3560,33 +3553,22 @@ void LockManager::validate_lock(const SRQ_PTR lock_ptr, USHORT freed, const SRQ_
// Request must be for this lock
CHECK(request->lrq_lock == lock_ptr);
// If the request is pending, then it must be incompatible with current
// state of the lock - OR lock_ordering is enabled and there is at
// least one pending request in the queue (before this request
// but not including it).
if (request->lrq_flags & LRQ_pending)
{
CHECK(!compatibility[request->lrq_requested][lock->lbl_state] ||
(lockOrdering() && found_pending));
// If the request is pending, then it must be incompatible with current
// state of the lock - OR lock_ordering is enabled and there is at
// least one pending request in the queue (before this request
// but not including it).
// The above condition would probably be more clear if we
// wrote it as the following:
//
// CHECK (!compatibility[request->lrq_requested][lock->lbl_state] ||
// (lockOrdering() && found_pending &&
// compatibility[request->lrq_requested][lock->lbl_state]));
//
// but that would be redundant
CHECK(!compatibility[request->lrq_requested][lock->lbl_state] || found_pending);
found_pending++;
}
// If the request is NOT pending, then it must be rejected or
// compatible with the current state of the lock
if (!(request->lrq_flags & LRQ_pending))
else
{
// If the request is NOT pending, then it must be rejected or
// compatible with the current state of the lock
CHECK((request->lrq_flags & LRQ_rejected) ||
(request->lrq_requested == lock->lbl_state) ||
compatibility[request->lrq_requested][lock->lbl_state]);
@ -3929,16 +3911,12 @@ void LockManager::wait_for_request(Attachment* attachment, lrq* request, SSHORT
lbl* lock = (lbl*) SRQ_ABS_PTR(lock_offset);
lock->lbl_pending_lrq_count++;
if (lockOrdering())
if (!request->lrq_state)
{
if (!request->lrq_state)
{
// If ordering is in effect, and this is a conversion of
// an existing lock in LCK_none state - put the lock to the
// end of the list so it's not taking cuts in the lineup
remove_que(&request->lrq_lbl_requests);
insert_tail(&lock->lbl_requests, &request->lrq_lbl_requests);
}
// If this is a conversion of an existing lock in LCK_none state -
// put the lock to the end of the list so it's not taking cuts in the lineup
remove_que(&request->lrq_lbl_requests);
insert_tail(&lock->lbl_requests, &request->lrq_lbl_requests);
}
if (lck_wait <= 0)

View File

@ -126,7 +126,6 @@ struct lhb : public Jrd::MemoryHeader
ULONG lhb_length; // Size of lock table
ULONG lhb_used; // Bytes of lock table in use
USHORT lhb_hash_slots; // Number of hash slots allocated
USHORT lhb_flags; // Miscellaneous info
struct mtx lhb_mutex; // Mutex controlling access
SRQ_PTR lhb_history;
ULONG lhb_scan_interval; // Deadlock scan interval (secs)
@ -154,9 +153,6 @@ struct lhb : public Jrd::MemoryHeader
srq lhb_hash[1]; // Hash table
};
// lhb_flags
const USHORT LHB_lock_ordering = 1; // Lock ordering is enabled
// Secondary header block -- exists only in V3.3 and later lock managers.
// It is pointed to by the word in the lhb that used to contain a pattern.
@ -341,11 +337,6 @@ private:
explicit LockManager(const Firebird::string&, Firebird::RefPtr<Config>);
~LockManager();
bool lockOrdering() const
{
return (sh_mem_header->lhb_flags & LHB_lock_ordering) ? true : false;
}
void acquire_shmem(SRQ_PTR);
UCHAR* alloc(USHORT, Firebird::Arg::StatusVector*);
lbl* alloc_lock(USHORT, Firebird::Arg::StatusVector&);

View File

@ -698,9 +698,6 @@ int CLIB_ROUTINE main( int argc, char *argv[])
LOCK_header->mhb_version, (const TEXT*)HtmlLink(preOwn, LOCK_header->lhb_active_owner),
LOCK_header->lhb_length, LOCK_header->lhb_used);
FPRINTF(outfile, "\tFlags: 0x%04X\n",
LOCK_header->lhb_flags);
FPRINTF(outfile,
"\tEnqs: %6"UQUADFORMAT", Converts: %6"UQUADFORMAT
", Rejects: %6"UQUADFORMAT", Blocks: %6"UQUADFORMAT"\n",
@ -771,10 +768,6 @@ int CLIB_ROUTINE main( int argc, char *argv[])
prt_que(outfile, LOCK_header, "\tFree requests",
&LOCK_header->lhb_free_requests, OFFSET(lrq*, lrq_lbl_requests));
// Print lock ordering option
FPRINTF(outfile, "\tLock Ordering: %s\n",
(LOCK_header->lhb_flags & LHB_lock_ordering) ? "Enabled" : "Disabled");
FPRINTF(outfile, "\n");
// Print known owners
@ -1321,19 +1314,7 @@ static void prt_owner_wait_cycle(OUTFILE outfile,
const lrq* lock_request = (lrq*) ((UCHAR *) que_inst - OFFSET(lrq*, lrq_lbl_requests));
fb_assert(lock_request->lrq_type == type_lrq);
if (LOCK_header->lhb_flags & LHB_lock_ordering && !owner_conversion)
{
// Requests AFTER our request can't block us
if (owner_request == lock_request)
break;
if (compatibility[owner_request->lrq_requested]
[MAX(lock_request->lrq_state, lock_request->lrq_requested)])
{
continue;
}
}
else
if (owner_conversion)
{
// Requests AFTER our request CAN block us
if (lock_request == owner_request)
@ -1342,6 +1323,19 @@ static void prt_owner_wait_cycle(OUTFILE outfile,
if (compatibility[owner_request->lrq_requested][lock_request->lrq_state])
continue;
}
else
{
// Requests AFTER our request can't block us
if (owner_request == lock_request)
break;
const UCHAR max_state = MAX(lock_request->lrq_state, lock_request->lrq_requested);
if (compatibility[owner_request->lrq_requested][max_state])
{
continue;
}
}
const own* const lock_owner = (own*) SRQ_ABS_PTR(lock_request->lrq_owner);
prt_owner_wait_cycle(outfile, LOCK_header, lock_owner, indent + 4, waiters);