diff --git a/src/jrd/btr.cpp b/src/jrd/btr.cpp index b8b563ff22..35d1f46ba3 100644 --- a/src/jrd/btr.cpp +++ b/src/jrd/btr.cpp @@ -55,6 +55,7 @@ #include "../jrd/gds_proto.h" #include "../jrd/intl_proto.h" #include "../jrd/jrd_proto.h" +#include "../jrd/lck_proto.h" #include "../jrd/met_proto.h" #include "../jrd/mov_proto.h" #include "../jrd/nav_proto.h" @@ -68,6 +69,7 @@ using namespace Jrd; using namespace Ods; +//#define DEBUG_BTR_SPLIT /********************************************* eliminate this conversion - kk @@ -217,6 +219,56 @@ static void update_selectivity(index_root_page*, USHORT, const SelectivityList&) static void checkForLowerKeySkip(bool&, const bool, const IndexNode&, const temporary_key&, const index_desc&, const IndexRetrieval*); +class Jrd::BtrPageGCLock : public Lock +{ +public: + BtrPageGCLock(thread_db* tdbb) + { + Database* dbb = tdbb->tdbb_database; + lck_parent = dbb->dbb_lock; + lck_dbb = dbb; + lck_length = sizeof(SLONG); + lck_type = LCK_btr_dont_gc; + lck_owner_handle = LCK_get_owner_handle(tdbb, lck_type); + } + + ~BtrPageGCLock() + { + // assert in debug build + fb_assert(!lck_id); + + // lck_id might be set only if exception occurs + if (lck_id) { + LCK_release(JRD_get_thread_data(), this); + } + } + + void disablePageGC(thread_db* tdbb, SLONG page) + { + lck_key.lck_long = page; + LCK_lock(tdbb, this, LCK_read, LCK_WAIT); + } + + void enablePageGC(thread_db* tdbb) + { + LCK_release(tdbb, this); + } + + static bool isPageGCAllowed(thread_db* tdbb, SLONG page) + { + BtrPageGCLock lock(tdbb); + lock.lck_key.lck_long = page; + + const bool res = LCK_lock(tdbb, &lock, LCK_write, LCK_NO_WAIT); + + if (res) { + LCK_release(tdbb, &lock); + } + + return res; + } +}; + USHORT BTR_all(thread_db* tdbb, jrd_rel* relation, @@ -895,8 +947,12 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) index_desc* idx = insertion->iib_descriptor; WIN window(idx->idx_root); - btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_write, pag_index); + btree_page* bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_read, pag_index); + if (bucket->btr_level == 0) { + CCH_RELEASE(tdbb, &window); + CCH_FETCH(tdbb, &window, LCK_write, pag_index); + } CCH_RELEASE(tdbb, root_window); temporary_key key; @@ -904,6 +960,8 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) key.key_length = 0; RecordNumber recordNumber(0); + BtrPageGCLock lock(tdbb); + insertion->iib_dont_gc_lock = &lock; SLONG split_page = add_node(tdbb, &window, insertion, &key, &recordNumber, NULL, NULL); if (split_page == NO_SPLIT) { @@ -924,6 +982,7 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) // in the existing "top" page instead of making a new "top" page. CCH_RELEASE(tdbb, root_window); + lock.enablePageGC(tdbb); index_insertion propagate = *insertion; propagate.iib_number.setValue(split_page); @@ -940,16 +999,18 @@ void BTR_insert(thread_db* tdbb, WIN * root_window, index_insertion* insertion) if (split_page == NO_VALUE_PAGE) { CCH_RELEASE(tdbb, &window); } + else { + lock.enablePageGC(tdbb); + } BUGCHECK(204); // msg 204 index inconsistent } return; } // the original page was marked as not garbage-collectable, but - // since it is the top page it won't be garbage-collected anyway, + // since it is the root page it won't be garbage-collected anyway, // so go ahead and mark it as garbage-collectable now. - CCH_MARK(tdbb, &window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; + lock.enablePageGC(tdbb); WIN new_window(split_page); btree_page* new_bucket = @@ -2190,32 +2251,39 @@ static SLONG add_node(thread_db* tdbb, break; } bucket = (btree_page*) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, - LCK_write, pag_index); + LCK_read, pag_index); } - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags |= btr_dont_gc; + BtrPageGCLock lockCurrent(tdbb); + lockCurrent.disablePageGC(tdbb, window->win_page); // Fetch the page at the next level down. If the next level is leaf level, // fetch for write since we know we are going to write to the page (most likely). const SLONG index = window->win_page; - CCH_HANDOFF(tdbb, window, page, LCK_write, pag_index); + CCH_HANDOFF(tdbb, window, page, + (SSHORT) ((bucket->btr_level == 1) ? LCK_write : LCK_read), pag_index); // now recursively try to insert the node at the next level down index_insertion propagate; + BtrPageGCLock lockLower(tdbb); + propagate.iib_dont_gc_lock = insertion->iib_dont_gc_lock; + insertion->iib_dont_gc_lock = &lockLower; SLONG split = add_node(tdbb, window, insertion, new_key, new_record_number, &page, &propagate.iib_sibling); if (split == NO_SPLIT) { - window->win_page = index; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); - + lockCurrent.enablePageGC(tdbb); + insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock; return NO_SPLIT; } +#ifdef DEBUG_BTR_SPLIT + Firebird::string s; + s.printf("page %6ld splitted. split %6ld, right %6ld, parent %6ld", + page, split, propagate.iib_sibling, index); + gds__trace(s.c_str()); +#endif + // The page at the lower level split, so we need to insert a pointer // to the new page to the page at this level. window->win_page = index; @@ -2247,17 +2315,10 @@ static SLONG add_node(thread_db* tdbb, // the split page on the lower level has been propogated, so we can go back to // the page it was split from, and mark it as garbage-collectable now - window->win_page = page; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); + lockLower.enablePageGC(tdbb); + insertion->iib_dont_gc_lock = propagate.iib_dont_gc_lock; - window->win_page = index; - bucket = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - CCH_MARK(tdbb, window); - bucket->btr_header.pag_flags &= ~btr_dont_gc; - CCH_RELEASE(tdbb, window); + lockCurrent.enablePageGC(tdbb); if (original_page) { *original_page = original_page2; @@ -4719,7 +4780,7 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb CONTENTS result = contents_above_threshold; // check to see if the page was marked not to be garbage collected - if (gc_page->btr_header.pag_flags & btr_dont_gc) { + if ( !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page) ) { CCH_RELEASE(tdbb, window); return contents_above_threshold; } @@ -4849,8 +4910,8 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb // now refetch the original page and make sure it is still // below the threshold for garbage collection. gc_page = (btree_page*) CCH_FETCH(tdbb, window, LCK_write, pag_index); - if ((gc_page->btr_length >= GARBAGE_COLLECTION_BELOW_THRESHOLD) - || (gc_page->btr_header.pag_flags & btr_dont_gc)) + if ((gc_page->btr_length >= GARBAGE_COLLECTION_BELOW_THRESHOLD) || + !BtrPageGCLock::isPageGCAllowed(tdbb, window->win_page)) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); @@ -5049,6 +5110,12 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb return contents_above_threshold; } +#ifdef DEBUG_BTR_SPLIT + Firebird::string s; + s.printf("node with page %6ld removed from parent page %6ld", + parentNode.pageNumber, parent_window.win_page); + gds__trace(s.c_str()); +#endif // Update the parent first. If the parent is not written out first, // we will be pointing to a page which is not in the doubly linked // sibling list, and therefore navigation back and forth won't work. @@ -5122,6 +5189,14 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb // we will be pointing to a page which is not in the doubly linked // sibling list, and therefore navigation back and forth won't work. // AB: Parent is always a index pointer page. + +#ifdef DEBUG_BTR_SPLIT + Firebird::string s; + s.printf("node with page %6ld removed from parent page %6ld", + parentNode.pageNumber, parent_window.win_page); + gds__trace(s.c_str()); +#endif + result = delete_node(tdbb, &parent_window, parentNode.nodePointer); CCH_RELEASE(tdbb, &parent_window); @@ -5194,6 +5269,15 @@ static CONTENTS garbage_collect(thread_db* tdbb, WIN * window, SLONG parent_numb CCH_RELEASE(tdbb, &left_window); +#ifdef DEBUG_BTR_SPLIT + Firebird::string s; + s.printf("page %6ld is removed from index. parent %6ld, left %6ld, right %6ld", + window->win_page, parent_window.win_page, + left_page ? left_window.win_page : 0, + right_page ? right_window.win_page : 0 ); + gds__trace(s.c_str()); +#endif + // finally, release the page, and indicate that we should write the // previous page out before we write the TIP page out CCH_RELEASE(tdbb, window); @@ -5993,7 +6077,7 @@ static SLONG insert_node(thread_db* tdbb, // mark the bucket as non garbage-collectable until we can propagate // the split page up to the parent; otherwise its possible that the // split page we just created will be lost. - bucket->btr_header.pag_flags |= btr_dont_gc; + insertion->iib_dont_gc_lock->disablePageGC(tdbb, window->win_page); if (original_page) { *original_page = window->win_page; diff --git a/src/jrd/btr.h b/src/jrd/btr.h index 7f729902d2..62d0f6effd 100644 --- a/src/jrd/btr.h +++ b/src/jrd/btr.h @@ -46,11 +46,11 @@ struct dsc; namespace Jrd { class jrd_rel; -class jrd_tra; template class vec; class jrd_req; struct temporary_key; class jrd_tra; +class BtrPageGCLock; enum idx_null_state { idx_nulls_none, @@ -137,6 +137,7 @@ struct index_insertion { temporary_key* iib_key; /* varying string for insertion */ RecordBitmap* iib_duplicates; /* spare bit map of duplicates */ jrd_tra* iib_transaction; /* insertion transaction */ + BtrPageGCLock* iib_dont_gc_lock; // lock to prevent removal of splitted page }; diff --git a/src/jrd/lck.cpp b/src/jrd/lck.cpp index 234e10583f..b3e329c57b 100644 --- a/src/jrd/lck.cpp +++ b/src/jrd/lck.cpp @@ -546,6 +546,7 @@ SLONG LCK_get_owner_handle(thread_db* tdbb, enum lck_t lock_type) case LCK_record: case LCK_update_shadow: case LCK_backup_end: + case LCK_btr_dont_gc: return *LCK_OWNER_HANDLE_ATT(attachment); default: bug_lck("Invalid lock type in LCK_get_owner_handle ()"); diff --git a/src/jrd/lck.h b/src/jrd/lck.h index cde640ff46..7f0719ea1f 100644 --- a/src/jrd/lck.h +++ b/src/jrd/lck.h @@ -53,7 +53,8 @@ enum lck_t { LCK_backup_alloc, /* Lock for page allocation table in backup spare file */ LCK_backup_database, /* Lock to protect writing to database file */ LCK_backup_end, /* Lock to protect end_backup consistency */ - LCK_rel_partners /* Relation partners lock */ + LCK_rel_partners, /* Relation partners lock */ + LCK_btr_dont_gc // lock for b-tree page to prevent its removing from index }; /* Lock owner types */