diff --git a/src/jrd/btr.cpp b/src/jrd/btr.cpp index e147f6066c..6d3e1fa6d8 100644 --- a/src/jrd/btr.cpp +++ b/src/jrd/btr.cpp @@ -54,6 +54,7 @@ #include "../jrd/gds_proto.h" #include "../jrd/intl_proto.h" #include "../jrd/jrd_proto.h" +#include "../jrd/lck_proto.h" #include "../jrd/met_proto.h" #include "../jrd/mov_proto.h" #include "../jrd/nav_proto.h" @@ -209,6 +210,62 @@ static void update_selectivity(index_root_page*, USHORT, const SelectivityList&) static void checkForLowerKeySkip(bool&, const bool, const IndexNode&, const temporary_key&, const index_desc&, const IndexRetrieval*); +class Jrd::BtrPageGCLock : public Lock +{ + // We want to put 8 bytes (PageNumber) in lock key, one long is already + // reserved by Lock::lck_long this is the second long. It is really unused + // as second long used for 8-byte key already present because of alignment. + long unused; + +public: + BtrPageGCLock(thread_db *tdbb) + { + Database* dbb = tdbb->getDatabase(); + lck_parent = dbb->dbb_lock; + lck_dbb = dbb; + lck_length = PageNumber::getLockLen(); + lck_type = LCK_btr_dont_gc; + lck_owner_handle = LCK_get_owner_handle(tdbb, lck_type); + } + + ~BtrPageGCLock() + { + // assert in debug build + fb_assert(!lck_id); + + // lck_id might be set only if exception occurs + if (lck_id) { + LCK_release(JRD_get_thread_data(), this); + } + } + + void disablePageGC(thread_db *tdbb, const PageNumber &page) + { + page.getLockStr(lck_key.lck_string); + LCK_lock(tdbb, this, LCK_read, LCK_WAIT); + } + + void enablePageGC(thread_db *tdbb) + { + LCK_release(tdbb, this); + } + + static bool isPageGCAllowed(thread_db *tdbb, const PageNumber &page) + { + BtrPageGCLock lock(tdbb); + page.getLockStr(lock.lck_key.lck_string); + + const bool res = LCK_lock(tdbb, &lock, LCK_write, LCK_NO_WAIT); + + if (res) { + LCK_release(tdbb, &lock); + } + + return res; + } +}; + + USHORT BTR_all(thread_db* tdbb, jrd_rel* relation, IndexDescAlloc** csb_idx, @@ -918,8 +975,12 @@ void BTR_insert(thread_db* tdbb, WIN 
* root_window, index_insertion* insertion) index_desc* idx = insertion->iib_descriptor; RelationPages* relPages = insertion->iib_relation->getPages(tdbb); WIN window(relPages->rel_pg_space_id, idx->idx_root); - btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_write, pag_index); - + btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_read, pag_index); + + if (bucket->btr_level == 0) { + CCH_RELEASE(tdbb, &window); + CCH_FETCH(tdbb, &window, LCK_write, pag_index); + } CCH_RELEASE(tdbb, root_window); temporary_key key; @@ -927,6 +988,8 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) key.key_length = 0; RecordNumber recordNumber(0); + BtrPageGCLock lock(tdbb); + insertion->iib_dont_gc_lock = &lock; SLONG split_page = add_node(tdbb, &window, insertion, &key, &recordNumber, NULL, NULL); if (split_page == NO_SPLIT) { @@ -947,6 +1010,7 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) // in the existing "top" page instead of making a new "top" page. CCH_RELEASE(tdbb, root_window); + lock.enablePageGC(tdbb); index_insertion propagate = *insertion; propagate.iib_number.setValue(split_page); @@ -963,16 +1027,18 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) if (split_page == NO_VALUE_PAGE) { CCH_RELEASE(tdbb, &window); } + else { + lock.enablePageGC(tdbb); + } BUGCHECK(204); // msg 204 index inconsistent } return; } // the original page was marked as not garbage-collectable, but - // since it is the top page it won't be garbage-collected anyway, + // since it is the root page it won't be garbage-collected anyway, // so go ahead and mark it as garbage-collectable now. 
- CCH_MARK(tdbb, &window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; + lock.enablePageGC(tdbb); WIN new_window(relPages->rel_pg_space_id, split_page); btree_page* new_bucket = @@ -2245,30 +2311,30 @@ static SLONG add_node(thread_db* tdbb, break; } bucket = (btree_page*) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, - LCK_write, pag_index); + LCK_read, pag_index); } - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags |= btr_dont_gc; + BtrPageGCLock lockCurrent(tdbb); + lockCurrent.disablePageGC(tdbb, window->win_page); // Fetch the page at the next level down. If the next level is leaf level, // fetch for write since we know we are going to write to the page (most likely). const PageNumber index = window->win_page; - CCH_HANDOFF(tdbb, window, page, LCK_write, pag_index); + CCH_HANDOFF(tdbb, window, page, + (SSHORT) ((bucket->btr_level == 1) ? LCK_write : LCK_read), pag_index); // now recursively try to insert the node at the next level down index_insertion propagate; + BtrPageGCLock lockLower(tdbb); + propagate.iib_dont_gc_lock = insertion->iib_dont_gc_lock; + insertion->iib_dont_gc_lock = &lockLower; SLONG split = add_node(tdbb, window, insertion, new_key, new_record_number, &page, &propagate.iib_sibling); if (split == NO_SPLIT) { - window->win_page = index; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); - + lockCurrent.enablePageGC(tdbb); + insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock; return NO_SPLIT; } @@ -2310,17 +2376,10 @@ static SLONG add_node(thread_db* tdbb, // the split page on the lower level has been propogated, so we can go back to // the page it was split from, and mark it as garbage-collectable now - window->win_page = page; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); + 
lockLower.enablePageGC(tdbb); + insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock; - window->win_page = index; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); + lockCurrent.enablePageGC(tdbb); if (original_page) { *original_page = original_page2; @@ -4763,7 +4822,7 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb CONTENTS result = contents_above_threshold; // check to see if the page was marked not to be garbage collected - if (gc_page->btr_header.pag_flags & btr_dont_gc) { + if ( !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page) ) { CCH_RELEASE(tdbb, window); return contents_above_threshold; } @@ -4892,7 +4951,7 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb // below the threshold for garbage collection. gc_page = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); if ((gc_page->btr_length >= GARBAGE_COLLECTION_BELOW_THRESHOLD) - || (gc_page->btr_header.pag_flags & btr_dont_gc)) + || !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page) ) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); @@ -6062,7 +6121,7 @@ static SLONG insert_node(thread_db* tdbb, // mark the bucket as non garbage-collectable until we can propagate // the split page up to the parent; otherwise its possible that the // split page we just created will be lost. 
- bucket->btr_header.pag_flags |= btr_dont_gc; + insertion->iib_dont_gc_lock->disablePageGC(tdbb, window->win_page); if (original_page) { *original_page = window->win_page.getPageNum(); @@ -6853,4 +6912,3 @@ void update_selectivity(index_root_page* root, USHORT id, } irt_desc->irt_stuff.irt_selectivity = selectivity.back(); } - diff --git a/src/jrd/btr.h b/src/jrd/btr.h index 94d5373f18..03b6340319 100644 --- a/src/jrd/btr.h +++ b/src/jrd/btr.h @@ -50,6 +50,7 @@ template class vec; class jrd_req; struct temporary_key; class jrd_tra; +class BtrPageGCLock; enum idx_null_state { idx_nulls_none, @@ -136,6 +137,7 @@ struct index_insertion { temporary_key* iib_key; /* varying string for insertion */ RecordBitmap* iib_duplicates; /* spare bit map of duplicates */ jrd_tra* iib_transaction; /* insertion transaction */ + BtrPageGCLock* iib_dont_gc_lock; // lock to prevent removal of split page }; diff --git a/src/jrd/lck.cpp b/src/jrd/lck.cpp index a3f069d396..1588370660 100644 --- a/src/jrd/lck.cpp +++ b/src/jrd/lck.cpp @@ -429,6 +429,7 @@ SLONG LCK_get_owner_handle(thread_db* tdbb, enum lck_t lock_type) case LCK_dsql_cache: case LCK_backup_end: case LCK_cancel: + case LCK_btr_dont_gc: return *LCK_OWNER_HANDLE_ATT(tdbb); default: bug_lck("Invalid lock type in LCK_get_owner_handle ()"); diff --git a/src/jrd/lck.h b/src/jrd/lck.h index 8393af0eaf..7fb20584a2 100644 --- a/src/jrd/lck.h +++ b/src/jrd/lck.h @@ -55,7 +55,8 @@ enum lck_t { LCK_monitor, /* Lock to dump the monitoring data */ LCK_instance, /* Lock to identify a dbb instance */ LCK_tt_exist, /* TextType existence lock */ - LCK_cancel /* Cancellation lock */ + LCK_cancel, /* Cancellation lock */ + LCK_btr_dont_gc // Prevent removal of b-tree page from index }; /* Lock owner types */ @@ -89,7 +90,7 @@ public: lck_data(0) { lck_key.lck_long = 0; - lck_tail[0] = 0; + lck_tail[0] = 0; } int lck_test_field; diff --git a/src/lock/print.cpp b/src/lock/print.cpp index 0eb59c3888..7be49fb177 100644 --- a/src/lock/print.cpp
+++ b/src/lock/print.cpp @@ -840,7 +840,9 @@ static void prt_lock( lock->lbl_series, lock->lbl_parent, lock->lbl_state, lock->lbl_size, lock->lbl_length, lock->lbl_data); - if (lock->lbl_series == Jrd::LCK_bdb && lock->lbl_length == Jrd::PageNumber::getLockLen()) { + if ((lock->lbl_series == Jrd::LCK_bdb || lock->lbl_series == Jrd::LCK_btr_dont_gc) && + lock->lbl_length == Jrd::PageNumber::getLockLen()) + { // Since fb 2.1 lock keys for page numbers (series == 3) contains // page space number in high long of two-longs key. Lets print it // in : format