/*
 *	PROGRAM:	JRD Access Method
 *	MODULE:		btr.c
 *	DESCRIPTION:	B-tree management code
 *
 * The contents of this file are subject to the Interbase Public
 * License Version 1.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy
 * of the License at http://www.Inprise.com/IPL.html
 *
 * Software distributed under the License is distributed on an
 * "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express
 * or implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * The Original Code was created by Inprise Corporation
 * and its predecessors. Portions created by Inprise Corporation are
 * Copyright (C) Inprise Corporation.
 *
 * All Rights Reserved.
 * Contributor(s): ______________________________________.
 *
 * 2002.10.30 Sean Leyne - Removed support for obsolete "PC_PLATFORM" define
 *
 */

#include "firebird.h"
// NOTE(review): the names of the next two system headers were missing from
// the source as received (bare "#include"); <string.h> and <stdlib.h> are
// assumed -- confirm against the repository copy.
#include <string.h>
#include <stdlib.h>
#include "memory_routines.h"
#include "../jrd/ib_stdio.h"
#include "../jrd/jrd.h"
#include "../jrd/ods.h"
#include "../jrd/val.h"
#include "../jrd/btr.h"
#include "../jrd/req.h"
#include "../jrd/tra.h"
#include "../jrd/intl.h"
#include "gen/iberror.h"
#include "../jrd/common.h"
#include "../jrd/jrn.h"
#include "../jrd/lck.h"
#include "../jrd/cch.h"
#include "../jrd/sbm.h"
#include "../jrd/sort.h"
#include "../jrd/gdsassert.h"
#include "../jrd/all_proto.h"
#include "../jrd/btr_proto.h"
#include "../jrd/cch_proto.h"
#include "../jrd/dpm_proto.h"
#include "../jrd/err_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/gds_proto.h"
#include "../jrd/intl_proto.h"
#include "../jrd/jrd_proto.h"
#include "../jrd/met_proto.h"
#include "../jrd/mov_proto.h"
#include "../jrd/nav_proto.h"
#include "../jrd/dbg_proto.h"
#include "../jrd/pag_proto.h"
#include "../jrd/pcmet_proto.h"
#include "../jrd/sbm_proto.h"
#include "../jrd/sort_proto.h"
#include "../jrd/thd_proto.h"
#include "../jrd/tra_proto.h"

/*********************************************
      eliminate this conversion - kk
#ifdef VMS
double	MTH$CVT_G_D();
#endif
**********************************************/

#define MAX_LEVELS	16

// Copy one byte from x_from to x_to and advance both pointers.
inline void MOVE_BYTE(UCHAR*& x_from, UCHAR*& x_to)
{
    *x_to++ = *x_from++;
}

// Number of SLONGs needed to hold a page plus one node header and one key.
// The whole expansion is parenthesized so the macro is safe inside larger
// expressions.
#define OVERSIZE	((MAX_PAGE_SIZE + BTN_SIZE + MAX_KEY + sizeof (SLONG) - 1) / sizeof (SLONG))

typedef union {
    SLONG n;
    SCHAR c[4];
} LONGCHAR;

// Expects a local variable named "dbb" to be in scope at the point of use.
#define GARBAGE_COLLECTION_THRESHOLD	(dbb->dbb_page_size / 4)

typedef struct {
    double d_part;
    SSHORT s_part;
} INT64_KEY;

#define INT64_KEY_LENGTH	(sizeof (double) + sizeof (SSHORT))

static const double pow10[] = {
    1.e00, 1.e01, 1.e02, 1.e03, 1.e04, 1.e05, 1.e06, 1.e07, 1.e08, 1.e09,
    1.e10, 1.e11, 1.e12, 1.e13, 1.e14, 1.e15, 1.e16, 1.e17, 1.e18, 1.e19,
    1.e20, 1.e21, 1.e22, 1.e23, 1.e24, 1.e25, 1.e26, 1.e27, 1.e28, 1.e29,
    1.e30, 1.e31, 1.e32, 1.e33, 1.e34, 1.e35, 1.e36
};

// 10 ** (-s): for s <= 0 this is pow10[-s]; for s > 0 it is the reciprocal
// of pow10[s].  (The positive branch previously indexed pow10[-(s)], i.e. a
// negative subscript.)  The caller must keep |s| within the table (0..36).
#define powerof10(s)	((s) <= 0 ? pow10[-(s)] : 1. / pow10[(s)])

static const struct {			/* Used in make_int64_key() */
    UINT64 limit;
    SINT64 factor;
    SSHORT scale_change;
} int64_scale_control[] = {
    { QUADCONST(922337203685470000), QUADCONST(1), 0},
    { QUADCONST(92233720368547000), QUADCONST(10), 1},
    { QUADCONST(9223372036854700), QUADCONST(100), 2},
    { QUADCONST(922337203685470), QUADCONST(1000), 3},
    { QUADCONST(92233720368548), QUADCONST(10000), 4},
    { QUADCONST(9223372036855), QUADCONST(100000), 5},
    { QUADCONST(922337203686), QUADCONST(1000000), 6},
    { QUADCONST(92233720369), QUADCONST(10000000), 7},
    { QUADCONST(9223372035), QUADCONST(100000000), 8},
    { QUADCONST(922337204), QUADCONST(1000000000), 9},
    { QUADCONST(92233721), QUADCONST(10000000000), 10},
    { QUADCONST(9223373), QUADCONST(100000000000), 11},
    { QUADCONST(922338), QUADCONST(1000000000000), 12},
    { QUADCONST(92234), QUADCONST(10000000000000), 13},
    { QUADCONST(9224), QUADCONST(100000000000000), 14},
    { QUADCONST(923), QUADCONST(1000000000000000), 15},
    { QUADCONST(93), QUADCONST(10000000000000000), 16},
    { QUADCONST(10), QUADCONST(100000000000000000), 17},
    { QUADCONST(1), QUADCONST(1000000000000000000), 18},
    { QUADCONST(0), QUADCONST(0), 0}
};

/* The first four entries in the array int64_scale_control[] end with the
 * limit having 0's at the end.  This is to inhibit any rounding off that
 * DOUBLE precision can introduce.  DOUBLE can easily store up to
 * 92233720368547 uniquely.  Values after this tend to round off to the
 * upper limit during division.  Hence the ending with 0's so that values
 * will be bunched together in the same limit range and scale control for
 * INT64 index KEY calculation.
 *
 * This part was changed as a fix for bug 10267. - bsriram 04-Mar-1999
 */

/* enumerate the possible outcomes of deleting a node */

typedef enum contents {
    contents_empty = 0,
    contents_single,
    contents_below_threshold,
    contents_above_threshold
} CONTENTS;

static SLONG add_node(TDBB, WIN *, IIB *, KEY *, SLONG *, SLONG *);
static void complement_key(KEY *);
static void compress(TDBB, DSC *, KEY *, USHORT, bool, bool, USHORT);
static USHORT compress_root(TDBB, IRT);
static USHORT compute_prefix(KEY *, UCHAR *, USHORT);
static void copy_key(KEY *, KEY *);
static CONTENTS delete_node(TDBB, WIN *, BTN);
static void delete_tree(TDBB, USHORT, USHORT, SLONG, SLONG);
static DSC *eval(TDBB, JRD_NOD, DSC *, bool *);
static SLONG fast_load(TDBB, JRD_REL, IDX *, USHORT, SCB, float *);
static IRT fetch_root(TDBB, WIN *, JRD_REL);
static SLONG find_node(BTR, KEY *, bool);
static CONTENTS garbage_collect(TDBB, WIN *, SLONG);
static SLONG insert_node(TDBB, WIN *, IIB *, KEY *, SLONG *, SLONG *);
static void journal_btree_segment(TDBB, WIN *, BTR);
static bool key_equality(KEY *, BTN);
static INT64_KEY make_int64_key(SINT64, SSHORT);
#ifdef DEBUG_INDEXKEY
static void print_int64_key(SINT64, SSHORT, INT64_KEY);
#endif
static void quad_put(SLONG, UCHAR *);
static void quad_move(UCHAR*, UCHAR*);
static CONTENTS remove_node(TDBB, IIB *, WIN *);
static CONTENTS remove_leaf_node(TDBB, IIB *, WIN *);
static bool scan(TDBB, BTN, SBM *, USHORT, KEY *, USHORT);


USHORT BTR_all(TDBB tdbb,
               JRD_REL relation,
               IDX** start_buffer,
               IDX** csb_idx,
               STR* csb_idx_allocation,
               SLONG* idx_size)
{
/**************************************
 *
 *	B T R _ a l l
 *
 **************************************
 *
 * Functional description
 *	Return descriptions of all indices for relation.  If there isn't
 *	a known index root, assume we were called during optimization
 *	and return no indices.
 *
 **************************************/
    DBB dbb;
    WIN window;
    IRT root;
    STR new_buffer;
    USHORT count, i;
    IDX *buffer;
    SLONG size;

    SET_TDBB(tdbb);
    dbb = tdbb->tdbb_database;
    CHECK_DBB(dbb);

    window.win_flags = 0;
    buffer = *start_buffer;

    if (!(root = fetch_root(tdbb, &window, relation))) {
        return 0;
    }

    // The caller's buffer is too small: allocate a permanent one that is
    // sized for dbb_max_idx descriptors and hand it back to the caller.
    if ((SLONG) (root->irt_count * sizeof(IDX)) > *idx_size) {
        size = (sizeof(IDX) * dbb->dbb_max_idx) + ALIGNMENT;
        *csb_idx_allocation = new_buffer =
            FB_NEW_RPT(*dbb->dbb_permanent, size) str();
        buffer = *start_buffer =
            (IDX *) FB_ALIGN((U_IPTR) new_buffer->str_data, ALIGNMENT);
        *idx_size = size - ALIGNMENT;
    }

    count = 0;
    for (i = 0; i < root->irt_count; i++) {
        if (BTR_description(relation, root, buffer, i)) {
            count++;
            buffer = NEXT_IDX(buffer->idx_rpt, buffer->idx_count);
        }
    }

    // Report where the descriptions start, how much room is left, and
    // where the next free descriptor slot is.
    *csb_idx = *start_buffer;
    *idx_size = *idx_size - ((UCHAR *) buffer - (UCHAR *) * start_buffer);
    *start_buffer = buffer;

    CCH_RELEASE(tdbb, &window);

    return count;
}


void BTR_create(TDBB tdbb,
                JRD_REL relation,
                IDX * idx,
                USHORT key_length,
                SCB sort_handle,
                float *selectivity)
{
/**************************************
 *
 *	B T R _ c r e a t e
 *
 **************************************
 *
 * Functional description
 *	Create a new index.
 *
 **************************************/
    SET_TDBB(tdbb);
    DBB dbb = tdbb->tdbb_database;
    CHECK_DBB(dbb);

    // Now that the index id has been checked out, create the index.
    idx->idx_root = fast_load(tdbb, relation, idx, key_length,
                              sort_handle, selectivity);

    // Index is created.  Go back to the index root page and update it to
    // point to the index.
    WIN window;
    window.win_page = relation->rel_index_root;
    window.win_flags = 0;
    IRT root = (IRT) CCH_FETCH(tdbb, &window, LCK_write, pag_root);
    CCH_MARK(tdbb, &window);
    root->irt_rpt[idx->idx_id].irt_root = idx->idx_root;
    root->irt_rpt[idx->idx_id].irt_stuff.irt_selectivity = *selectivity;
    root->irt_rpt[idx->idx_id].irt_flags &= ~irt_in_progress;

    if (dbb->dbb_wal) {
        CCH_journal_page(tdbb, &window);
    }

    CCH_RELEASE(tdbb, &window);
}


void BTR_delete_index(TDBB tdbb, WIN * window, USHORT id)
{
/**************************************
 *
 *	B T R _ d e l e t e _ i n d e x
 *
 **************************************
 *
 * Functional description
 *	Delete an index if it exists.
 *	The window (already fetched by the caller) is released on
 *	every path.
 *
 **************************************/
    USHORT relation_id;
    SLONG prior, next;
    irt::irt_repeat * irt_desc;

    SET_TDBB(tdbb);
    DBB dbb = tdbb->tdbb_database;
    CHECK_DBB(dbb);

    // Get index descriptor.  If index doesn't exist, just leave.
    IRT root = (IRT) window->win_buffer;

    if (id >= root->irt_count) {
        CCH_RELEASE(tdbb, window);
    }
    else {
        irt_desc = root->irt_rpt + id;
        CCH_MARK(tdbb, window);
        next = irt_desc->irt_root;

        // remove the pointer to the top-level index page before we delete it
        irt_desc->irt_root = 0;
        irt_desc->irt_flags = 0;
        prior = window->win_page;
        relation_id = root->irt_relation;

        // Journal update of index root page
        if (dbb->dbb_wal) {
            CCH_journal_page(tdbb, window);
        }

        CCH_RELEASE(tdbb, window);
        delete_tree(tdbb, relation_id, id, next, prior);
    }
}


bool BTR_description(JRD_REL relation, IRT root, IDX * idx, SSHORT id)
{
/**************************************
 *
 *	B T R _ d e s c r i p t i o n
 *
 **************************************
 *
 * Functional description
 *	See if index exists, and if so, pick up its description.
 *	Index id's must fit in a short - formerly a UCHAR.
 *	Returns false when the id is out of range or the slot has
 *	no root page.
 *
 **************************************/
    irt::irt_repeat * irt_desc;
    idx::idx_repeat * idx_desc;
    irtd* field_type_desc;
    USHORT i;

    // id is signed: reject negative values as well as ids past the end of
    // the slot array, otherwise irt_rpt[] would be indexed out of bounds.
    if (id < 0 || id >= root->irt_count) {
        return false;
    }

    irt_desc = &root->irt_rpt[id];

    if (irt_desc->irt_root == 0) {
        return false;
    }

    //fb_assert(id <= MAX_USHORT);
    idx->idx_id = (USHORT)id;
    idx->idx_root = irt_desc->irt_root;
    idx->idx_selectivity = irt_desc->irt_stuff.irt_selectivity;
    idx->idx_count = irt_desc->irt_keys;
    idx->idx_flags = irt_desc->irt_flags;
    idx->idx_runtime_flags = 0;
    idx->idx_foreign_primaries = NULL;
    idx->idx_foreign_relations = NULL;
    idx->idx_foreign_indexes = NULL;
    idx->idx_primary_relation = 0;
    idx->idx_primary_index = 0;
    idx->idx_expression = NULL;
    idx->idx_expression_request = NULL;

    // pick up field ids and type descriptions for each of the fields
    field_type_desc = (irtd*) ((UCHAR*) root + irt_desc->irt_desc);
    idx_desc = idx->idx_rpt;
    for (i = 0; i < idx->idx_count; i++, field_type_desc++, idx_desc++) {
        idx_desc->idx_field = field_type_desc->irtd_field;
        idx_desc->idx_itype = field_type_desc->irtd_itype;
    }

#ifdef EXPRESSION_INDICES
    if (idx->idx_flags & idx_expressn) {
        PCMET_lookup_index(relation, idx);
    }
#endif

    return true;
}


void BTR_evaluate(TDBB tdbb, IRB retrieval, SBM * bitmap)
{
/**************************************
 *
 *	B T R _ e v a l u a t e
 *
 **************************************
 *
 * Functional description
 *	Do an index scan and return a bitmap
 *	of all candidate record numbers.
 *
 **************************************/
    SET_TDBB(tdbb);
    SBM_reset(bitmap);

    IDX idx;
    WIN window;
    window.win_flags = 0;
    KEY lower, upper;
    USHORT prefix;
    BTR page = BTR_find_page(tdbb, retrieval, &window, &idx,
                             &lower, &upper, false);

    // If there is a starting descriptor, search down index to starting position.
    // This may involve sibling buckets if splits are in progress.  If there
    // isn't a starting descriptor, walk down the left side of the index.
    BTN node;
    if (retrieval->irb_lower_count) {
        while (!(node = BTR_find_leaf(page, &lower, 0, &prefix,
                                      idx.idx_flags & idx_descending, true)))
        {
            page = (BTR) CCH_HANDOFF(tdbb, &window, page->btr_sibling,
                                     LCK_read, pag_index);
        }

        // Compute the number of matching characters in lower and upper bounds
        if (retrieval->irb_upper_count) {
            prefix = compute_prefix(&upper, lower.key_data, lower.key_length);
        }
    }
    else {
        node = page->btr_nodes;
        prefix = 0;
    }

    // if there is an upper bound, scan the index pages looking for it
    if (retrieval->irb_upper_count) {
        while (scan(tdbb, node, bitmap, prefix, &upper,
                    (USHORT) (retrieval->irb_generic &
                              (irb_partial | irb_descending | irb_starting | irb_equality))))
        {
            page = (BTR) CCH_HANDOFF(tdbb, &window, page->btr_sibling,
                                     LCK_read, pag_index);
            node = page->btr_nodes;
            prefix = 0;
        }
    }
    else {
        // if there isn't an upper bound, just walk the index to the end of the level
        SLONG number;
        while (true) {
            number = get_long(node->btn_number);

            if (number == END_LEVEL) {
                break;
            }

            if (number != END_BUCKET) {
                SBM_set(tdbb, bitmap, number);
                node = NEXT_NODE(node);
                continue;
            }

            page = (BTR) CCH_HANDOFF(tdbb, &window, page->btr_sibling,
                                     LCK_read, pag_index);
            node = page->btr_nodes;
        }
    }

    CCH_RELEASE(tdbb, &window);
}


BTN BTR_find_leaf(BTR bucket,
                  KEY * key,
                  UCHAR * value,
                  USHORT * return_value,
                  int descending,
                  bool retrieval)
{
/**************************************
 *
 *	B T R _ f i n d _ l e a f
 *
 **************************************
 *
 * Functional description
 *	Locate and return a pointer to the insertion point.
 *	If the key doesn't belong in this bucket, return NULL.
 *	A flag indicates the index is descending.
 *	When "value" is non-NULL the expanded key of each visited
 *	node is rebuilt into it; when "return_value" is non-NULL it
 *	receives the running prefix at the point of return.
 *
 **************************************/
    UCHAR *key_end, *node_end;
    UCHAR *p, *q, *r;
    USHORT l;
    SLONG number;

    BTN node = bucket->btr_nodes;
    USHORT prefix = 0;
    p = key->key_data;
    key_end = p + key->key_length;

    // If this is an non-leaf bucket of a descending index, the dummy node on the
    // front will trip us up.  NOTE: This code may be apocryphal.  I don't see
    // anywhere that a dummy node is stored for a descending index. - deej
    if (bucket->btr_level && descending && !node->btn_length) {
        node = NEXT_NODE(node);
    }

    while (true) {
        // Pick up data from node
        if (value && (l = node->btn_length)) {
            r = value + node->btn_prefix;
            q = node->btn_data;
            do {
                *r++ = *q++;
            } while (--l);
        }

        // If the page/record number is -1, the node is the last in the level
        // and, by definition, is the insertion point.  Otherwise, if the
        // prefix of the current node is less than the running prefix, the
        // node must have a value greater than the key, so it is the insertion
        // point.
        number = get_long(node->btn_number);

        if (number == END_LEVEL || node->btn_prefix < prefix) {
            if (return_value) {
                *return_value = prefix;
            }
            return node;
        }

        // If the node prefix is greater than current prefix , it must be less
        // than the key, so we can skip it.  If it has zero length, then
        // it is a duplicate, and can also be skipped.
        if (node->btn_prefix == prefix) {
            q = node->btn_data;
            node_end = q + node->btn_length;
            if (descending) {
                while (true) {
                    if (q == node_end || (retrieval && p == key_end)) {
                        goto done;
                    }
                    else if (p == key_end || *p > *q) {
                        break;
                    }
                    else if (*p++ < *q++) {
                        goto done;
                    }
                }
            }
            else if (node->btn_length > 0) {
                while (true) {
                    if (p == key_end) {
                        goto done;
                    }
                    else if (q == node_end || *p > *q) {
                        break;
                    }
                    else if (*p++ < *q++) {
                        goto done;
                    }
                }
            }
            prefix = (p - key->key_data);
        }

        // this part of the code moved up for IGNORE_NULL...
        if (number == END_BUCKET) {
            return NULL;
        }

        node = NEXT_NODE(node);
    }

done:
    if (return_value) {
        *return_value = prefix;
    }

    return node;
}


BTR BTR_find_page(TDBB tdbb,
                  IRB retrieval,
                  WIN * window,
                  IDX * idx,
                  KEY * lower,
                  KEY * upper,
                  bool backwards)
{
/**************************************
 *
 *	B T R _ f i n d _ p a g e
 *
 **************************************
 *
 * Functional description
 *	Initialize for an index retrieval.
 *	Builds the lower/upper keys, picks up the index description
 *	from the root page and walks down to a level 0 page, which is
 *	returned fetched for read in "window".
 *
 **************************************/
    SET_TDBB(tdbb);

    // Generate keys before we get any pages locked to avoid unwind
    // problems --  if we already have a key, assume that we
    // are looking for an equality
    if (retrieval->irb_key) {
        copy_key(retrieval->irb_key, lower);
        copy_key(retrieval->irb_key, upper);
    }
    else {
        if (retrieval->irb_upper_count)
            BTR_make_key(tdbb, retrieval->irb_upper_count,
                         retrieval->irb_value + retrieval->irb_desc.idx_count,
                         &retrieval->irb_desc, upper,
                         (USHORT) (retrieval->irb_generic & irb_starting));

        if (retrieval->irb_lower_count)
            BTR_make_key(tdbb, retrieval->irb_lower_count,
                         retrieval->irb_value,
                         &retrieval->irb_desc, lower,
                         (USHORT) (retrieval->irb_generic & irb_starting));
    }

    window->win_page = retrieval->irb_relation->rel_index_root;
    IRT rpage = (IRT) CCH_FETCH(tdbb, window, LCK_read, pag_root);

    if (!BTR_description
        (retrieval->irb_relation, rpage, idx, retrieval->irb_index))
    {
        CCH_RELEASE(tdbb, window);
        IBERROR(260);	// msg 260 index unexpectedly deleted
    }

    BTR page = (BTR) CCH_HANDOFF(tdbb, window, idx->idx_root,
                                 LCK_read, pag_index);

    // If there is a starting descriptor, search down index to starting position.
    // This may involve sibling buckets if splits are in progress.  If there
    // isn't a starting descriptor, walk down the left side of the index (right
    // side if we are going backwards).
    SLONG number;
    if ((!backwards && retrieval->irb_lower_count) ||
        (backwards && retrieval->irb_upper_count))
    {
        while (page->btr_level > 0) {
            while (true) {
                number = find_node(page, backwards ? upper : lower,
                                   (idx->idx_flags & idx_descending));
                if (number != END_BUCKET) {
                    page = (BTR) CCH_HANDOFF(tdbb, window, number,
                                             LCK_read, pag_index);
                    break;
                }
                page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling,
                                         LCK_read, pag_index);
            }
        }
    }
    else {
        BTN node;
        while (page->btr_level > 0) {
#ifdef SCROLLABLE_CURSORS
            if (backwards)
                node = BTR_last_node(page, NAV_expand_index(window, 0), 0);
            else
#endif
                node = page->btr_nodes;

            number = get_long(node->btn_number);
            page = (BTR) CCH_HANDOFF(tdbb, window, number,
                                     LCK_read, pag_index);

            // make sure that we are actually on the last page on this
            // level when scanning in the backward direction
            if (backwards) {
                while (page->btr_sibling) {
                    page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling,
                                             LCK_read, pag_index);
                }
            }
        }
    }

    return page;
}


void BTR_insert(TDBB tdbb, WIN * root_window, IIB * insertion)
{
/**************************************
 *
 *	B T R _ i n s e r t
 *
 **************************************
 *
 * Functional description
 *	Insert a node into an index.
 *	root_window (fetched by the caller) is released here.  If the
 *	top-level page splits, a new top level is created and the index
 *	root page is updated to point to it.
 *
 **************************************/
    SET_TDBB(tdbb);
    DBB dbb = tdbb->tdbb_database;

    IDX *idx;
    WIN window;
    idx = insertion->iib_descriptor;
    window.win_page = idx->idx_root;
    window.win_flags = 0;
    BTR bucket = (BTR) CCH_FETCH(tdbb, &window, LCK_read, pag_index);

    // A leaf page is going to be modified directly, so re-fetch it for write
    if (bucket->btr_level == 0) {
        CCH_RELEASE(tdbb, &window);
        CCH_FETCH(tdbb, &window, LCK_write, pag_index);
    }

    CCH_RELEASE(tdbb, root_window);

    KEY key;
    SLONG split_page = add_node(tdbb, &window, insertion, &key, NULL, NULL);
    if (split_page == 0) {
        return;
    }

    // The top of the index has split.  We need to make a new level and
    // update the index root page.  Oh boy.
    IRT root = (IRT) CCH_FETCH(tdbb, root_window, LCK_write, pag_root);

    window.win_page = root->irt_rpt[idx->idx_id].irt_root;
    bucket = (BTR) CCH_FETCH(tdbb, &window, LCK_write, pag_index);

    // the original page was marked as not garbage-collectable, but
    // since it is the root page it won't be garbage-collected anyway,
    // so go ahead and mark it as garbage-collectable now.
    CCH_MARK(tdbb, &window);
    bucket->btr_header.pag_flags &= ~btr_dont_gc;

    WIN new_window;
    new_window.win_page = split_page;
    new_window.win_flags = 0;
    BTR new_bucket = (BTR) CCH_FETCH(tdbb, &new_window, LCK_read, pag_index);

    if (bucket->btr_level != new_bucket->btr_level) {
        CCH_RELEASE(tdbb, &new_window);
        CCH_RELEASE(tdbb, &window);
        CORRUPT(204);	// msg 204 index inconsistent
    }

    // Save everything we need from the old top-level page while we still
    // hold it; "bucket" must not be dereferenced once its window has been
    // released.
    // NOTE(review): the local types assume btr_relation is a USHORT and that
    // btr_level, btr_id and pag_flags fit in a UCHAR -- confirm against ods.h.
    const USHORT old_relation = bucket->btr_relation;
    const UCHAR old_level = bucket->btr_level;
    const UCHAR old_id = bucket->btr_id;
    const UCHAR old_descending = bucket->btr_header.pag_flags & btr_descending;

    CCH_RELEASE(tdbb, &new_window);
    CCH_RELEASE(tdbb, &window);

    // Allocate and format new bucket
    new_bucket = (BTR) DPM_allocate(tdbb, &new_window);
    CCH_precedence(tdbb, &new_window, window.win_page);
    new_bucket->btr_header.pag_type = pag_index;
    new_bucket->btr_relation = old_relation;
    new_bucket->btr_level = old_level + 1;
    new_bucket->btr_id = old_id;
    new_bucket->btr_header.pag_flags |= old_descending;

    // Set up first node as degenerate, but pointing to first bucket on
    // next level.
    BTN node = new_bucket->btr_nodes;
    quad_put(window.win_page, node->btn_number);
    node->btn_prefix = 0;
    node->btn_length = 0;
    node = NEXT_NODE(node);

    // Move in the split node
    UCHAR *p, *q;
    USHORT l;
    quad_put(split_page, node->btn_number);
    node->btn_prefix = 0;
    fb_assert(key.key_length <= MAX_UCHAR);
    l = node->btn_length = (UCHAR) key.key_length;
    q = node->btn_data;
    p = key.key_data;
    if (l) {
        do {
            MOVE_BYTE(p, q);
        } while (--l);
    }
    node = NEXT_NODE(node);

    // mark end of level
    node->btn_prefix = 0;
    node->btn_length = 0;
    quad_put((SLONG) END_LEVEL, node->btn_number);
    node = NEXT_NODE(node);
    new_bucket->btr_length = (UCHAR *) node - (UCHAR *) new_bucket;

    // update the root page to point to the new top-level page,
    // and make sure the new page has higher precedence so that
    // it will be written out first--this will make sure that the
    // root page doesn't point into space
    CCH_RELEASE(tdbb, &new_window);
    CCH_precedence(tdbb, root_window, new_window.win_page);
    CCH_MARK(tdbb, root_window);
    root->irt_rpt[idx->idx_id].irt_root = new_window.win_page;

    // journal root page change
    if (dbb->dbb_wal) {
        JRNRP journal;
        journal.jrnrp_type = JRNP_ROOT_PAGE;
        journal.jrnrp_id = idx->idx_id;
        journal.jrnrp_page = new_window.win_page;
        CCH_journal_record(tdbb, root_window, (UCHAR *) & journal,
                           JRNRP_SIZE, 0, 0);
    }

    CCH_RELEASE(tdbb, root_window);
}


IDX_E BTR_key(TDBB tdbb, JRD_REL relation, REC record, IDX * idx, KEY * key,
              idx_null_state * null_state)
{
/**************************************
 *
 *	B T R _ k e y
 *
 **************************************
 *
 * Functional description
 *	Compute a key from an record and an index descriptor.
 *	Note that compound keys are expanded by 25%.  If this
 *	changes, both BTR_key_length and GDEF exe.e have to
 *	change.
 *	Returns idx_e_ok, idx_e_keytoobig, or idx_e_conversion (when
 *	an exception is caught while building the key).
 *
 **************************************/
    KEY temp;
    DSC desc, *desc_ptr;
    SSHORT stuff_count;
    USHORT n, l;
    UCHAR *p, *q;
    IDX_E result;
    idx::idx_repeat * tail;
    bool missing;
    int missing_unique_segments = 0;

    result = idx_e_ok;
    tail = idx->idx_rpt;

    try {

        // Special case single segment indices

        if (idx->idx_count == 1) {
#ifdef EXPRESSION_INDICES
            // for expression indices, compute the value of the expression
            if (idx->idx_expression) {
                JRD_REQ current_request;

                current_request = tdbb->tdbb_request;
                tdbb->tdbb_request = idx->idx_expression_request;
                tdbb->tdbb_request->req_rpb[0].rpb_record = record;

                if (!(desc_ptr = EVL_expr(tdbb, idx->idx_expression))) {
                    desc_ptr = &idx->idx_expression_desc;
                }

                missing = ((tdbb->tdbb_request->req_flags & req_null) == req_null);
                tdbb->tdbb_request = current_request;
            }
            else
#endif
            {
                desc_ptr = &desc;
                // In order to "map a null to a default" value (in EVL_field()),
                // the relation block is referenced.
                // Reference: Bug 10116, 10424
                //
                missing = !EVL_field(relation, record, tail->idx_field, desc_ptr);
            }

            if (missing && (idx->idx_flags & idx_unique)) {
                missing_unique_segments++;
            }

            compress(tdbb, desc_ptr, key, tail->idx_itype, missing,
                     (idx->idx_flags & idx_descending), (USHORT) FALSE);
        }
        else {
            p = key->key_data;
            stuff_count = 0;
            for (n = 0; n < idx->idx_count; n++, tail++) {
                // pad the previous segment out to a full STUFF_COUNT group
                for (; stuff_count; --stuff_count) {
                    *p++ = 0;
                }

                desc_ptr = &desc;
                // In order to "map a null to a default" value (in EVL_field()),
                // the relation block is referenced.
                // Reference: Bug 10116, 10424
                missing = !EVL_field(relation, record, tail->idx_field, desc_ptr);

                if (missing && (idx->idx_flags & idx_unique)) {
                    missing_unique_segments++;
                }

                compress(tdbb, desc_ptr, &temp, tail->idx_itype, missing,
                         (idx->idx_flags & idx_descending), (USHORT) FALSE);

                // each group of STUFF_COUNT data bytes is preceded by a
                // marker byte holding (idx_count - segment number)
                for (q = temp.key_data, l = temp.key_length; l; --l, --stuff_count)
                {
                    if (stuff_count == 0) {
                        *p++ = idx->idx_count - n;
                        stuff_count = STUFF_COUNT;
                    }
                    *p++ = *q++;
                }
            }
            key->key_length = (p - key->key_data);
        }

        if (key->key_length >= MAX_KEY) {
            result = idx_e_keytoobig;
        }

        if (idx->idx_flags & idx_descending) {
            complement_key(key);
        }

        if (null_state) {
            *null_state = !missing_unique_segments ? idx_nulls_none :
                (missing_unique_segments == idx->idx_count) ?
                idx_nulls_all : idx_nulls_some;
        }

        return result;

    }	// try
    catch(const std::exception&) {
        key->key_length = 0;
        return idx_e_conversion;
    }
}


USHORT BTR_key_length(JRD_REL relation, IDX * idx)
{
/**************************************
 *
 *	B T R _ k e y _ l e n g t h
 *
 **************************************
 *
 * Functional description
 *	Compute the maximum key length for an index.
 *
 **************************************/
    FMT format;
    USHORT n, key_length, length;
    idx::idx_repeat * tail;
    TDBB tdbb;

    tdbb = GET_THREAD_DATA;

    format = MET_current(tdbb, relation);
    tail = idx->idx_rpt;

    // If there is only a single key, the computation is straightforward.
    if (idx->idx_count == 1) {
        if (tail->idx_itype == idx_numeric ||
            tail->idx_itype == idx_timestamp1)
        {
            return sizeof(double);
        }

        if (tail->idx_itype == idx_sql_time) {
            return sizeof(ULONG);
        }

        if (tail->idx_itype == idx_sql_date) {
            return sizeof(SLONG);
        }

        if (tail->idx_itype == idx_timestamp2) {
            return sizeof(SINT64);
        }

        if (tail->idx_itype == idx_numeric2) {
            return INT64_KEY_LENGTH;
        }

#ifdef EXPRESSION_INDICES
        if (idx->idx_expression) {
            length = idx->idx_expression_desc.dsc_length;
            if (idx->idx_expression_desc.dsc_dtype == dtype_varying) {
                length = length - sizeof(SSHORT);
            }
        }
        else
#endif
        {
            length = format->fmt_desc[tail->idx_field].dsc_length;
            if (format->fmt_desc[tail->idx_field].dsc_dtype == dtype_varying) {
                length = length - sizeof(SSHORT);
            }
        }

        if (tail->idx_itype >= idx_first_intl_string) {
            return INTL_key_length(tdbb, tail->idx_itype, length);
        }
        else {
            return length;
        }
    }

    // Compute length of key for segmented indices.
    key_length = 0;

    for (n = 0; n < idx->idx_count; n++, tail++) {
        if (tail->idx_itype == idx_numeric ||
            tail->idx_itype == idx_timestamp1)
        {
            length = sizeof(double);
        }
        else if (tail->idx_itype == idx_sql_time) {
            length = sizeof(ULONG);
        }
        else if (tail->idx_itype == idx_sql_date) {
            length = sizeof(ULONG);
        }
        else if (tail->idx_itype == idx_timestamp2) {
            length = sizeof(SINT64);
        }
        else if (tail->idx_itype == idx_numeric2) {
            length = INT64_KEY_LENGTH;
        }
        else {
            length = format->fmt_desc[tail->idx_field].dsc_length;
            if (format->fmt_desc[tail->idx_field].dsc_dtype == dtype_varying) {
                length -= sizeof(SSHORT);
            }
            if (tail->idx_itype >= idx_first_intl_string) {
                length = INTL_key_length(tdbb, tail->idx_itype, length);
            }
        }
        // every STUFF_COUNT data bytes cost one extra marker byte
        key_length += ((length + STUFF_COUNT - 1) / STUFF_COUNT) * (STUFF_COUNT + 1);
    }

    return key_length;
}


#ifdef SCROLLABLE_CURSORS
BTN BTR_last_node(BTR page, EXP expanded_page, BTX * expanded_node)
{
/**************************************
 *
 *	B T R _ l a s t _ n o d e
 *
 **************************************
 *
 * Functional description
 *	Find the last node on a page.  Used when walking
 *	down the right side of an index tree.
 *
 **************************************/
    BTN node, prior;
    SLONG number;
    BTX enode;

    /* the last expanded node is always at the end of the page
       minus the size of a BTX, since there is always an extra
       BTX node with zero-length tail at the end of the page */

    enode = (BTX) ((UCHAR *) expanded_page + expanded_page->exp_length - BTX_SIZE);
    node = (BTN) ((UCHAR *) page + page->btr_length);

    /* starting at the end of the page, find the
       first node that is not an end marker */

    while (true) {
        node = BTR_previous_node(node, &enode);
        number = get_long(node->btn_number);
        if (number != END_BUCKET && number != END_LEVEL) {
            if (expanded_node)
                *expanded_node = enode;
            return node;
        }
    }
}
#endif


#ifdef SCROLLABLE_CURSORS
BTR BTR_left_handoff(TDBB tdbb, WIN * window, BTR page, SSHORT lock_level)
{
/**************************************
 *
 *	B T R _ l e f t _ h a n d o f f
 *
 **************************************
 *
 * Functional description
 *	Handoff a btree page to the left.  This is more difficult than a
 *	right handoff because we have to traverse pages without handing
 *	off locks.  (A lock handoff to the left while someone was handing
 *	off to the right could result in deadlock.)
 *
 **************************************/
    SLONG original_page, sibling, left_sibling;
    WIN fix_win;
    BTR fix_page;
    DBB dbb;

    SET_TDBB(tdbb);
    dbb = tdbb->tdbb_database;
    CHECK_DBB(dbb);

    original_page = window->win_page;
    left_sibling = page->btr_left_sibling;

    CCH_RELEASE(tdbb, window);
    window->win_page = left_sibling;
    page = (BTR) CCH_FETCH(tdbb, window, lock_level, pag_index);

    if ((sibling = page->btr_sibling) == original_page)
        return page;

    /* Since we are not handing off pages, a page could split before we get to it.
     * To detect this case, fetch the left sibling pointer and then handoff right
     * sibling pointers until we reach the page to the left of the page passed
     * to us.
     */

    while (sibling != original_page) {
        page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling,
                                 lock_level, pag_index);
        sibling = page->btr_sibling;
    }

    fix_win.win_page = original_page;
    fix_win.win_flags = 0;
    fix_page = (BTR) CCH_FETCH(tdbb, &fix_win, LCK_write, pag_index);

    /* if someone else already fixed it, just return */

    if (fix_page->btr_left_sibling == window->win_page) {
        CCH_RELEASE(tdbb, &fix_win);
        return page;
    }

    CCH_MARK(tdbb, &fix_win);
    fix_page->btr_left_sibling = window->win_page;

    // NOTE(review): every other journaling site in this file tests
    // dbb->dbb_wal; confirm that dbb_journal is the intended flag here.
    if (dbb->dbb_journal)
        CCH_journal_page(tdbb, &fix_win);

    CCH_RELEASE(tdbb, &fix_win);

    return page;
}
#endif


USHORT BTR_lookup(TDBB tdbb, JRD_REL relation, USHORT id, IDX * buffer)
{
/**************************************
 *
 *	B T R _ l o o k u p
 *
 **************************************
 *
 * Functional description
 *	Return a description of the specified index.
 *	Returns FB_SUCCESS when the description was picked up,
 *	FB_FAILURE otherwise.
 *
 **************************************/
    WIN window;
    IRT root;

    SET_TDBB(tdbb);
    window.win_flags = 0;

    if (!(root = fetch_root(tdbb, &window, relation))) {
        return FB_FAILURE;
    }

    if ((id >= root->irt_count)
        || !BTR_description(relation, root, buffer, id))
    {
        CCH_RELEASE(tdbb, &window);
        return FB_FAILURE;
    }

    CCH_RELEASE(tdbb, &window);
    return FB_SUCCESS;
}


void BTR_make_key(TDBB tdbb,
                  USHORT count,
                  JRD_NOD * exprs,
                  IDX * idx,
                  KEY * key,
                  USHORT fuzzy)
{
/**************************************
 *
 *	B T R _ m a k e _ k e y
 *
 **************************************
 *
 * Functional description
 *	Construct a (possibly) compound search key given a key count,
 *	a vector of value expressions, and a place to put the key.
 *
 **************************************/
    DSC *desc, temp_desc;
    SSHORT stuff_count;
    USHORT n, l;
    UCHAR *p, *q;
    KEY temp;
    bool missing;
    idx::idx_repeat * tail;

    SET_TDBB(tdbb);

    fb_assert(count > 0);
    fb_assert(idx != NULL);
    fb_assert(exprs != NULL);
    fb_assert(key != NULL);

    tail = idx->idx_rpt;

    // If the index is a single segment index, don't sweat the compound
    // stuff.
    if (idx->idx_count == 1) {
        desc = eval(tdbb, *exprs, &temp_desc, &missing);
        compress(tdbb, desc, key, tail->idx_itype, missing,
                 (idx->idx_flags & idx_descending), fuzzy);
    }
    else {
        // Make a compound key
        p = key->key_data;
        stuff_count = 0;

        for (n = 0; n < count; n++, tail++) {
            for (; stuff_count; --stuff_count) {
                *p++ = 0;
            }
            desc = eval(tdbb, *exprs++, &temp_desc, &missing);
            // "fuzzy" is only passed on for the last segment supplied
            compress(tdbb, desc, &temp, tail->idx_itype, missing,
                     (idx->idx_flags & idx_descending),
                     (USHORT) ((n == count - 1) ? fuzzy : FALSE));
            for (q = temp.key_data, l = temp.key_length; l; --l, --stuff_count)
            {
                if (stuff_count == 0) {
                    *p++ = idx->idx_count - n;
                    stuff_count = STUFF_COUNT;
                }
                *p++ = *q++;
            }
        }
        key->key_length = (p - key->key_data);
    }

    if (idx->idx_flags & idx_descending) {
        complement_key(key);
    }
}


bool BTR_next_index(TDBB tdbb,
                    JRD_REL relation,
                    JRD_TRA transaction,
                    IDX * idx,
                    WIN * window)
{
/**************************************
 *
 *	B T R _ n e x t _ i n d e x
 *
 **************************************
 *
 * Functional description
 *	Get next index for relation.  Index ids
 *	recently change from UCHAR to SHORT
 *	An idx_id of (USHORT) -1 starts the scan from the first slot.
 *	Returns true with the root page still held in "window" when an
 *	index is found; releases the window and returns false at the end.
 *
 **************************************/
    IRT root;
    SSHORT id;
    SLONG trans;
    int trans_state;
    irt::irt_repeat * irt_desc;

    SET_TDBB(tdbb);

    if ((USHORT)idx->idx_id == (USHORT)-1) {
        id = 0;
        window->win_bdb = NULL;
    }
    else {
        id = idx->idx_id + 1;
    }

    if (window->win_bdb) {
        root = (IRT) window->win_buffer;
    }
    else if (!(root = fetch_root(tdbb, window, relation))) {
        return false;
    }

    for (; id < root->irt_count; ++id) {
        irt_desc = root->irt_rpt + id;
        if (!irt_desc->irt_root &&
            (irt_desc->irt_flags & irt_in_progress) && transaction)
        {
            // The slot is reserved by an index creation still marked in
            // progress: wait for that transaction without holding the page.
            trans = irt_desc->irt_stuff.irt_transaction;
            CCH_RELEASE(tdbb, window);
            trans_state = TRA_wait(tdbb, transaction, trans, TRUE);
            if ((trans_state == tra_dead) || (trans_state == tra_committed))
            {
                /* clean up this left-over index */
                root = (IRT) CCH_FETCH(tdbb, window, LCK_write, pag_root);
                irt_desc = root->irt_rpt + id;
                if (!irt_desc->irt_root &&
                    irt_desc->irt_stuff.irt_transaction == trans &&
                    (irt_desc->irt_flags & irt_in_progress))
                {
                    BTR_delete_index(tdbb, window, id);
                }
                else {
                    CCH_RELEASE(tdbb, window);
                }
                root = (IRT) CCH_FETCH(tdbb, window, LCK_read, pag_root);
                continue;
            }
            else {
                root = (IRT) CCH_FETCH(tdbb, window, LCK_read, pag_root);
            }
        }
        if (BTR_description(relation, root, idx, id)) {
            return true;
        }
    }

    CCH_RELEASE(tdbb, window);

    return false;
}


BTN BTR_next_node(BTN node, BTX * expanded_node)
{
/**************************************
 *
 *	B T R _ n e x t _ n o d e
 *
 **************************************
 *
 * Functional description
 *	Find the next node on both the index page
 *	and its associated expanded buffer.
 *
 **************************************/

    if (*expanded_node) {
        *expanded_node = NEXT_EXPANDED((*expanded_node), node);
    }

    return NEXT_NODE(node);
}


BTN BTR_previous_node(BTN node, BTX * expanded_node)
{
/**************************************
 *
 *	B T R _ p r e v i o u s _ n o d e
 *
 **************************************
 *
 * Functional description
 *	Find the previous node on a page.  Used when walking
 *	an index backwards.
 *	Unlike BTR_next_node, *expanded_node is dereferenced
 *	unconditionally, so it must not be NULL.
 *
 **************************************/

    node = (BTN) ((UCHAR *) node -
                  (*expanded_node)->btx_btr_previous_length - BTN_SIZE);
    *expanded_node = (BTX) ((UCHAR *) * expanded_node -
                            (*expanded_node)->btx_previous_length - BTX_SIZE);

    return node;
}


void BTR_remove(TDBB tdbb, WIN * root_window, IIB * insertion)
{
/**************************************
 *
 *	B T R _ r e m o v e
 *
 **************************************
 *
 * Functional description
 *	Remove an index node from a b-tree.
 *	If the node doesn't exist, don't get overly excited.
 *
 **************************************/
    SET_TDBB(tdbb);
    DBB dbb = tdbb->tdbb_database;

    IDX *idx;
    idx = insertion->iib_descriptor;
    WIN window;
    window.win_page = idx->idx_root;
    window.win_flags = 0;
    BTR page = (BTR) CCH_FETCH(tdbb, &window, LCK_read, pag_index);

    // If the page is level 0, re-fetch it for write
    UCHAR level = page->btr_level;
    if (level == 0) {
        CCH_RELEASE(tdbb, &window);
        CCH_FETCH(tdbb, &window, LCK_write, pag_index);
    }

    // remove the node from the index tree via recursive descent
    CONTENTS result = remove_node(tdbb, insertion, &window);

    // if the root page points at only one lower page, remove this
    // level to prevent the tree from being deeper than necessary--
    // do this only if the level is greater than 1 to prevent
    // excessive thrashing in the case where a small table is
    // constantly being loaded and deleted.
    if ((result == contents_single) && (level > 1)) {
        // we must first release the windows to obtain the root for write
        // without getting deadlocked

        CCH_RELEASE(tdbb, &window);
        CCH_RELEASE(tdbb, root_window);

        IRT root = (IRT) CCH_FETCH(tdbb, root_window, LCK_write, pag_root);
        page = (BTR) CCH_FETCH(tdbb, &window, LCK_write, pag_index);

        // get the page number of the child, and check to make sure
        // the page still has only one node on it
        BTN node = page->btr_nodes;
        SLONG number = get_long(node->btn_number);
        node = NEXT_NODE(node);
        if (get_long(node->btn_number) >= 0) {
            CCH_RELEASE(tdbb, &window);
            CCH_RELEASE(tdbb, root_window);
            return;
        }

        CCH_MARK(tdbb, root_window);
        root->irt_rpt[idx->idx_id].irt_root = number;

        // journal root page change
        if (dbb->dbb_wal) {
            JRNRP journal;
            journal.jrnrp_type = JRNP_ROOT_PAGE;
            journal.jrnrp_id = idx->idx_id;
            journal.jrnrp_page = number;
            CCH_journal_record(tdbb, root_window, (UCHAR *) & journal,
                               JRNRP_SIZE, 0, 0);
        }

        // release the pages, and place the page formerly at the top level
        // on the free list, making sure the root page is written out first
        // so that we're not pointing to a released page
        CCH_RELEASE(tdbb, root_window);
        CCH_RELEASE(tdbb, &window);
        PAG_release_page(window.win_page, root_window->win_page);
    }

    if (window.win_bdb) {
        CCH_RELEASE(tdbb, &window);
    }
    if (root_window->win_bdb) {
        CCH_RELEASE(tdbb, root_window);
    }
}


void BTR_reserve_slot(TDBB tdbb, JRD_REL relation, JRD_TRA transaction, IDX * idx)
{
/**************************************
 *
 *	B T R _ r e s e r v e _ s l o t
 *
 **************************************
 *
 * Functional description
 *	Reserve a slot on an index root page
 *	in preparation to index creation.
 *
 **************************************/
    SET_TDBB(tdbb);
    DBB dbb = tdbb->tdbb_database;
    CHECK_DBB(dbb);

    // Get root page, assign an index id, and store the index descriptor.
    // Leave the root pointer null for the time being.
WIN window; window.win_page = relation->rel_index_root; window.win_flags = 0; IRT root = (IRT) CCH_FETCH(tdbb, &window, LCK_write, pag_root); CCH_MARK(tdbb, &window); // check that we create no more indexes than will fit on a single root page if (root->irt_count > dbb->dbb_max_idx) { CCH_RELEASE(tdbb, &window); ERR_post(isc_no_meta_update, isc_arg_gds, isc_max_idx, isc_arg_number, (SLONG) dbb->dbb_max_idx, 0); } // Scan the index page looking for the high water mark of the descriptions and, // perhaps, an empty index slot IRTD *desc; USHORT l, space; irt::irt_repeat * root_idx, *end, *slot; bool maybe_no_room = false; retry: l = idx->idx_count * sizeof(IRTD); space = dbb->dbb_page_size; slot = NULL; for (root_idx = root->irt_rpt, end = root_idx + root->irt_count; root_idx < end; root_idx++) { if (root_idx->irt_root || (root_idx->irt_flags & irt_in_progress)) { space = MIN(space, root_idx->irt_desc); } if (!root_idx->irt_root && !slot && !(root_idx->irt_flags & irt_in_progress)) { slot = root_idx; } } space -= l; desc = (IRTD *) ((UCHAR *) root + space); // Verify that there is enough room on the Index root page. if (desc < (IRTD *) (end + 1)) { // Not enough room: Attempt to compress the index root page and try again. // If this is the second try already, then there really is no more room. 
if (maybe_no_room) { CCH_RELEASE(tdbb, &window); ERR_post(isc_no_meta_update, isc_arg_gds, isc_index_root_page_full, 0); } compress_root(tdbb, root); maybe_no_room = true; goto retry; } // If we didn't pick up an empty slot, allocate a new one if (!slot) { slot = end; root->irt_count++; } idx->idx_id = slot - root->irt_rpt; slot->irt_desc = space; fb_assert(idx->idx_count <= MAX_UCHAR); slot->irt_keys = (UCHAR) idx->idx_count; slot->irt_flags = idx->idx_flags | irt_in_progress; if (transaction) { slot->irt_stuff.irt_transaction = transaction->tra_number; } slot->irt_root = 0; MOVE_FASTER(idx->idx_rpt, desc, l); if (dbb->dbb_wal) { CCH_journal_page(tdbb, &window); } CCH_RELEASE(tdbb, &window); } float BTR_selectivity(TDBB tdbb, JRD_REL relation, USHORT id) { /************************************** * * B T R _ s e l e c t i v i t y * ************************************** * * Functional description * Update index selectivity on the fly. * Note that index leaf pages are walked * without visiting data pages. Thus the * effects of uncommitted transactions * will be included in the calculation. 
* **************************************/ SET_TDBB(tdbb); WIN window; window.win_flags = 0; IRT root = fetch_root(tdbb, &window, relation); if (!root) { return 0.0; } SLONG page = root->irt_rpt[id].irt_root; if (root->irt_count <= id || !page) { CCH_RELEASE(tdbb, &window); return 0.0; } window.win_flags = WIN_large_scan; window.win_scans = 1; BTR bucket = (BTR) CCH_HANDOFF(tdbb, &window, page, LCK_read, pag_index); // go down the left side of the index to leaf level BTN node; while (bucket->btr_level) { node = bucket->btr_nodes; page = get_long(node->btn_number); bucket = (BTR) CCH_HANDOFF(tdbb, &window, page, LCK_read, pag_index); } bool dup; SLONG nodes = 0; SLONG duplicates = 0; KEY key; key.key_length = 0; SSHORT l; UCHAR *p, *q; // go through all the leaf nodes and count them; // also count how many of them are duplicates while (page) { for (node = bucket->btr_nodes;; node = NEXT_NODE(node)) { page = get_long(node->btn_number); if (page < 0) { break; } ++nodes; l = node->btn_length + node->btn_prefix; // figure out if this is a duplicate if (node == bucket->btr_nodes) { dup = key_equality(&key, node); } else { dup = (!node->btn_length && (l == key.key_length)); } if (dup) { ++duplicates; } // keep the key value current for comparison with the next key key.key_length = l; if ( (l = node->btn_length) ) { p = key.key_data + node->btn_prefix; q = node->btn_data; do { *p++ = *q++; } while (--l); } } if (page == END_LEVEL || !(page = bucket->btr_sibling)) { break; } bucket = (BTR) CCH_HANDOFF_TAIL(tdbb, &window, page, LCK_read, pag_index); } CCH_RELEASE_TAIL(tdbb, &window); // calculate the selectivity and store it on the root page float selectivity = (float) ((nodes) ? 
1.0 / (float) (nodes - duplicates) : 0.0); window.win_page = relation->rel_index_root; window.win_flags = 0; root = (IRT) CCH_FETCH(tdbb, &window, LCK_write, pag_root); CCH_MARK(tdbb, &window); root->irt_rpt[id].irt_stuff.irt_selectivity = selectivity; CCH_RELEASE(tdbb, &window); return selectivity; } static SLONG add_node(TDBB tdbb, WIN * window, IIB * insertion, KEY * new_key, SLONG * original_page, SLONG * sibling_page) { /************************************** * * a d d _ n o d e * ************************************** * * Functional description * Insert a node in an index. This recurses to the leaf level. * If a split occurs, return the new index page number and its * leading string. * **************************************/ SLONG split; BTR bucket = (BTR) window->win_buffer; // For leaf level guys, loop thru the leaf buckets until insertion // point is found (should be instant) if (bucket->btr_level == 0) { while (true) { split = insert_node(tdbb, window, insertion, new_key, original_page, sibling_page); if (split >= 0) { return split; } else { bucket = (BTR) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, LCK_write, pag_index); } } } // If we're above the leaf level, find the appropriate node in the chain of sibling pages. // Hold on to this position while we recurse down to the next level, in case there's a // split at the lower level, in which case we need to insert the new page at this level. SLONG page; while (true) { page = find_node(bucket, insertion->iib_key, (insertion->iib_descriptor->idx_flags & idx_descending)); if (page != END_BUCKET) { break; } bucket = (BTR) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, LCK_read, pag_index); } // Fetch the page at the next level down. If the next level is leaf level, // fetch for write since we know we are going to write to the page (most likely). SLONG index = window->win_page; CCH_HANDOFF(tdbb, window, page, (SSHORT) ((bucket->btr_level == 1) ? 
LCK_write : LCK_read), pag_index);

	// now recursively try to insert the node at the next level down
	IIB propogate;
	split = add_node(tdbb, window, insertion, new_key, &page, &propogate.iib_sibling);
	if (split == 0) {
		// nothing split below, so nothing to record at this level
		return 0;
	}

	// The page at the lower level split, so we need to insert a pointer
	// to the new page to the page at this level.
	window->win_page = index;
	bucket = (BTR) CCH_FETCH(tdbb, window, LCK_write, pag_index);

	propogate.iib_number = split;
	propogate.iib_descriptor = insertion->iib_descriptor;
	propogate.iib_relation = insertion->iib_relation;
	propogate.iib_duplicates = NULL;
	propogate.iib_key = new_key;

	// now loop through the sibling pages trying to find the appropriate
	// place to put the pointer to the lower level page--remember that the
	// page we were on could have split while we weren't looking
	SLONG original_page2;
	SLONG sibling_page2;
	while (true) {
		split = insert_node(tdbb, window, &propogate, new_key, &original_page2, &sibling_page2);
		if (split >= 0) {
			break;
		}
		else {
			bucket = (BTR) CCH_HANDOFF(tdbb, window, bucket->btr_sibling, LCK_write, pag_index);
		}
	}

	// the split page on the lower level has been propogated, so we can go back to
	// the page it was split from, and mark it as garbage-collectable now
	window->win_page = page;
	bucket = (BTR) CCH_FETCH(tdbb, window, LCK_write, pag_index);
	CCH_MARK(tdbb, window);
	bucket->btr_header.pag_flags &= ~btr_dont_gc;
	CCH_RELEASE(tdbb, window);

	// both out-parameters are optional
	if (original_page) {
		*original_page = original_page2;
	}
	if (sibling_page) {
		*sibling_page = sibling_page2;
	}

	return split;
}


static void complement_key(KEY * key)
{
/**************************************
 *
 *	c o m p l e m e n t _ k e y
 *
 **************************************
 *
 * Functional description
 *	Negate a key for descending index.
 *
 *	Every one of the key_length bytes in key_data is
 *	bit-inverted in place.
 *
 **************************************/
	UCHAR *p, *end;

	for (p = key->key_data, end = p + key->key_length; p < end; p++) {
		*p ^= -1;
	}
}


static void compress(TDBB tdbb, DSC * desc, KEY * key, USHORT itype, bool missing, bool descending, USHORT fuzzy)
{
/**************************************
 *
 *	c o m p r e s s
 *
 **************************************
 *
 * Functional description
 *	Compress a data value into an index key.
 *
 *	The result is written to key->key_data / key->key_length.
 *	itype selects the key encoding; `missing` marks a null value.
 *
 **************************************/
	UCHAR *q, *p;
	USHORT length;
	UCHAR pad, *ptr;

	// scratch area viewed either as a typed value or as raw bytes
	union {
		INT64_KEY temp_int64_key;
		double temp_double;
		ULONG temp_ulong;
		SLONG temp_slong;
		SINT64 temp_sint64;
		UCHAR temp_char[sizeof(INT64_KEY)];
	} temp;
	USHORT temp_copy_length;
	bool temp_is_negative = false;
	bool int64_key_op = false;

	SET_TDBB(tdbb);
	DBB dbb = tdbb->tdbb_database;
	CHECK_DBB(dbb);

	p = key->key_data;

	// From ODS 7 on, a missing value becomes a run of pad bytes:
	// 0xFF for ascending keys, 0x00 for descending ones.
	if (missing && dbb->dbb_ods_version >= ODS_VERSION7) {
		pad = 0;
		if (!descending) {
			pad ^= -1;
		}

		if (itype == idx_numeric || itype == idx_timestamp1) {
			length = sizeof(double);
		}
		else if (itype == idx_sql_time) {
			length = sizeof(ULONG);
		}
		else if (itype == idx_sql_date) {
			length = sizeof(SLONG);
		}
		else if (itype == idx_timestamp2) {
			length = sizeof(SINT64);
		}
		else if (itype == idx_numeric2) {
			length = INT64_KEY_LENGTH;
		}
		else {
			length = desc->dsc_length;
			if (desc->dsc_dtype == dtype_varying) {
				length -= sizeof(SSHORT);
			}
			if (itype >= idx_first_intl_string) {
				length = INTL_key_length(tdbb, itype, length);
			}
		}

		// never write past the key buffer
		length = (length > sizeof(key->key_data)) ? sizeof(key->key_data) : length;
		while (length--) {
			*p++ = pad;
		}
		key->key_length = (p - key->key_data);

		return;
	}

	// String-like index types
	if (itype == idx_string || itype == idx_byte_array || itype == idx_metadata || itype >= idx_first_intl_string) {
		UCHAR temp1[MAX_KEY];
		pad = (itype == idx_string) ?
' ' : 0;

		if (missing) {
			length = 0;
		}
		else if (itype >= idx_first_intl_string || itype == idx_metadata) {
			DSC to;

			// convert to an international byte array
			to.dsc_dtype = dtype_text;
			to.dsc_flags = 0;
			to.dsc_sub_type = 0;
			to.dsc_scale = 0;
			to.dsc_ttype = ttype_sort_key;
			to.dsc_length = sizeof(temp1);
			ptr = to.dsc_address = temp1;
			length = INTL_string_to_key(tdbb, itype, desc, &to, fuzzy);
		}
		else {
			USHORT ttype;
			length = MOV_get_string_ptr(desc, &ttype, &ptr, (VARY *) temp1, MAX_KEY);
		}

		if (length) {
			// clamp to the key buffer, then copy the bytes
			if (length > sizeof(key->key_data)) {
				length = sizeof(key->key_data);
			}
			do {
				*p++ = *ptr++;
			} while (--length);
		}
		else {
			// an empty value is stored as a single pad byte
			*p++ = pad;
		}

		// strip trailing pad bytes; the first byte is always kept
		while (p > key->key_data) {
			if (*--p != pad) {
				break;
			}
		}
		key->key_length = p + 1 - key->key_data;

		return;
	}

	// The index is numeric.
	//   For idx_numeric...
	//     Convert the value to a double precision number,
	//     then zap it to compare in a byte-wise order.
	//   For idx_numeric2...
	//     Convert the value to a INT64_KEY struct,
	//     then zap it to compare in a byte-wise order.

	temp_copy_length = sizeof(double);
	if (missing) {
		memset(&temp, 0, sizeof(temp));
	}

	if (itype == idx_timestamp1) {
		temp.temp_double = MOV_date_to_double(desc);
		temp_is_negative = (temp.temp_double < 0);
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "TIMESTAMP1 %lf ", temp.temp_double);
#endif
	}
	else if (itype == idx_timestamp2) {
		GDS_TIMESTAMP timestamp;
		timestamp = MOV_get_timestamp(desc);
#define SECONDS_PER_DAY ((ULONG) 24 * 60 * 60)
		// days * (time units per day) + time units within the day
		temp.temp_sint64 = ((SINT64) (timestamp.timestamp_date) * (SINT64) (SECONDS_PER_DAY * ISC_TIME_SECONDS_PRECISION)) + (SINT64) (timestamp.timestamp_time);
		temp_copy_length = sizeof(SINT64);
		temp_is_negative = (temp.temp_sint64 < 0);
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "TIMESTAMP2: %d:%u ", ((SLONG *) desc->dsc_address)[0], ((ULONG *) desc->dsc_address)[1]);
		ib_fprintf(ib_stderr, "TIMESTAMP2: %20" QUADFORMAT "d ", temp.temp_sint64);
#endif
	}
	else if (itype == idx_sql_date) {
		temp.temp_slong = MOV_get_sql_date(desc);
		temp_copy_length = sizeof(SLONG);
		temp_is_negative = (temp.temp_slong < 0);
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "DATE %d ", temp.temp_slong);
#endif
	}
	else if (itype == idx_sql_time) {
		temp.temp_ulong = MOV_get_sql_time(desc);
		temp_copy_length = sizeof(ULONG);
		temp_is_negative = false;
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "TIME %u ", temp.temp_ulong);
#endif
	}
	else if (itype == idx_numeric2) {
		int64_key_op = true;
		temp.temp_int64_key = make_int64_key(MOV_get_int64(desc, desc->dsc_scale), desc->dsc_scale);
		temp_copy_length = sizeof(temp.temp_int64_key.d_part);
		temp_is_negative = (temp.temp_int64_key.d_part < 0);
#ifdef DEBUG_INDEXKEY
		print_int64_key(*(SINT64 *) desc->dsc_address, desc->dsc_scale, temp.temp_int64_key);
#endif
	}
	else if (desc->dsc_dtype == dtype_timestamp) {
		// This is the same as the pre v6 behavior.  Basically, the
		// customer has created a NUMERIC index, and is probing into that
		// index using a TIMESTAMP value.
		// eg:  WHERE anInteger = TIMESTAMP '1998-9-16'
		temp.temp_double = MOV_date_to_double(desc);
		temp_is_negative = (temp.temp_double < 0);
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "TIMESTAMP1 special %lg ", temp.temp_double);
#endif
	}
	else {
		temp.temp_double = MOV_get_double(desc);
		temp_is_negative = (temp.temp_double < 0);
#ifdef DEBUG_INDEXKEY
		ib_fprintf(ib_stderr, "NUMERIC %lg ", temp.temp_double);
#endif
	}

#ifdef IEEE

#ifndef WORDS_BIGENDIAN
	// For little-endian machines, reverse the order of bytes for the key
	// Copy the first set of bytes into key_data
	for (q = temp.temp_char + temp_copy_length, length = temp_copy_length; length; --length) {
		*p++ = *--q;
	}

	// Copy the next 2 bytes into key_data, if key is of an int64 type
	if (int64_key_op) {
		for (q = temp.temp_char + sizeof(double) + sizeof(SSHORT), length = sizeof(SSHORT); length; --length) {
			*p++ = *--q;
		}
	}
#else
	// For big-endian machines, copy the bytes as laid down
	// Copy the first set of bytes into key_data
	for (q = temp.temp_char, length = temp_copy_length; length; --length) {
		*p++ = *q++;
	}

	// Copy the next 2 bytes into key_data, if key is of an int64 type
	if (int64_key_op) {
		for (q = temp.temp_char + sizeof(double), length = sizeof(SSHORT); length; --length) {
			*p++ = *q++;
		}
	}
#endif /* !WORDS_BIGENDIAN */

#else /* IEEE */

	// The conversion from G_FLOAT to D_FLOAT made below was removed because
	// it prevented users from entering otherwise valid numbers into a field
	// which was in an index.   A D_FLOAT has the sign and 7 of 8 exponent
	// bits in the first byte and the remaining exponent bit plus the first
	// 7 bits of the mantissa in the second byte.   For G_FLOATS, the sign
	// and 7 of 11 exponent bits go into the first byte, with the remaining
	// 4 exponent bits going into the second byte, with the first 4 bits of
	// the mantissa.   Why this conversion was done is unknown, but it is
	// of limited utility, being useful for reducing the compressed field
	// length only for those values which have 0 for the last 6 bytes and
	// a nonzero value for the 5-7 bits of the mantissa.

	//***************************************************************
	//#ifdef VMS
	//temp.temp_double = MTH$CVT_G_D (&temp.temp_double);
	//#endif
	//***************************************************************

	*p++ = temp.temp_char[1];
	*p++ = temp.temp_char[0];
	*p++ = temp.temp_char[3];
	*p++ = temp.temp_char[2];
	*p++ = temp.temp_char[5];
	*p++ = temp.temp_char[4];
	*p++ = temp.temp_char[7];
	*p++ = temp.temp_char[6];

#error compile_time_failure:
#error Code needs to be written in the non - IEEE floating point case
#error to handle the following:
#error a) idx_sql_date, idx_sql_time, idx_timestamp2 b) idx_numeric2

#endif /* IEEE */

	// Test the sign of the double precision number.  Just to be sure, don't
	// rely on the byte comparison being signed.  If the number is negative,
	// complement the whole thing.  Otherwise just zap the sign bit.
	if (temp_is_negative) {
		// -x - 1 on each 16-bit word is a bitwise complement of the first 8 bytes
		((SSHORT *) key->key_data)[0] = -((SSHORT *) key->key_data)[0] - 1;
		((SSHORT *) key->key_data)[1] = -((SSHORT *) key->key_data)[1] - 1;
		((SSHORT *) key->key_data)[2] = -((SSHORT *) key->key_data)[2] - 1;
		((SSHORT *) key->key_data)[3] = -((SSHORT *) key->key_data)[3] - 1;
	}
	else {
		key->key_data[0] ^= 1 << 7;
	}

	// Complement the s_part for an int64 key.
	// If we just flip the sign bit, which is equivalent to adding 32768, the
	// short part will unsigned-compare correctly.
	if (int64_key_op) {
		key->key_data[8] ^= 1 << 7;
	}

	// Finally, chop off trailing binary zeros
	for (p = &key->key_data[(!int64_key_op) ?
temp_copy_length - 1 : INT64_KEY_LENGTH - 1]; p > key->key_data; --p) { if (*p) { break; } } key->key_length = (p - key->key_data) + 1; #ifdef DEBUG_INDEXKEY { USHORT i; ib_fprintf(ib_stderr, "KEY: length: %d Bytes: ", key->key_length); for (i = 0; i < key->key_length; i++) ib_fprintf(ib_stderr, "%02x ", key->key_data[i]); ib_fprintf(ib_stderr, "\n"); } #endif } static USHORT compress_root(TDBB tdbb, IRT page) { /************************************** * * c o m p r e s s _ r o o t * ************************************** * * Functional description * Compress an index root page. * **************************************/ UCHAR *temp, *p; USHORT l; irt::irt_repeat * root_idx, *end; SET_TDBB(tdbb); DBB dbb = tdbb->tdbb_database; CHECK_DBB(dbb); temp = (UCHAR *) tdbb->tdbb_default->allocate((SLONG) dbb->dbb_page_size, 0 #ifdef DEBUG_GDS_ALLOC ,__FILE__,__LINE__ #endif ); MOVE_FASTER(page, temp, dbb->dbb_page_size); p = temp + dbb->dbb_page_size; for (root_idx = page->irt_rpt, end = root_idx + page->irt_count; root_idx < end; root_idx++) { if (root_idx->irt_root) { l = root_idx->irt_keys * sizeof(IRTD); p -= l; MOVE_FAST((SCHAR *) page + root_idx->irt_desc, p, l); root_idx->irt_desc = p - temp; } } l = p - temp; tdbb->tdbb_default->deallocate(temp); return l; } static USHORT compute_prefix(KEY * key, UCHAR * string, USHORT length) { /************************************** * * c o m p u t e _ p r e f i x * ************************************** * * Functional description * Compute and return prefix common to two strings. * **************************************/ UCHAR *p; USHORT l; if (!(l = MIN(key->key_length, length))) { return 0; } p = key->key_data; while (*p == *string) { p++; string++; if (!--l) { break; } } return (p - key->key_data); } static void copy_key(KEY * in, KEY * out) { /************************************** * * c o p y _ k e y * ************************************** * * Functional description * Copy a key. 
* **************************************/ UCHAR *p, *q; USHORT l; if ( (l = out->key_length = in->key_length) ) { p = out->key_data; q = in->key_data; do { *p++ = *q++; } while (--l); } } static CONTENTS delete_node(TDBB tdbb, WIN * window, BTN node) { /************************************** * * d e l e t e _ n o d e * ************************************** * * Functional description * Delete a node from a page and return whether it * empty, if there is a single node on it, or if it * is above or below the threshold for garbage collection. * **************************************/ SET_TDBB(tdbb); DBB dbb = tdbb->tdbb_database; CHECK_DBB(dbb); BTR page = (BTR) window->win_buffer; SLONG node_offset = (UCHAR*) node - (UCHAR*) page; CCH_MARK(tdbb, window); // move the rest of the page to the left to cover over this node BTN next = (BTN) (node->btn_data + node->btn_length); QUAD_MOVE(next->btn_number, node->btn_number); USHORT l; UCHAR *p, *q; p = node->btn_data; q = next->btn_data; l = next->btn_length; if (node->btn_prefix < next->btn_prefix) { node->btn_length = next->btn_length + next->btn_prefix - node->btn_prefix; p += next->btn_prefix - node->btn_prefix; } else { page->btr_prefix_total -= node->btn_prefix; fb_assert(l <= MAX_UCHAR); node->btn_length = (UCHAR) l; node->btn_prefix = next->btn_prefix; } if (l) { do { *p++ = *q++; } while (--l); } // Compute length of rest of bucket and move it down. l = page->btr_length - (q - (UCHAR *) page); if (l) { // Could be overlapping buffers. // Use MEMMOVE macro which is memmove() in most platforms, instead // of MOVE_FAST which is memcpy() in most platforms. // memmove() is guaranteed to work non-destructivly on overlapping buffers. 
MEMMOVE(q, p, l); p += l; q += l; l = 0; } page->btr_length = p - (UCHAR *) page; // Journal b-tree page - logical log of delete if (dbb->dbb_wal) { JRNB journal; fb_assert(node_offset <= MAX_USHORT); journal.jrnb_type = JRNP_BTREE_DELETE; journal.jrnb_prefix_total = page->btr_prefix_total; journal.jrnb_offset = (USHORT) node_offset; journal.jrnb_delta = node->btn_prefix; /* DEBUG ONLY */ journal.jrnb_length = page->btr_length; /* DEBUG ONLY */ CCH_journal_record(tdbb, window, (UCHAR *) & journal, JRNB_SIZE, 0, 0); } // check to see if the page is now empty node = page->btr_nodes; SLONG number = get_long(node->btn_number); if (number < 0) { return contents_empty; } // check to see if there is just one node node = NEXT_NODE(node); number = get_long(node->btn_number); if (number < 0) { return contents_single; } // check to see if the size of the page is below the garbage collection threshold, // meaning below the size at which it should be merged with its left sibling if possible. if (page->btr_length < GARBAGE_COLLECTION_THRESHOLD) { return contents_below_threshold; } return contents_above_threshold; } static void delete_tree(TDBB tdbb, USHORT rel_id, USHORT idx_id, SLONG next, SLONG prior) { /************************************** * * d e l e t e _ t r e e * ************************************** * * Functional description * Release index pages back to free list. * **************************************/ SET_TDBB(tdbb); WIN window; window.win_flags = WIN_large_scan; window.win_scans = 1; SLONG down = next; // Delete the index tree from the top down. while (next) { window.win_page = next; BTR page = (BTR) CCH_FETCH(tdbb, &window, LCK_write, 0); // do a little defensive programming--if any of these conditions // are true we have a damaged pointer, so just stop deleting. At // the same time, allow updates of indexes with id > 255 even though // the page header uses a byte for its index id. 
This requires relaxing // the check slightly introducing a risk that we'll pick up a page belonging // to some other index that is ours +/- (256*n). On the whole, unlikely. if (page->btr_header.pag_type != pag_index || page->btr_id != (UCHAR)(idx_id % 256) || page->btr_relation != rel_id) { CCH_RELEASE(tdbb, &window); return; } // if we are at the beginning of a non-leaf level, position // "down" to the beginning of the next level down if (next == down) { if (page->btr_level) { BTN node = page->btr_nodes; down = get_long(node->btn_number); } else { down = 0; } } // go through all the sibling pages on this level and release them next = page->btr_sibling; CCH_RELEASE_TAIL(tdbb, &window); PAG_release_page(window.win_page, prior); prior = window.win_page; // if we are at end of level, go down to the next level if (!next) { next = down; } } } static DSC *eval(TDBB tdbb, JRD_NOD node, DSC * temp, bool *missing) { /************************************** * * e v a l * ************************************** * * Functional description * Evaluate an expression returning a descriptor, and * a flag to indicate a null value. * **************************************/ DSC *desc; SET_TDBB(tdbb); desc = EVL_expr(tdbb, node); *missing = false; if (desc && !(tdbb->tdbb_request->req_flags & req_null)) { return desc; } else { *missing = true; } temp->dsc_dtype = dtype_text; temp->dsc_flags = 0; temp->dsc_sub_type = 0; temp->dsc_scale = 0; temp->dsc_length = 1; temp->dsc_ttype = ttype_ascii; temp->dsc_address = (UCHAR *) " "; return temp; } static SLONG fast_load(TDBB tdbb, JRD_REL relation, IDX * idx, USHORT key_length, SCB sort_handle, float *selectivity) { /************************************** * * f a s t _ l o a d * ************************************** * * Functional description * Do a fast load. The indices have already been passed into sort, and * are ripe for the plucking. This beast is complicated, but, I hope, * comprehendable. 
*
 *	Sorted records are pulled from sort_handle and appended to the
 *	current leaf bucket.  When a bucket fills, a sibling is allocated
 *	and a pointer to the page is propagated to the level above,
 *	creating that level on demand.  One bucket, window, "last key"
 *	and "last node" is kept per level.
 *
 *	Returns the page number of the top-level bucket and stores the
 *	computed selectivity in *selectivity.
 *
 **************************************/
	KEY keys[MAX_LEVELS];
	BTN nodes[MAX_LEVELS];
	BTR buckets[MAX_LEVELS];
	WIN windows[MAX_LEVELS];
	ULONG split_pages[MAX_LEVELS];

	SET_TDBB(tdbb);
	DBB dbb = tdbb->tdbb_database;
	CHECK_DBB(dbb);

	// leaf-page and pointer-page size limits.
	USHORT lp_fill_limit = dbb->dbb_page_size - 2 * BTN_SIZE;
	USHORT pp_fill_limit = dbb->dbb_page_size - 2 * BTN_SIZE;

	keys[0].key_length = 0;

	// Allocate and format the first leaf level bucket.  Awkwardly,
	// the bucket header has room for only a byte of index id and that's
	// part of the ODS.  So, for now, we'll just record the first byte
	// of the id and hope for the best.  Index buckets are (almost) always
	// located through the index structure (dmp being an exception used
	// only for debug) so the id is actually redundant.
	BTR bucket = (BTR) DPM_allocate(tdbb, &windows[0]);
	bucket->btr_header.pag_type = pag_index;
	bucket->btr_relation = relation->rel_id;
	bucket->btr_id = (UCHAR)(idx->idx_id % 256);
	bucket->btr_level = 0;
	bucket->btr_length = OFFSETA(BTR, btr_nodes);
	if (idx->idx_flags & idx_descending) {
		bucket->btr_header.pag_flags |= btr_descending;
	}

	// buckets[] is NULL-terminated: the entry after the highest level is NULL
	buckets[0] = bucket;
	buckets[1] = NULL;
	nodes[0] = bucket->btr_nodes;

	ULONG count = 0;
	ULONG duplicates = 0;
	bool error = false;
	bool duplicate = false;
	BTN node, split_node, next_node;
	USHORT level, prefix, i, l;
	// NOTE(review): `window` is first assigned inside the loops below; if an
	// exception is caught before that, the error path further down reads it
	// unset -- confirm whether that can happen in practice.
	WIN *window, split_window;
	KEY *key, split_key, temp_key;
	UCHAR *record, *p, *q;
	ISR isr;

	tdbb->tdbb_flags |= TDBB_no_cache_unwind;

	try {
		// If there's an error during index construction, fall
		// thru to release the last index bucket at each level
		// of the index. This will prepare for a single attempt
		// to deallocate the index pages for reuse.

		while (!error) {
			// Get the next record in sorted order.
			SORT_get(tdbb->tdbb_status_vector, sort_handle,
					 /* TMN: cast */ (ULONG **) & record
#ifdef SCROLLABLE_CURSORS
					 , RSE_get_forward
#endif
				);

			if (!record) {
				break;
			}

			// the sort record is the key bytes followed by an ISR trailer
			isr = (ISR) (record + key_length);
			count++;
			bucket = buckets[0];
			node = nodes[0];
			split_pages[0] = 0;
			key = &keys[0];

			// Compute the prefix as the length in common with the previous record's key.
			prefix = compute_prefix(key, record, isr->isr_key_length);

			// If the length of the new node will cause us to overflow the bucket,
			// form a new bucket.
			if (bucket->btr_length + isr->isr_key_length - prefix > lp_fill_limit) {
				BTR split = (BTR) DPM_allocate(tdbb, &split_window);
				bucket->btr_sibling = split_window.win_page;
				split->btr_left_sibling = windows[0].win_page;
				split->btr_header.pag_type = pag_index;
				split->btr_relation = bucket->btr_relation;
				split->btr_level = bucket->btr_level;
				split->btr_id = bucket->btr_id;
				split->btr_header.pag_flags |= (bucket->btr_header.pag_flags & btr_descending);

				// store the first node on the split page
				split_node = split->btr_nodes;
				QUAD_MOVE(node->btn_number, split_node->btn_number);
				split_node->btn_prefix = 0;
				p = split_node->btn_data;
				q = key->key_data;
				fb_assert(key->key_length <= MAX_UCHAR);
				if ( (l = split_node->btn_length = (UCHAR) key->key_length) ) {
					do {
						*p++ = *q++;
					} while (--l);
				}

				// mark the end of the previous page
				quad_put((SLONG) END_BUCKET, node->btn_number);

				// save the page number of the previous page and release it
				split_pages[0] = windows[0].win_page;
				CCH_RELEASE(tdbb, &windows[0]);

				// set up the new page as the "current" page
				windows[0] = split_window;
				node = split_node;
				buckets[0] = bucket = split;

				// save the first key on page as the page to be propogated
				copy_key(key, &split_key);
			}

			// step past the current last node unless the bucket is recorded
			// as holding only its header
			if (bucket->btr_length != OFFSETA(BTR, btr_nodes)) {
				node = NEXT_NODE(node);
			}

			// Insert the new node in the now current bucket
			fb_assert(prefix <= MAX_UCHAR);
			node->btn_prefix = (UCHAR) prefix;
			bucket->btr_prefix_total += prefix;
			quad_put(isr->isr_record_number, node->btn_number);
			p = node->btn_data;
			q = record + prefix;
			if ( (l = node->btn_length = isr->isr_key_length - prefix) ) {
				do {
					*p++ = *q++;
				} while (--l);
			}

			// check if this is a duplicate node
			duplicate = (!node->btn_length && prefix == key->key_length);
			if (duplicate) {
				++duplicates;
			}

			// set this node as the current node, and update the length of the page
			nodes[0] = node;
			next_node = NEXT_NODE(node);
			bucket->btr_length = (UCHAR *) (next_node) - (UCHAR *) bucket;
			if (bucket->btr_length > dbb->dbb_page_size) {
				BUGCHECK(205);	// msg 205 index bucket overfilled
			}

			// Remember the last key inserted to compress the next one.
			p = key->key_data;
			q = record;
			if ( (l = key->key_length = isr->isr_key_length) ) {
				do {
					*p++ = *q++;
				} while (--l);
			}

			// If there wasn't a split, we're done.  If there was, propogate the
			// split upward
			// NOTE(review): `level` is not checked against MAX_LEVELS here,
			// and buckets[level + 1] is written below -- confirm the tree
			// depth is bounded elsewhere.
			for (level = 1; split_pages[level - 1]; level++) {
				// initialize the current pointers for this level
				window = &windows[level];
				key = &keys[level];
				split_pages[level] = 0;
				node = nodes[level];

				// If there isn't already a bucket at this level, make one.  Remember to
				// shorten the index id to a byte
				if (!(bucket = buckets[level])) {
					buckets[level + 1] = NULL;
					buckets[level] = bucket = (BTR) DPM_allocate(tdbb, window);
					bucket->btr_header.pag_type = pag_index;
					bucket->btr_relation = relation->rel_id;
					bucket->btr_id = (UCHAR)(idx->idx_id % 256);
					fb_assert(level <= MAX_UCHAR);
					bucket->btr_level = (UCHAR) level;
					if (idx->idx_flags & idx_descending)
						bucket->btr_header.pag_flags |= btr_descending;
					bucket->btr_length = OFFSETA(BTR, btr_nodes) + BTN_SIZE;

					// since this is the beginning of the level, we propogate
					// the lower-level page with a "degenerate" zero-length
					// node indicating that this page holds any key value
					// less than the next node
					node = bucket->btr_nodes;
					node->btn_length = node->btn_prefix = 0;
					quad_put(split_pages[level - 1], node->btn_number);
					key->key_length = 0;
				}

				// Compute the prefix in preparation of insertion
				prefix = compute_prefix(key, split_key.key_data, split_key.key_length);

				// Remember the last key inserted to compress the next one.
				copy_key(&split_key, &temp_key);

				// See if the new node fits in the current bucket.  If not, split
				// the bucket.
				if (bucket->btr_length + temp_key.key_length - prefix > pp_fill_limit) {
					BTR split = (BTR) DPM_allocate(tdbb, &split_window);
					bucket->btr_sibling = split_window.win_page;
					split->btr_left_sibling = window->win_page;
					split->btr_header.pag_type = pag_index;
					split->btr_relation = bucket->btr_relation;
					split->btr_level = bucket->btr_level;
					split->btr_id = bucket->btr_id;
					split->btr_header.pag_flags |= (bucket->btr_header.pag_flags & btr_descending);

					split_node = split->btr_nodes;

					// insert the new node in the new bucket
					QUAD_MOVE(node->btn_number, split_node->btn_number);
					split_node->btn_prefix = 0;
					p = split_node->btn_data;
					q = key->key_data;
					fb_assert(key->key_length <= MAX_UCHAR);
					if ( (l = split_node->btn_length = (UCHAR) key->key_length) ) {
						do {
							MOVE_BYTE(q, p);
						} while (--l);
					}

					// mark the end of the page; note that the end_bucket marker must
					// contain info about the first node on the next page
					quad_put((SLONG) END_BUCKET, node->btn_number);

					// indicate to propogate the page we just split from
					split_pages[level] = window->win_page;
					CCH_RELEASE(tdbb, window);

					// and make the new page the current page
					*window = split_window;
					node = split_node;
					buckets[level] = bucket = split;
					copy_key(key, &split_key);
				}

				// Now propogate up the lower-level bucket by storing a "pointer" to it.
				node = NEXT_NODE(node);
				fb_assert(prefix <= MAX_UCHAR);
				node->btn_prefix = (UCHAR) prefix;
				bucket->btr_prefix_total += prefix;
				quad_put(windows[level - 1].win_page, node->btn_number);

				// Store the key associated with the page as the first unique key
				// value on the page.
				p = node->btn_data;
				q = temp_key.key_data + prefix;
				if ( (l = node->btn_length = temp_key.key_length - prefix) ) {
					do {
						MOVE_BYTE(q, p);
					} while (--l);
				}

				// Now restore the current key value and save this node as the
				// current node on this level; also calculate the new page length.
				copy_key(&temp_key, key);
				nodes[level] = node;
				next_node = NEXT_NODE(node);
				bucket->btr_length = (UCHAR *) next_node - (UCHAR *) bucket;
				if (bucket->btr_length > dbb->dbb_page_size) {
					BUGCHECK(205);	// msg 205 index bucket overfilled
				}
			}

#ifdef SUPERSERVER
			if (--tdbb->tdbb_quantum < 0 && !tdbb->tdbb_inhibit) {
				error = JRD_reschedule(tdbb, 0, FALSE);
			}
#endif
		}

		// To finish up, put an end of level marker on the last bucket
		// of each level.
		for (i = 0; (bucket = buckets[i]); i++) {
			// retain the top level window for returning to the calling routine
			window = &windows[i];

			// store the end of level marker
			node = LAST_NODE(bucket);
			node->btn_length = node->btn_prefix = 0;
			quad_put((SLONG) END_LEVEL, node->btn_number);

			// and update the final page length
			bucket->btr_length += BTN_SIZE;
			if (bucket->btr_length > dbb->dbb_page_size) {
				BUGCHECK(205);	// msg 205 index bucket overfilled
			}

			CCH_RELEASE(tdbb, &windows[i]);
		}

		tdbb->tdbb_flags &= ~TDBB_no_cache_unwind;

		// do some final housekeeping
		SORT_fini(sort_handle, tdbb->tdbb_attachment);
	}	// try
	catch (const std::exception&) {
		error = true;
	}

	// If index flush fails, try to delete the index tree.
	// If the index delete fails, just go ahead and punt.
	try {
		if (error) {
			delete_tree(tdbb, relation->rel_id, idx->idx_id, window->win_page, 0);
			ERR_punt();
		}

		CCH_flush(tdbb, (USHORT) FLUSH_ALL, 0);

		// same formula as BTR_selectivity: 1 / number of distinct keys
		*selectivity = (float) ((count) ? (1. / (double) (count - duplicates)) : 0);

		return window->win_page;
	}	// try
	catch(const std::exception&) {
		// first failure: fall through to the -1 return; a failure after an
		// earlier error is re-raised via ERR_punt
		if (!error) {
			error = true;
		}
		else {
			ERR_punt();
		}
	}
	return -1L;	/* lint */
}


static IRT fetch_root(TDBB tdbb, WIN * window, JRD_REL relation)
{
/**************************************
 *
 *	f e t c h _ r o o t
 *
 **************************************
 *
 * Functional description
 *	Return descriptions of all indices for relation.  If there isn't
 *	a known index root, assume we were called during optimization
 *	and return no indices.
* **************************************/ SET_TDBB(tdbb); if ((window->win_page = relation->rel_index_root) == 0) { if (relation->rel_id == 0) { return NULL; } else { DPM_scan_pages(tdbb); window->win_page = relation->rel_index_root; } } return (IRT) CCH_FETCH(tdbb, window, LCK_read, pag_root); } static SLONG find_node(BTR bucket, KEY * key, bool descending) { /************************************** * * f i n d _ n o d e * ************************************** * * Functional description * Find a node in an index level. Return either the * node equal to the key or the last node less than the key. * Note that this routine can be called only for non-leaf * pages, because it assumes the first node on page is * a degenerate, zero-length node. * **************************************/ UCHAR *key_end, *node_end; UCHAR *p, *q; BTN node = bucket->btr_nodes; // Compute common prefix of key and first node USHORT prefix = compute_prefix(key, node->btn_data, node->btn_length); p = key->key_data + prefix; key_end = key->key_data + key->key_length; SLONG number = get_long(node->btn_number); if (number == END_LEVEL) { BUGCHECK(206); // msg 206 exceeded index level } if (key->key_length == 0) { return number; } BTN prior; while (true) { // If this is the end of bucket, return node. Somebody else can // deal with this if (number == END_BUCKET) { return number; } prior = node; node = NEXT_NODE(node); number = get_long(node->btn_number); // If the page/record number is -1, the node is the last in the level // and, by definition, is the target node. Otherwise, if the // prefix of the current node is less than the running prefix, its // node must have a value greater than the key, which is the insertion // point. if (number == END_LEVEL || node->btn_prefix < prefix) { return get_long(prior->btn_number); } // If the node prefix is greater than current prefix , it must be less // than the key, so we can skip it. If it has zero length, then // it is a duplicate, and can also be skipped. 
q = node->btn_data; node_end = q + node->btn_length; if (descending) { if (node->btn_prefix == prefix) { while (true) { if (q == node_end || p == key_end) { return get_long(prior->btn_number); } else if (*p > *q) { break; } else if (*p++ < *q++) { return get_long(prior->btn_number); } } } } else if (node->btn_prefix == prefix && node->btn_length > 0) { while (true) { if (p == key_end) { return get_long(prior->btn_number); } else if (q == node_end || *p > *q) { break; } else if (*p++ < *q++) { return get_long(prior->btn_number); } } } prefix = (p - key->key_data); } // NOTREACHED return 0; // superfluous return to shut lint up } static CONTENTS garbage_collect(TDBB tdbb, WIN * window, SLONG parent_number) { /************************************** * * g a r b a g e _ c o l l e c t * ************************************** * * Functional description * Garbage collect an index page. This requires * care so that we don't step on other processes * that might be traversing the tree forwards, * backwards, or top to bottom. We must also * keep in mind that someone might be adding a node * at the same time we are deleting. Therefore we * must lock all the pages involved to prevent * such operations while we are garbage collecting. * **************************************/ SET_TDBB(tdbb); DBB dbb = tdbb->tdbb_database; CHECK_DBB(dbb); BTR gc_page = (BTR) window->win_buffer; CONTENTS result = contents_above_threshold; // check to see if the page was marked not to be garbage collected if (gc_page->btr_header.pag_flags & btr_dont_gc) { CCH_RELEASE(tdbb, window); return contents_above_threshold; } // record the left sibling now since this is the only way to // get to it quickly; don't worry if it's not accurate now or // is changed after we release the page, since we will fetch // it in a fault-tolerant way anyway. 
SLONG left_number = gc_page->btr_left_sibling; // if the left sibling is blank, that indicates we are the leftmost page, // so don't garbage-collect the page; do this for several reasons: // 1. The leftmost page needs a degenerate zero length node as its first node // (for a non-leaf, non-top-level page). // 2. The parent page would need to be fixed up to have a degenerate node // pointing to the right sibling. // 3. If we remove all pages on the level, we would need to re-add it next // time a record is inserted, so why constantly garbage-collect and re-create // this page? if (!left_number) { CCH_RELEASE(tdbb, window); return contents_above_threshold; } // record some facts for later validation USHORT relation_number = gc_page->btr_relation; UCHAR index_id = gc_page->btr_id; UCHAR index_level = gc_page->btr_level; // we must release the page we are attempting to garbage collect; // this is necessary to avoid deadlocks when we fetch the parent page CCH_RELEASE(tdbb, window); // fetch the parent page, but we have to be careful, because it could have // been garbage-collected when we released it--make checks so that we know it // is the parent page; there is a minute possibility that it could have been // released and reused already as another page on this level, but if so, it // won't really matter because we won't find the node on it WIN parent_window; parent_window.win_page = parent_number; parent_window.win_flags = 0; BTR parent_page = (BTR) CCH_FETCH(tdbb, &parent_window, LCK_write, pag_undefined); if ((parent_page->btr_header.pag_type != pag_index) || (parent_page->btr_relation != relation_number) || (parent_page->btr_id != (UCHAR)(index_id % 256)) || (parent_page->btr_level != index_level + 1)) { CCH_RELEASE(tdbb, &parent_window); return contents_above_threshold; } // find the left sibling page by going one page to the left, // but if it does not recognize us as its right sibling, keep // going to the right until we find the page that is our real // left 
sibling WIN left_window; left_window.win_page = left_number; left_window.win_flags = 0; BTR left_page = (BTR) CCH_FETCH(tdbb, &left_window, LCK_write, pag_index); while (left_page->btr_sibling != window->win_page) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); CORRUPT(204); // msg 204 index inconsistent #endif // If someone garbage collects the index page before we can, it // won't be found by traversing the right sibling chain. This means // scanning index pages until the end-of-level bucket is hit. if (!left_page->btr_sibling) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); return contents_above_threshold; } left_page = (BTR) CCH_HANDOFF(tdbb, &left_window, left_page->btr_sibling, LCK_write, pag_index); } // now refetch the original page and make sure it is still // below the threshold for garbage collection. gc_page = (BTR) CCH_FETCH(tdbb, window, LCK_write, pag_index); if ((gc_page->btr_length >= GARBAGE_COLLECTION_THRESHOLD) || (gc_page->btr_header.pag_flags & btr_dont_gc)) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); CCH_RELEASE(tdbb, window); return contents_above_threshold; } // fetch the right sibling page WIN right_window; BTR right_page = NULL; if ( (right_window.win_page = gc_page->btr_sibling) ) { right_window.win_flags = 0; right_page = (BTR) CCH_FETCH(tdbb, &right_window, LCK_write, pag_index); if (right_page->btr_left_sibling != window->win_page) { CCH_RELEASE(tdbb, &parent_window); if (left_page) { CCH_RELEASE(tdbb, &left_window); } CCH_RELEASE(tdbb, window); CCH_RELEASE(tdbb, &right_window); #ifdef DEBUG_BTR CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } } // Find the node on the parent's level--the parent page could // have split while we didn't have it locked #ifdef DEBUG_BTR SLONG previous_number = 0; #endif SLONG number; BTN parent_node; for (parent_node = parent_page->btr_nodes;;) { number = 
get_long(parent_node->btn_number); if (number == END_BUCKET) { parent_page = (BTR) CCH_HANDOFF(tdbb, &parent_window, parent_page->btr_sibling, LCK_write, pag_index); parent_node = parent_page->btr_nodes; continue; } if (number == window->win_page || number == END_LEVEL) { break; } #ifdef DEBUG_BTR previous_number = number; #endif parent_node = NEXT_NODE(parent_node); } // we should always find the node, but just in case we don't, bow out gracefully if (number == END_LEVEL) { CCH_RELEASE(tdbb, &left_window); if (right_page) { CCH_RELEASE(tdbb, &right_window); } CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, window); #ifdef DEBUG_BTR CORRUPT(204); /* msg 204 index inconsistent */ #endif return contents_above_threshold; } // Fix for ARINC database corruption bug: in most cases we update the END_BUCKET // marker of the left sibling page to contain the END_BUCKET of the garbage-collected // page. However, when this page is the first page on its parent, then the left // sibling page is the last page on its parent. That means if we update its END_BUCKET // marker, its bucket of values will extend past that of its parent, causing trouble // down the line. // So we never garbage-collect a page which is the first one on its parent. This page // will have to wait until the parent page gets collapsed with the page to its left, // in which case this page itself will then be garbage-collectable. Since there are // no more keys on this page, it will not be garbage-collected itself. When the page // to the right falls below the threshold for garbage collection, it will be merged with // this page. 
if (parent_node == parent_page->btr_nodes) { CCH_RELEASE(tdbb, &left_window); if (right_page) { CCH_RELEASE(tdbb, &right_window); } CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, window); return contents_above_threshold; } // find the last node on the left sibling and save its key value BTN last_node; KEY last_key; UCHAR *p, *q; USHORT l; p = last_key.key_data; for (last_node = left_page->btr_nodes; (number = get_long(last_node->btn_number) >= 0); last_node = NEXT_NODE(last_node)) { if ( (l = last_node->btn_length) ) { p = last_key.key_data + last_node->btn_prefix; q = last_node->btn_data; do { *p++ = *q++; } while (--l); } } last_key.key_length = p - last_key.key_data; // see if there's enough space on the left page to move all the nodes to it // and leave some extra space for expansion (at least one key length) BTN node = gc_page->btr_nodes; USHORT prefix = compute_prefix(&last_key, node->btn_data, node->btn_length); if (left_page->btr_length + gc_page->btr_length - prefix - last_node->btn_length - BTN_SIZE - ((UCHAR *) gc_page->btr_nodes - (UCHAR *) gc_page) > dbb->dbb_page_size - MAX_KEY) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); CCH_RELEASE(tdbb, window); if (right_page) CCH_RELEASE(tdbb, &right_window); return contents_above_threshold; } #ifdef DEBUG_BTR { SLONG next_number; BTN next_parent_node; // do a consistency check to be sure that the parent page has the proper // nodes to the left and to the right--this assumes single-user, because // it's possible that leaf pages in a duplicate chain could be out of // order when two different processes split pages at the same time next_parent_node = NEXT_NODE(parent_node); next_number = get_long(next_parent_node->btn_number); if ((left_page && previous_number && (previous_number != left_window.win_page)) || (right_page && (next_number > 0) && (next_number != right_window.win_page))) { CCH_RELEASE(tdbb, &parent_window); CCH_RELEASE(tdbb, &left_window); CCH_RELEASE(tdbb, window); if 
(right_page) { CCH_RELEASE(tdbb, &right_window); } CORRUPT(204); // msg 204 index inconsistent return contents_above_threshold; } } #endif // Now begin updating the pages. We must write them out in such // a way as to maintain on-disk integrity at all times. That means // not having pointers to released pages, and not leaving things in // an inconsistent state for navigation through the pages. // Update the parent first. If the parent is not written out first, // we will be pointing to a page which is not in the doubly linked // sibling list, and therefore navigation back and forth won't work. result = delete_node(tdbb, &parent_window, parent_node); CCH_RELEASE(tdbb, &parent_window); // Update the right sibling page next, since it does not really // matter that the left sibling pointer points to the page directly // to the left, only that it point to some page to the left. // Set up the precedence so that the parent will be written first. if (right_page) { if (parent_page) { CCH_precedence(tdbb, &right_window, parent_window.win_page); } CCH_MARK(tdbb, &right_window); right_page->btr_left_sibling = left_window.win_page; if (dbb->dbb_journal) { CCH_journal_page(tdbb, &right_window); } CCH_RELEASE(tdbb, &right_window); } // Now update the left sibling, effectively removing the garbage-collected page // from the tree. Set the precedence so the right sibling will be written first. 
if (right_page) { CCH_precedence(tdbb, &left_window, right_window.win_page); } else if (parent_page) { CCH_precedence(tdbb, &left_window, parent_window.win_page); } CCH_MARK(tdbb, &left_window); if (right_page) { left_page->btr_sibling = right_window.win_page; } else { left_page->btr_sibling = 0; } // move all the nodes from the garbage-collected page to the left sibling, // overwriting the END_BUCKET of the left sibling node = gc_page->btr_nodes; // calculate the total amount of compression on page as the combined totals // of the two pages, plus the compression of the first node on the g-c'ed page, // minus the prefix of the END_BUCKET node to be deleted left_page->btr_prefix_total += gc_page->btr_prefix_total + prefix - last_node->btn_prefix; // fix up the last node of the left page to contain the compressed first node last_node->btn_prefix = prefix; last_node->btn_length = node->btn_length - prefix; p = last_node->btn_number; q = node->btn_number; l = 4; do { *p++ = *q++; } while (--l); // copy over the remainder of the page to be garbage-collected p = last_node->btn_data; q = node->btn_data + prefix; l = gc_page->btr_length - (q - (UCHAR *) gc_page); if (l) { do { *p++ = *q++; } while (--l); } left_page->btr_length = p - (UCHAR *) left_page; #ifdef DEBUG_BTR if (left_page->btr_length > dbb->dbb_page_size) { CCH_RELEASE(tdbb, &left_window); CCH_RELEASE(tdbb, window); CORRUPT(204); /* msg 204 index inconsistent */ return contents_above_threshold; } #endif if (dbb->dbb_journal) { CCH_journal_page(tdbb, &left_window); } CCH_RELEASE(tdbb, &left_window); // finally, release the page, and indicate that we should write the // previous page out before we write the TIP page out CCH_RELEASE(tdbb, window); PAG_release_page(window->win_page, left_page ? left_window.win_page : right_page ? 
right_window.win_page : parent_window.win_page); // if the parent page needs to be garbage collected, that means we need to // re-fetch the parent and check to see whether it is still garbage-collectable; // make sure that the page is still a btree page in this index and in this level-- // there is a miniscule chance that it was already reallocated as another page // on this level which is already below the threshold, in which case it doesn't // hurt anything to garbage-collect it anyway if (result != contents_above_threshold) { window->win_page = parent_window.win_page; parent_page = (BTR) CCH_FETCH(tdbb, window, LCK_write, pag_undefined); if ((parent_page->btr_header.pag_type != pag_index) || (parent_page->btr_relation != relation_number) || (parent_page->btr_id != index_id) || (parent_page->btr_level != index_level + 1)) { CCH_RELEASE(tdbb, window); return contents_above_threshold; } // check whether it is empty parent_node = parent_page->btr_nodes; number = get_long(parent_node->btn_number); if (number < 0) { return contents_empty; } // check whether there is just one node parent_node = NEXT_NODE(parent_node); number = get_long(parent_node->btn_number); if (number < 0) { return contents_single; } // check to see if the size of the page is below the garbage collection threshold if (parent_page->btr_length < GARBAGE_COLLECTION_THRESHOLD) { return contents_below_threshold; } // the page must have risen above the threshold; release the window since // someone else added a node while the page was released CCH_RELEASE(tdbb, window); return contents_above_threshold; } return result; } static SLONG insert_node(TDBB tdbb, WIN * window, IIB * insertion, KEY * new_key, SLONG * original_page, SLONG * sibling_page) { /************************************** * * i n s e r t _ n o d e * ************************************** * * Functional description * Insert a node in a bucket. If this isn't the right bucket, * return -1. 
If it splits, return the split page number and * leading string. This is the workhorse for add_node. * **************************************/ SET_TDBB(tdbb); DBB dbb = tdbb->tdbb_database; CHECK_DBB(dbb); // find the insertion point for the specified key BTR bucket = (BTR) window->win_buffer; KEY *key; key = insertion->iib_key; USHORT prefix; BTN node = BTR_find_leaf(bucket, key, 0, &prefix, insertion->iib_descriptor->idx_flags & idx_descending, false); if (!node) { return -1; } // loop through the equivalent nodes until the correct insertion // point is found; for leaf level this will be the first node UCHAR *p, *q; USHORT old_prefix, old_length; USHORT node_offset, l; SLONG old_number; for (;;) { node_offset = (UCHAR *) node - (UCHAR *) bucket; old_number = get_long(node->btn_number); old_prefix = node->btn_prefix; old_length = node->btn_length; p = key->key_data + old_prefix; q = node->btn_data; l = MIN(key->key_length - old_prefix, old_length); if (l) { do { if (*p++ != *q++) { break; } --old_length; old_prefix++; } while (--l); } // check if the inserted node has the same value as the next node if (old_prefix != key->key_length || old_prefix != node->btn_length + node->btn_prefix) { break; } // This block of code moved up for IGNORE_NULL_IDX_KEY if (old_number == END_BUCKET) { return -1; } if (old_number == END_LEVEL) { break; } // if this is a non-leaf page, we need to find // the correct insertion point in the duplicate chain if (!bucket->btr_level) { break; } if (old_number == insertion->iib_sibling) { break; } // since the node is equivalent and we are about to skip past it, // the prefix of the inserted node is now the same prefix = old_prefix; while (old_number != insertion->iib_sibling) { node = NEXT_NODE(node); old_number = get_long(node->btn_number); if (node->btn_length) { break; } if (old_number == END_BUCKET) { return -1; } if (old_number == END_LEVEL) { break; } } } // Compute the length of the updated page. 
This is a function of the // new string length minus prefix and recompression done to the string // following the insertion. USHORT delta = BTN_SIZE + key->key_length - prefix + node->btn_prefix - old_prefix; // Prepare to slide down tail of bucket. If we're going to split, // move the initialized hunk of the bucket to an overflow area big // enough to hold the split. If the bucket isn't going to split, // mark the buffer as dirty. SLONG overflow_page[OVERSIZE]; if (bucket->btr_length + delta > dbb->dbb_page_size) { MOVE_FASTER(bucket, overflow_page, bucket->btr_length); node = (BTN) ((UCHAR *) overflow_page + node_offset); bucket = (BTR) overflow_page; } else { // if we are a pointer page, make sure that the page we are // pointing to gets written before we do for on-disk integrity if (bucket->btr_level != 0) { CCH_precedence(tdbb, window, insertion->iib_number); } CCH_MARK(tdbb, window); } BTN new_node = node; // Slide down the upper hunk of the bucket to make room for the // insertion. l = bucket->btr_length - node_offset; p = (UCHAR *) bucket + bucket->btr_length; q = p + delta; do { *--q = *--p; } while (--l); // Insert the new node. bucket->btr_length += delta; bucket->btr_prefix_total += prefix - node->btn_prefix; node->btn_prefix = prefix; quad_put(insertion->iib_number, node->btn_number); p = node->btn_data; q = key->key_data + prefix; if ( (l = node->btn_length = key->key_length - prefix) ) { do { MOVE_BYTE(q, p); } while (--l); } // Recompress and rebuild the next node. node = (BTN) p; bucket->btr_prefix_total += old_prefix; node->btn_prefix = old_prefix; node->btn_length = old_length; quad_put(old_number, node->btn_number); // We don't need to rebuild btn_data of first pushed node here because, // if old_prefix has increased we only move down part of the node anyway // figure out whether this node was inserted at the end of the page bool end_of_page = (old_number < 0); // If the index is unique, look for duplicates in this bucket. 
if (insertion->iib_descriptor->idx_flags & idx_unique) { while (node->btn_length == 0 && node->btn_prefix == key->key_length) { old_number = get_long(node->btn_number); if (old_number < 0) break; SBM_set(tdbb, &insertion->iib_duplicates, old_number); node = (BTN) node->btn_data; } } // If the bucket still fits on a page, we're almost done. if (bucket->btr_length <= dbb->dbb_page_size) { // Journal new node added. The node is journalled as the compressed // new node and the BTN of the re compressed next node. if (dbb->dbb_wal) { JRNB journal; journal.jrnb_type = JRNP_BTREE_NODE; journal.jrnb_prefix_total = bucket->btr_prefix_total; journal.jrnb_offset = node_offset; journal.jrnb_delta = delta; journal.jrnb_length = BTN_SIZE + BTN_SIZE + new_node->btn_length; CCH_journal_record(tdbb, window, (UCHAR *) & journal, JRNB_SIZE, (UCHAR *) bucket + node_offset, journal.jrnb_length); } CCH_RELEASE(tdbb, window); return 0; } // We've a bucket split in progress. We need to determine the split point. // Set it halfway through the page, unless we are at the end of the page, // in which case put only the new node on the new page. This will ensure // that pages get filled in the case of a monotonically increasing key. // Make sure that the original page has room, in case the END_BUCKET marker // is now longer because it is pointing at the new node. 
UCHAR *midpoint; if (end_of_page && ((UCHAR *) NEXT_NODE(new_node) <= (UCHAR *) bucket + dbb->dbb_page_size)) { midpoint = (UCHAR *) new_node; } else { midpoint = (UCHAR *) bucket + (dbb->dbb_page_size - OFFSETA(BTR, btr_nodes)) / 2; } // Copy the bucket up to the midpoint, restructing the full midpoint key SLONG prefix_total = 0; for (p = (UCHAR *) bucket->btr_nodes; p < midpoint;) { node = (BTN) p; prefix_total += node->btn_prefix; p = node->btn_data; q = new_key->key_data + node->btn_prefix; new_key->key_length = node->btn_prefix + node->btn_length; if ( (l = node->btn_length) ) do { *q++ = *p++; } while (--l); } // Allocate and format the overflow page WIN split_window; BTR split = (BTR) DPM_allocate(tdbb, &split_window); // if we're a pointer page, make sure the child page is written first if (bucket->btr_level != 0) { if ((UCHAR *) new_node < midpoint) { CCH_precedence(tdbb, window, insertion->iib_number); } else { CCH_precedence(tdbb, &split_window, insertion->iib_number); } } // format the new page to look like the old page SLONG right_sibling; split->btr_header.pag_type = bucket->btr_header.pag_type; split->btr_relation = bucket->btr_relation; split->btr_id = bucket->btr_id; split->btr_level = bucket->btr_level; split->btr_sibling = right_sibling = bucket->btr_sibling; split->btr_left_sibling = window->win_page; split->btr_header.pag_flags |= (bucket->btr_header.pag_flags & btr_descending); // Format the first node on the overflow page new_node = split->btr_nodes; new_node->btn_prefix = 0; QUAD_MOVE(node->btn_number, new_node->btn_number); p = new_node->btn_data; q = new_key->key_data; fb_assert(new_key->key_length <= MAX_UCHAR); if ( (l = new_node->btn_length = (UCHAR) new_key->key_length) ) { do { MOVE_BYTE(q, p); } while (--l); } // Copy down the remaining half of the original bucket on the overflow page q = (UCHAR *) (NEXT_NODE(node)); l = bucket->btr_length - (q - (UCHAR *) bucket); if (((U_IPTR) p & (ALIGNMENT - 1)) || ((U_IPTR) q & (ALIGNMENT - 1))) 
{ MOVE_FAST(q, p, l); } else { MOVE_FASTER(q, p, l); } split->btr_length = p + l - (UCHAR *) split; // the sum of the prefixes on the split page is the previous total minus // the prefixes found on the original page; the sum of the prefixes on the // original page must exclude the split node split->btr_prefix_total = bucket->btr_prefix_total - prefix_total; bucket->btr_prefix_total = prefix_total - node->btn_prefix; SLONG split_page = split_window.win_page; CCH_RELEASE(tdbb, &split_window); CCH_precedence(tdbb, window, split_window.win_page); CCH_mark_must_write(tdbb, window); // The split bucket is still residing in the overflow area. Copy it // back to the original buffer. After cleaning up the last node, // we're done! bucket->btr_sibling = split_window.win_page; // mark the end of the page; note that the end_bucket marker must // contain info about the first node on the next page. quad_put((SLONG) END_BUCKET, node->btn_number); BTN next_node = NEXT_NODE(node); bucket->btr_length = (UCHAR *) next_node - (UCHAR *) bucket; MOVE_FASTER(bucket, window->win_buffer, bucket->btr_length); // mark the bucket as non garbage-collectable until we can propogate // the split page up to the parent; otherwise its possible that the // split page we just created will be lost. 
bucket->btr_header.pag_flags |= btr_dont_gc; // journal the split page if (dbb->dbb_wal) { journal_btree_segment(tdbb, window, bucket); } if (original_page) { *original_page = window->win_page; } // now we need to go to the right sibling page and update its // left sibling pointer to point to the newly split page if (right_sibling) { bucket = (BTR) CCH_HANDOFF(tdbb, window, right_sibling, LCK_write, pag_index); CCH_MARK(tdbb, window); bucket->btr_left_sibling = split_window.win_page; if (dbb->dbb_journal) { CCH_journal_page(tdbb, window); } } CCH_RELEASE(tdbb, window); // return the page number of the right sibling page if (sibling_page) { *sibling_page = right_sibling; } return split_page; } static void journal_btree_segment(TDBB tdbb, WIN * window, BTR bucket) { /************************************** * * j o u r n a l _ b t r e e _ s e g m e n t * ************************************** * * Functional description * Journal valid part of btree segment. * **************************************/ JRNB journal; SET_TDBB(tdbb); journal.jrnb_type = JRNP_BTREE_SEGMENT; journal.jrnb_offset = 0; journal.jrnb_delta = 0; journal.jrnb_length = bucket->btr_length; CCH_journal_record(tdbb, window, (UCHAR *) & journal, JRNB_SIZE, (UCHAR *) bucket, journal.jrnb_length); } static bool key_equality(KEY * key, BTN node) { /************************************** * * k e y _ e q u a l i t y * ************************************** * * Functional description * Check a B-tree node against a key for equality. 
* **************************************/ if (key->key_length != node->btn_length + node->btn_prefix) { return false; } SSHORT l; if (!(l = node->btn_length)) { return true; } UCHAR *p, *q; p = node->btn_data; q = key->key_data + node->btn_prefix; do { if (*p++ != *q++) { return false; } } while (--l); return true; } static INT64_KEY make_int64_key(SINT64 q, SSHORT scale) { /************************************** * * m a k e _ i n t 6 4 _ k e y * ************************************** * * Functional description * Make an Index key for a 64-bit Integer value. * **************************************/ UINT64 uq; INT64_KEY key; int n; // Following structure declared above in the modules global section // // static const struct { // UINT64 limit; --- if abs(q) is >= this, ... // SINT64 factor; --- then multiply by this, ... // SSHORT scale_change; --- and add this to the scale. // } int64_scale_control[]; // // Before converting the scaled int64 to a double, multiply it by the // largest power of 10 which will NOT cause an overflow, and adjust // the scale accordingly. This ensures that two different // representations of the same value, entered at times when the // declared scale of the column was different, actually wind up // being mapped to the same key. n = 0; uq = (UINT64) ((q >= 0) ? q : -q); // absolute value while (uq < int64_scale_control[n].limit) { n++; } q *= int64_scale_control[n].factor; scale -= int64_scale_control[n].scale_change; key.d_part = ((double) (q / 10000)) / powerof10(scale); key.s_part = (SSHORT) (q % 10000); return key; } #ifdef DEBUG_INDEXKEY static void print_int64_key(SINT64 value, SSHORT scale, INT64_KEY key) { /************************************** * * p r i n t _ i n t 6 4 _ k e y * ************************************** * * Functional description * Debugging function to print a key created out of an int64 * quantify. 
* **************************************/ UCHAR *p; USHORT n; ib_fprintf(ib_stderr, "%20" QUADFORMAT "d %4d %.15e %6d ", value, scale, key.d_part, key.s_part); p = (UCHAR *) & key; for (n = 10; n--; n > 0) ib_fprintf(ib_stderr, "%02x ", *p++); ib_fprintf(ib_stderr, "\n"); return; } #endif /* DEBUG_INDEXKEY */ static void quad_put(SLONG value, UCHAR* data) { /************************************** * * q u a d _ p u t * ************************************** * * Functional description * Move SLONG to a four byte vector. * **************************************/ const UCHAR* p = (UCHAR*) &value; data[0] = p[0]; data[1] = p[1]; data[2] = p[2]; data[3] = p[3]; } static void quad_move(UCHAR* a, UCHAR* b) { /************************************** * * q u a d _ m o v e * ************************************** * * Functional description * Move an unaligned longword (4 bytes). * **************************************/ MOVE_BYTE(a, b); MOVE_BYTE(a, b); MOVE_BYTE(a, b); MOVE_BYTE(a, b); } static CONTENTS remove_node(TDBB tdbb, IIB * insertion, WIN * window) { /************************************** * * r e m o v e _ n o d e * ************************************** * * Functional description * Remove an index node from a b-tree, * recursing down through the levels in case * we need to garbage collect pages. 
* **************************************/ SET_TDBB(tdbb); DBB dbb = tdbb->tdbb_database; IDX* idx = insertion->iib_descriptor; BTR page = (BTR) window->win_buffer; // if we are on a leaf page, remove the leaf node if (page->btr_level == 0) { return remove_leaf_node(tdbb, insertion, window); } SLONG number, parent_number; CONTENTS result; while (true) { number = find_node(page, insertion->iib_key, (idx->idx_flags & idx_descending)); // we should always find the node, but let's make sure if (number == END_LEVEL) { CCH_RELEASE(tdbb, window); #ifdef DEBUG_BTR CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } // recurse to the next level down; if we are about to fetch a // level 0 page, make sure we fetch it for write if (number != END_BUCKET) { // handoff down to the next level, retaining the parent page number parent_number = window->win_page; page = (BTR) CCH_HANDOFF(tdbb, window, number, (SSHORT) ((page->btr_level == 1) ? LCK_write : LCK_read), pag_index); // if the removed node caused the page to go below the garbage collection // threshold, and the database was created by a version of the engine greater // than 8.2, then we can garbage-collect the page result = remove_node(tdbb, insertion, window); if ((result != contents_above_threshold) && (dbb->dbb_ods_version >= ODS_VERSION9)) { return garbage_collect(tdbb, window, parent_number); } if (window->win_bdb) { CCH_RELEASE(tdbb, window); } return contents_above_threshold; } // we've hit end of bucket, so go to the sibling looking for the node page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling, LCK_read, pag_index); } // NOTREACHED return contents_empty; // superfluous return to shut lint up } static CONTENTS remove_leaf_node(TDBB tdbb, IIB * insertion, WIN * window) { /************************************** * * r e m o v e _ l e a f _ n o d e * ************************************** * * Functional description * Remove an index node from the leaf level. 
* **************************************/ SET_TDBB(tdbb); BTR page = (BTR) window->win_buffer; BTN node; KEY *key; key = insertion->iib_key; USHORT prefix; // Look for the first node with the value to be removed. while (!(node = BTR_find_leaf(page, key, 0, &prefix, insertion->iib_descriptor->idx_flags & idx_descending, false))) { page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling, LCK_write, pag_index); } // Make sure first node looks ok if (prefix > node->btn_prefix || key->key_length != node->btn_length + node->btn_prefix) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); /* msg 204 index inconsistent */ #endif return contents_above_threshold; } // check to make sure the node has the same value USHORT l; UCHAR *p, *q; p = node->btn_data; q = key->key_data + node->btn_prefix; if ( (l = node->btn_length) ) { do { if (*p++ != *q++) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } } while (--l); } // ***************************************************** // AB: This becomes a very expensive task if there are // many duplicates inside the index (non-unique index)! // Therefor we also need to add the record-number to the // non-leaf pages and sort duplicates by record-number. 
// ***************************************************** // now look through the duplicate nodes to find the one // with matching record number ULONG pages = 0; SLONG number; while (true) { // if we find the right one, quit number = get_long(node->btn_number); if (insertion->iib_number == number) { break; } if (number == END_LEVEL) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } // go to the next node and check that it is a duplicate if (number != END_BUCKET) { node = (BTN) (node->btn_data + node->btn_length); if (node->btn_length != 0 || node->btn_prefix != key->key_length) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } continue; } // if we hit the end of bucket, go to the right sibling page, // and check that the first node is a duplicate ++pages; page = (BTR) CCH_HANDOFF(tdbb, window, page->btr_sibling, LCK_write, pag_index); node = page->btr_nodes; l = node->btn_length; if (l != key->key_length) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } if (l) { p = node->btn_data; q = key->key_data; do { if (*p++ != *q++) { #ifdef DEBUG_BTR CCH_RELEASE(tdbb, window); CORRUPT(204); // msg 204 index inconsistent #endif return contents_above_threshold; } } while (--l); } #ifdef SUPERSERVER // Until deletion of duplicate nodes becomes efficient, limit // leaf level traversal by rescheduling. 
if (--tdbb->tdbb_quantum < 0 && !tdbb->tdbb_inhibit) { if (JRD_reschedule(tdbb, 0, FALSE)) { CCH_RELEASE(tdbb, window); ERR_punt(); } } #endif } // If we've needed to search thru a significant number of pages, warn the // cache manager in case we come back this way if (pages > 75) { CCH_expand(tdbb, pages + 25); } return delete_node(tdbb, window, node); } static bool scan(TDBB tdbb, BTN node, SBM * bitmap, USHORT prefix, KEY * key, USHORT flag) { /************************************** * * s c a n * ************************************** * * Functional description * Do an index scan. If we run over the bucket, return TRUE. If * we're completely done, return FALSE. * **************************************/ USHORT l; UCHAR *p = NULL, *q = NULL; USHORT i, count; SET_TDBB(tdbb); // if the search key is flagged to indicate a multi-segment index // stuff the key to the stuff boundary if ((flag & irb_partial) && (flag & irb_equality) && !(flag & irb_starting) && !(flag & irb_descending)) { count = STUFF_COUNT - ((key->key_length + STUFF_COUNT) % (STUFF_COUNT + 1)); for (i = 0; i < count; i++) { key->key_data[key->key_length + i] = 0; } count += key->key_length; } else { count = key->key_length; } UCHAR* end_key = key->key_data + count; count -= key->key_length; // reset irb_equality flag passed for optimization flag &= ~irb_equality; SLONG number; while (true) { number = get_long(node->btn_number); if (number == END_LEVEL) { return false; } if (node->btn_prefix <= prefix) { prefix = node->btn_prefix; p = key->key_data + prefix; q = node->btn_data; for (l = node->btn_length; l; --l, prefix++) { if (p >= end_key) { if (flag) { break; } else { return false; } } if (p > (end_key - count)) { if (*p++ == *q++) { break; } else { continue; } } if (*p < *q) { return false; } if (*p++ > *q++) { break; } } } if (number == END_BUCKET) { return true; } if ((flag & irb_starting) || !count) { SBM_set(tdbb, bitmap, number); } else if (p > (end_key - count)) { SBM_set(tdbb, bitmap, 
number); } node = NEXT_NODE(node); } // NOTREACHED return false; // superfluous return to shut lint up }