8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 22:03:03 +01:00

Implement CORE-1819 : More efficient solution for CORE-1300

This commit is contained in:
hvlad 2008-04-02 23:46:59 +00:00
parent bc3d30ad67
commit 259fafa647
5 changed files with 96 additions and 32 deletions

View File

@ -54,6 +54,7 @@
#include "../jrd/gds_proto.h" #include "../jrd/gds_proto.h"
#include "../jrd/intl_proto.h" #include "../jrd/intl_proto.h"
#include "../jrd/jrd_proto.h" #include "../jrd/jrd_proto.h"
#include "../jrd/lck_proto.h"
#include "../jrd/met_proto.h" #include "../jrd/met_proto.h"
#include "../jrd/mov_proto.h" #include "../jrd/mov_proto.h"
#include "../jrd/nav_proto.h" #include "../jrd/nav_proto.h"
@ -209,6 +210,62 @@ static void update_selectivity(index_root_page*, USHORT, const SelectivityList&)
static void checkForLowerKeySkip(bool&, const bool, const IndexNode&, const temporary_key&, static void checkForLowerKeySkip(bool&, const bool, const IndexNode&, const temporary_key&,
const index_desc&, const IndexRetrieval*); const index_desc&, const IndexRetrieval*);
class Jrd::BtrPageGCLock : public Lock
{
// We want to put 8 bytes (PageNumber) in lock key, one long is already
// reserved by Lock::lck_long this is the second long. It is really unused
// as second long used for 8-byte key already present because of alignment.
long unused;
public:
BtrPageGCLock(thread_db *tdbb)
{
Database* dbb = tdbb->getDatabase();
lck_parent = dbb->dbb_lock;
lck_dbb = dbb;
lck_length = PageNumber::getLockLen();
lck_type = LCK_btr_dont_gc;
lck_owner_handle = LCK_get_owner_handle(tdbb, lck_type);
}
~BtrPageGCLock()
{
// assert in debug build
fb_assert(!lck_id);
// lck_id might be set only if exception occurs
if (lck_id) {
LCK_release(JRD_get_thread_data(), this);
}
}
void disablePageGC(thread_db *tdbb, const PageNumber &page)
{
page.getLockStr(lck_key.lck_string);
LCK_lock(tdbb, this, LCK_read, LCK_WAIT);
}
void enablePageGC(thread_db *tdbb)
{
LCK_release(tdbb, this);
}
static bool isPageGCAllowed(thread_db *tdbb, const PageNumber &page)
{
BtrPageGCLock lock(tdbb);
page.getLockStr(lock.lck_key.lck_string);
const bool res = LCK_lock(tdbb, &lock, LCK_write, LCK_NO_WAIT);
if (res) {
LCK_release(tdbb, &lock);
}
return res;
}
};
USHORT BTR_all(thread_db* tdbb, USHORT BTR_all(thread_db* tdbb,
jrd_rel* relation, jrd_rel* relation,
IndexDescAlloc** csb_idx, IndexDescAlloc** csb_idx,
@ -918,8 +975,12 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion)
index_desc* idx = insertion->iib_descriptor; index_desc* idx = insertion->iib_descriptor;
RelationPages* relPages = insertion->iib_relation->getPages(tdbb); RelationPages* relPages = insertion->iib_relation->getPages(tdbb);
WIN window(relPages->rel_pg_space_id, idx->idx_root); WIN window(relPages->rel_pg_space_id, idx->idx_root);
btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_write, pag_index); btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_read, pag_index);
if (bucket->btr_level == 0) {
CCH_RELEASE(tdbb, &window);
CCH_FETCH(tdbb, &window, LCK_write, pag_index);
}
CCH_RELEASE(tdbb, root_window); CCH_RELEASE(tdbb, root_window);
temporary_key key; temporary_key key;
@ -927,6 +988,8 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion)
key.key_length = 0; key.key_length = 0;
RecordNumber recordNumber(0); RecordNumber recordNumber(0);
BtrPageGCLock lock(tdbb);
insertion->iib_dont_gc_lock = &lock;
SLONG split_page = add_node(tdbb, &window, insertion, &key, SLONG split_page = add_node(tdbb, &window, insertion, &key,
&recordNumber, NULL, NULL); &recordNumber, NULL, NULL);
if (split_page == NO_SPLIT) { if (split_page == NO_SPLIT) {
@ -947,6 +1010,7 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion)
// in the existing "top" page instead of making a new "top" page. // in the existing "top" page instead of making a new "top" page.
CCH_RELEASE(tdbb, root_window); CCH_RELEASE(tdbb, root_window);
lock.enablePageGC(tdbb);
index_insertion propagate = *insertion; index_insertion propagate = *insertion;
propagate.iib_number.setValue(split_page); propagate.iib_number.setValue(split_page);
@ -963,16 +1027,18 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion)
if (split_page == NO_VALUE_PAGE) { if (split_page == NO_VALUE_PAGE) {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
} }
else {
lock.enablePageGC(tdbb);
}
BUGCHECK(204); // msg 204 index inconsistent BUGCHECK(204); // msg 204 index inconsistent
} }
return; return;
} }
// the original page was marked as not garbage-collectable, but // the original page was marked as not garbage-collectable, but
// since it is the top page it won't be garbage-collected anyway, // since it is the root page it won't be garbage-collected anyway,
// so go ahead and mark it as garbage-collectable now. // so go ahead and mark it as garbage-collectable now.
CCH_MARK(tdbb, &window); lock.enablePageGC(tdbb);
bucket->btr_header.pag_flags &= ~btr_dont_gc;
WIN new_window(relPages->rel_pg_space_id, split_page); WIN new_window(relPages->rel_pg_space_id, split_page);
btree_page* new_bucket = btree_page* new_bucket =
@ -2245,30 +2311,30 @@ static SLONG add_node(thread_db* tdbb,
break; break;
} }
bucket = (btree_page*) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, bucket = (btree_page*) CCH_HANDOFF(tdbb, window, bucket->btr_sibling,
LCK_write, pag_index); LCK_read, pag_index);
} }
CCH_MARK(tdbb, window); BtrPageGCLock lockCurrent(tdbb);
bucket->btr_header.pag_flags |= btr_dont_gc; lockCurrent.disablePageGC(tdbb, window->win_page);
// Fetch the page at the next level down. If the next level is leaf level, // Fetch the page at the next level down. If the next level is leaf level,
// fetch for write since we know we are going to write to the page (most likely). // fetch for write since we know we are going to write to the page (most likely).
const PageNumber index = window->win_page; const PageNumber index = window->win_page;
CCH_HANDOFF(tdbb, window, page, LCK_write, pag_index); CCH_HANDOFF(tdbb, window, page,
(SSHORT) ((bucket->btr_level == 1) ? LCK_write : LCK_read), pag_index);
// now recursively try to insert the node at the next level down // now recursively try to insert the node at the next level down
index_insertion propagate; index_insertion propagate;
BtrPageGCLock lockLower(tdbb);
propagate.iib_dont_gc_lock = insertion->iib_dont_gc_lock;
insertion->iib_dont_gc_lock = &lockLower;
SLONG split = add_node(tdbb, window, insertion, new_key, SLONG split = add_node(tdbb, window, insertion, new_key,
new_record_number, &page, &propagate.iib_sibling); new_record_number, &page, &propagate.iib_sibling);
if (split == NO_SPLIT) if (split == NO_SPLIT)
{ {
window->win_page = index; lockCurrent.enablePageGC(tdbb);
bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock;
CCH_MARK(tdbb, window);
bucket->btr_header.pag_flags &= ~btr_dont_gc;
CCH_RELEASE(tdbb, window);
return NO_SPLIT; return NO_SPLIT;
} }
@ -2310,17 +2376,10 @@ static SLONG add_node(thread_db* tdbb,
// the split page on the lower level has been propogated, so we can go back to // the split page on the lower level has been propogated, so we can go back to
// the page it was split from, and mark it as garbage-collectable now // the page it was split from, and mark it as garbage-collectable now
window->win_page = page; lockLower.enablePageGC(tdbb);
bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock;
CCH_MARK(tdbb, window);
bucket->btr_header.pag_flags &= ~btr_dont_gc;
CCH_RELEASE(tdbb, window);
window->win_page = index; lockCurrent.enablePageGC(tdbb);
bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index);
CCH_MARK(tdbb, window);
bucket->btr_header.pag_flags &= ~btr_dont_gc;
CCH_RELEASE(tdbb, window);
if (original_page) { if (original_page) {
*original_page = original_page2; *original_page = original_page2;
@ -4763,7 +4822,7 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb
CONTENTS result = contents_above_threshold; CONTENTS result = contents_above_threshold;
// check to see if the page was marked not to be garbage collected // check to see if the page was marked not to be garbage collected
if (gc_page->btr_header.pag_flags & btr_dont_gc) { if ( !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page) ) {
CCH_RELEASE(tdbb, window); CCH_RELEASE(tdbb, window);
return contents_above_threshold; return contents_above_threshold;
} }
@ -4892,7 +4951,7 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb
// below the threshold for garbage collection. // below the threshold for garbage collection.
gc_page = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); gc_page = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index);
if ((gc_page->btr_length >= GARBAGE_COLLECTION_BELOW_THRESHOLD) if ((gc_page->btr_length >= GARBAGE_COLLECTION_BELOW_THRESHOLD)
|| (gc_page->btr_header.pag_flags & btr_dont_gc)) || !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page) )
{ {
CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &parent_window);
CCH_RELEASE(tdbb, &left_window); CCH_RELEASE(tdbb, &left_window);
@ -6062,7 +6121,7 @@ static SLONG insert_node(thread_db* tdbb,
// mark the bucket as non garbage-collectable until we can propagate // mark the bucket as non garbage-collectable until we can propagate
// the split page up to the parent; otherwise its possible that the // the split page up to the parent; otherwise its possible that the
// split page we just created will be lost. // split page we just created will be lost.
bucket->btr_header.pag_flags |= btr_dont_gc; insertion->iib_dont_gc_lock->disablePageGC(tdbb, window->win_page);
if (original_page) { if (original_page) {
*original_page = window->win_page.getPageNum(); *original_page = window->win_page.getPageNum();
@ -6853,4 +6912,3 @@ void update_selectivity(index_root_page* root, USHORT id,
} }
irt_desc->irt_stuff.irt_selectivity = selectivity.back(); irt_desc->irt_stuff.irt_selectivity = selectivity.back();
} }

View File

@ -50,6 +50,7 @@ template <typename T> class vec;
class jrd_req; class jrd_req;
struct temporary_key; struct temporary_key;
class jrd_tra; class jrd_tra;
class BtrPageGCLock;
enum idx_null_state { enum idx_null_state {
idx_nulls_none, idx_nulls_none,
@ -136,6 +137,7 @@ struct index_insertion {
temporary_key* iib_key; /* varying string for insertion */ temporary_key* iib_key; /* varying string for insertion */
RecordBitmap* iib_duplicates; /* spare bit map of duplicates */ RecordBitmap* iib_duplicates; /* spare bit map of duplicates */
jrd_tra* iib_transaction; /* insertion transaction */ jrd_tra* iib_transaction; /* insertion transaction */
BtrPageGCLock* iib_dont_gc_lock; // lock to prevent removal of splitted page
}; };

View File

@ -429,6 +429,7 @@ SLONG LCK_get_owner_handle(thread_db* tdbb, enum lck_t lock_type)
case LCK_dsql_cache: case LCK_dsql_cache:
case LCK_backup_end: case LCK_backup_end:
case LCK_cancel: case LCK_cancel:
case LCK_btr_dont_gc:
return *LCK_OWNER_HANDLE_ATT(tdbb); return *LCK_OWNER_HANDLE_ATT(tdbb);
default: default:
bug_lck("Invalid lock type in LCK_get_owner_handle ()"); bug_lck("Invalid lock type in LCK_get_owner_handle ()");

View File

@ -55,7 +55,8 @@ enum lck_t {
LCK_monitor, /* Lock to dump the monitoring data */ LCK_monitor, /* Lock to dump the monitoring data */
LCK_instance, /* Lock to identify a dbb instance */ LCK_instance, /* Lock to identify a dbb instance */
LCK_tt_exist, /* TextType existence lock */ LCK_tt_exist, /* TextType existence lock */
LCK_cancel /* Cancellation lock */ LCK_cancel, /* Cancellation lock */
LCK_btr_dont_gc // Prevent removal of b-tree page from index
}; };
/* Lock owner types */ /* Lock owner types */
@ -89,7 +90,7 @@ public:
lck_data(0) lck_data(0)
{ {
lck_key.lck_long = 0; lck_key.lck_long = 0;
lck_tail[0] = 0; lck_tail[0] = 0;
} }
int lck_test_field; int lck_test_field;

View File

@ -840,7 +840,9 @@ static void prt_lock(
lock->lbl_series, lock->lbl_parent, lock->lbl_state, lock->lbl_series, lock->lbl_parent, lock->lbl_state,
lock->lbl_size, lock->lbl_length, lock->lbl_data); lock->lbl_size, lock->lbl_length, lock->lbl_data);
if (lock->lbl_series == Jrd::LCK_bdb && lock->lbl_length == Jrd::PageNumber::getLockLen()) { if ((lock->lbl_series == Jrd::LCK_bdb || lock->lbl_series == Jrd::LCK_btr_dont_gc) &&
lock->lbl_length == Jrd::PageNumber::getLockLen())
{
// Since fb 2.1 lock keys for page numbers (series == 3) contains // Since fb 2.1 lock keys for page numbers (series == 3) contains
// page space number in high long of two-longs key. Lets print it // page space number in high long of two-longs key. Lets print it
// in <page_space>:<page_number> format // in <page_space>:<page_number> format