8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 00:03:02 +01:00

Improvement #7093 and fix for #7094.

- #7093 - Improve indexed lookup speed of strings when the last
keys characters are part of collated contractions.

- #7094 - Incorrect indexed lookup of strings when the last keys
characters are part of collated contractions and there is
multi-segment insensitive descending index.
This commit is contained in:
Adriano dos Santos Fernandes 2022-01-11 09:49:34 -03:00
parent 244d9188e8
commit 8357a9328a
15 changed files with 876 additions and 446 deletions

View File

@ -57,9 +57,15 @@ typedef USHORT (*pfn_INTL_keylength) (texttype* tt, USHORT len);
/* Types of the keys which may be returned by str2key routine */ /* Types of the keys which may be returned by str2key routine */
#define INTL_KEY_SORT 0 /* Full sort key */ #define INTL_KEY_SORT 0 /* Full sort key */
#define INTL_KEY_PARTIAL 1 /* Starting portion of sort key for equality class */ #define INTL_KEY_PARTIAL 1 /* Starting portion of sort key for equality class */
#define INTL_KEY_UNIQUE 2 /* Full key for the equality class of the string */ #define INTL_KEY_UNIQUE 2 /* Full key for the equality class of the string */
#define INTL_KEY_MULTI_STARTING 3 /* Multiple starting keys */
/* INTL_KEY_MULTI_STARTING format:
key ::= { <key_length> <key_bytes> }...
key_length ::= <key length least significant byte> <key length most significant byte>
*/
/* Returned value of INTL_BAD_KEY_LENGTH means that key error happened during /* Returned value of INTL_BAD_KEY_LENGTH means that key error happened during
key construction. When partial key is requested returned string should key construction. When partial key is requested returned string should
@ -128,6 +134,8 @@ typedef void (*pfn_INTL_tt_destroy) (texttype* tt);
(char, case, accent) which is case-insensitive, (char, case, accent) which is case-insensitive,
but accent-sensitive */ but accent-sensitive */
#define TEXTTYPE_MULTI_STARTING_KEY 8 /* Supports INTL_KEY_MULTI_STARTING */
struct texttype struct texttype
{ {

View File

@ -186,7 +186,9 @@ public:
USet* (U_EXPORT2 *usetOpen)(UChar32 start, UChar32 end); USet* (U_EXPORT2 *usetOpen)(UChar32 start, UChar32 end);
void (U_EXPORT2 *ucolClose)(UCollator* coll); void (U_EXPORT2 *ucolClose)(UCollator* coll);
int32_t (U_EXPORT2 *ucolGetContractions)(const UCollator* coll, USet* conts, UErrorCode* status); int32_t (U_EXPORT2 *ucolGetContractionsAndExpansions)(const UCollator* coll, USet* contractions, USet* expansions,
UBool addPrefixes, UErrorCode* status);
int32_t (U_EXPORT2 *ucolGetSortKey)(const UCollator* coll, const UChar* source, int32_t (U_EXPORT2 *ucolGetSortKey)(const UCollator* coll, const UChar* source,
int32_t sourceLength, uint8_t* result, int32_t resultLength); int32_t sourceLength, uint8_t* result, int32_t resultLength);
UCollator* (U_EXPORT2 *ucolOpen)(const char* loc, UErrorCode* status); UCollator* (U_EXPORT2 *ucolOpen)(const char* loc, UErrorCode* status);
@ -1034,7 +1036,8 @@ UnicodeUtil::ICU* UnicodeUtil::loadICU(const string& icuVersion, const string& c
icu->getEntryPoint("uset_open", icu->ucModule, icu->usetOpen); icu->getEntryPoint("uset_open", icu->ucModule, icu->usetOpen);
icu->getEntryPoint("ucol_close", icu->inModule, icu->ucolClose); icu->getEntryPoint("ucol_close", icu->inModule, icu->ucolClose);
icu->getEntryPoint("ucol_getContractions", icu->inModule, icu->ucolGetContractions); icu->getEntryPoint("ucol_getContractionsAndExpansions", icu->inModule,
icu->ucolGetContractionsAndExpansions);
icu->getEntryPoint("ucol_getSortKey", icu->inModule, icu->ucolGetSortKey); icu->getEntryPoint("ucol_getSortKey", icu->inModule, icu->ucolGetSortKey);
icu->getEntryPoint("ucol_open", icu->inModule, icu->ucolOpen); icu->getEntryPoint("ucol_open", icu->inModule, icu->ucolOpen);
icu->getEntryPoint("ucol_setAttribute", icu->inModule, icu->ucolSetAttribute); icu->getEntryPoint("ucol_setAttribute", icu->inModule, icu->ucolSetAttribute);
@ -1348,35 +1351,145 @@ UnicodeUtil::Utf16Collation* UnicodeUtil::Utf16Collation::create(
USet* contractions = icu->usetOpen(1, 0); USet* contractions = icu->usetOpen(1, 0);
// status not verified here. // status not verified here.
icu->ucolGetContractions(partialCollator, contractions, &status); icu->ucolGetContractionsAndExpansions(partialCollator, contractions, NULL, false, &status);
int contractionsCount = icu->usetGetItemCount(contractions); int contractionsCount = icu->usetGetItemCount(contractions);
for (int contractionIndex = 0; contractionIndex < contractionsCount; ++contractionIndex) for (int contractionIndex = 0; contractionIndex < contractionsCount; ++contractionIndex)
{ {
UChar str[10]; UChar strChars[10];
UChar32 start, end; UChar32 start, end;
status = U_ZERO_ERROR; status = U_ZERO_ERROR;
int len = icu->usetGetItem(contractions, contractionIndex, &start, &end, str, sizeof(str), &status); int len = icu->usetGetItem(contractions, contractionIndex, &start, &end, strChars, sizeof(strChars), &status);
if (len >= 2) if (len >= 2)
{ {
obj->maxContractionsPrefixLength = len - 1 > obj->maxContractionsPrefixLength ? obj->maxContractionsPrefixLength = len - 1 > obj->maxContractionsPrefixLength ?
len - 1 : obj->maxContractionsPrefixLength; len - 1 : obj->maxContractionsPrefixLength;
for (int currentLen = 1; currentLen < len; ++currentLen) UCHAR key[100];
{ int keyLen = icu->ucolGetSortKey(partialCollator, strChars, len, key, sizeof(key));
string s(reinterpret_cast<const char*>(str), currentLen * 2);
if (!obj->contractionsPrefix.exist(s)) for (int prefixLen = 1; prefixLen < len; ++prefixLen)
obj->contractionsPrefix.push(s); {
const Array<USHORT> str(strChars, prefixLen);
SortKeyArray* keySet = obj->contractionsPrefix.get(str);
if (!keySet)
{
keySet = obj->contractionsPrefix.put(str);
UCHAR prefixKey[100];
int prefixKeyLen = icu->ucolGetSortKey(partialCollator,
strChars, prefixLen, prefixKey, sizeof(prefixKey));
keySet->add(Array<UCHAR>(prefixKey, prefixKeyLen));
}
keySet->add(Array<UCHAR>(key, keyLen));
} }
} }
} }
icu->usetClose(contractions); icu->usetClose(contractions);
ContractionsPrefixMap::Accessor accessor(&obj->contractionsPrefix);
for (bool found = accessor.getFirst(); found; found = accessor.getNext())
{
SortKeyArray& keySet = accessor.current()->second;
if (keySet.getCount() <= 1)
continue;
fb_assert(accessor.current()->first.hasData());
USHORT ch = accessor.current()->first[0];
if (ch >= 0xFDD0 && ch <= 0xFDEF)
{
keySet.clear();
keySet.add(Array<UCHAR>());
continue;
}
SortKeyArray::iterator firstKeyIt = keySet.begin();
SortKeyArray::iterator lastKeyIt = --keySet.end();
const UCHAR* firstKeyDataIt = firstKeyIt->begin();
const UCHAR* lastKeyDataIt = lastKeyIt->begin();
const UCHAR* firstKeyDataEnd = firstKeyIt->end();
const UCHAR* lastKeyDataEnd = lastKeyIt->end();
if (*firstKeyDataIt == *lastKeyDataIt)
{
unsigned common = 0;
do
{
++common;
} while (++firstKeyDataIt != firstKeyDataEnd && ++lastKeyDataIt != lastKeyDataEnd &&
*firstKeyDataIt == *lastKeyDataIt);
Array<UCHAR> commonKey(firstKeyIt->begin(), common);
keySet.clear();
keySet.add(commonKey);
}
else
{
SortKeyArray::iterator secondKeyIt = ++keySet.begin();
const UCHAR* secondKeyDataIt = secondKeyIt->begin();
const UCHAR* secondKeyDataEnd = secondKeyIt->end();
ObjectsArray<Array<UCHAR> > commonKeys;
commonKeys.add(*firstKeyIt);
while (secondKeyIt != keySet.end())
{
unsigned common = 0;
while (firstKeyDataIt != firstKeyDataEnd && secondKeyDataIt != secondKeyDataEnd &&
*firstKeyDataIt == *secondKeyDataIt)
{
++common;
++firstKeyDataIt;
++secondKeyDataIt;
}
unsigned backSize = commonKeys.back()->getCount();
if (common > backSize)
commonKeys.back()->append(secondKeyIt->begin() + backSize, common - backSize);
else if (common < backSize)
{
if (common == 0)
commonKeys.push(*secondKeyIt);
else
commonKeys.back()->resize(common);
}
if (++secondKeyIt != keySet.end())
{
++firstKeyIt;
firstKeyDataIt = firstKeyIt->begin();
secondKeyDataIt = secondKeyIt->begin();
firstKeyDataEnd = firstKeyIt->end();
secondKeyDataEnd = secondKeyIt->end();
}
}
keySet.clear();
for (ObjectsArray<Array<UCHAR> >::iterator ckIt = commonKeys.begin(); ckIt != commonKeys.end(); ++ckIt)
keySet.add(*ckIt);
}
}
if (obj->maxContractionsPrefixLength)
tt->texttype_flags |= TEXTTYPE_MULTI_STARTING_KEY;
return obj; return obj;
} }
@ -1427,41 +1540,17 @@ USHORT UnicodeUtil::Utf16Collation::stringToKey(USHORT srcLen, const USHORT* src
srcLenLong = pad - src + 1; srcLenLong = pad - src + 1;
} }
if (srcLenLong == 0)
return 0;
HalfStaticArray<USHORT, BUFFER_SMALL / 2> buffer; HalfStaticArray<USHORT, BUFFER_SMALL / 2> buffer;
const UCollator* coll = NULL; const UCollator* coll = NULL;
switch (key_type) switch (key_type)
{ {
case INTL_KEY_PARTIAL: case INTL_KEY_PARTIAL:
case INTL_KEY_MULTI_STARTING:
coll = partialCollator; coll = partialCollator;
// Remove last bytes of key if they are start of a contraction
// to correctly find in the index.
for (int i = MIN(maxContractionsPrefixLength, srcLenLong); i > 0; --i)
{
if (contractionsPrefix.exist(string(reinterpret_cast<const char*>(src + srcLenLong - i), i * 2)))
{
srcLenLong -= i;
break;
}
}
if (numericSort)
{
// ASF: Wee need to remove trailing numbers to return sub key that
// matches full key. Example: "abc1" becomes "abc" to match "abc10".
const USHORT* p = src + srcLenLong - 1;
for (; p >= src; --p)
{
if (!(*p >= '0' && *p <= '9'))
break;
}
srcLenLong = p - src + 1;
}
break; break;
case INTL_KEY_UNIQUE: case INTL_KEY_UNIQUE:
@ -1480,11 +1569,102 @@ USHORT UnicodeUtil::Utf16Collation::stringToKey(USHORT srcLen, const USHORT* src
return INTL_BAD_KEY_LENGTH; return INTL_BAD_KEY_LENGTH;
} }
if (srcLenLong == 0) if (key_type == INTL_KEY_MULTI_STARTING)
return 0; {
bool trailingNumbersRemoved = false;
return icu->ucolGetSortKey(coll, if (numericSort)
{
// ASF: Wee need to remove trailing numbers to return sub key that
// matches full key. Example: "abc1" becomes "abc" to match "abc10".
const USHORT* p = src + srcLenLong - 1;
for (; p >= src; --p)
{
if (!(*p >= '0' && *p <= '9'))
break;
trailingNumbersRemoved = true;
}
srcLenLong = p - src + 1;
}
if (!trailingNumbersRemoved)
{
for (int i = MIN(maxContractionsPrefixLength, srcLenLong); i > 0; --i)
{
SortKeyArray* keys = contractionsPrefix.get(Array<USHORT>(src + srcLenLong - i, i));
if (keys)
{
const UCHAR* dstStart = dst;
ULONG prefixLen;
srcLenLong -= i;
if (srcLenLong != 0)
{
prefixLen = icu->ucolGetSortKey(coll,
reinterpret_cast<const UChar*>(src), srcLenLong, dst + 2, dstLen - 2);
if (prefixLen == 0 || prefixLen > dstLen - 2 || prefixLen > MAX_USHORT)
return INTL_BAD_KEY_LENGTH;
fb_assert(dst[2 + prefixLen - 1] == '\0');
--prefixLen;
dstLen -= 2 + prefixLen;
}
else
prefixLen = 0;
for (SortKeyArray::const_iterator keyIt = keys->begin();
keyIt != keys->end();
++keyIt)
{
const ULONG keyLen = prefixLen + keyIt->getCount();
if (keyLen > dstLen - 2 || keyLen > MAX_USHORT)
return INTL_BAD_KEY_LENGTH;
dst[0] = UCHAR(keyLen & 0xFF);
dst[1] = UCHAR(keyLen >> 8);
if (dst != dstStart)
memcpy(dst + 2, dstStart + 2, prefixLen);
memcpy(dst + 2 + prefixLen, keyIt->begin(), keyIt->getCount());
dst += 2 + keyLen;
dstLen -= 2 + keyLen;
}
return dst - dstStart;
}
}
}
ULONG keyLen = icu->ucolGetSortKey(coll,
reinterpret_cast<const UChar*>(src), srcLenLong, dst + 2, dstLen - 3);
if (keyLen == 0 || keyLen > dstLen - 3 || keyLen > MAX_USHORT)
return INTL_BAD_KEY_LENGTH;
fb_assert(dst[2 + keyLen - 1] == '\0');
--keyLen;
dst[0] = UCHAR(keyLen & 0xFF);
dst[1] = UCHAR(keyLen >> 8);
return keyLen + 2;
}
const ULONG keyLen = icu->ucolGetSortKey(coll,
reinterpret_cast<const UChar*>(src), srcLenLong, dst, dstLen); reinterpret_cast<const UChar*>(src), srcLenLong, dst, dstLen);
if (keyLen == 0 || keyLen > dstLen || keyLen > MAX_USHORT)
return INTL_BAD_KEY_LENGTH;
return keyLen;
} }

View File

@ -30,7 +30,9 @@
#include "intlobj_new.h" #include "intlobj_new.h"
#include "../common/IntlUtil.h" #include "../common/IntlUtil.h"
#include "../common/os/mod_loader.h" #include "../common/os/mod_loader.h"
#include "../common/classes/array.h"
#include "../common/classes/fb_string.h" #include "../common/classes/fb_string.h"
#include "../common/classes/GenericMap.h"
#include "../common/classes/objects_array.h" #include "../common/classes/objects_array.h"
#undef U_SHOW_CPLUSPLUS_API #undef U_SHOW_CPLUSPLUS_API
@ -184,6 +186,45 @@ public:
ULONG canonical(ULONG srcLen, const USHORT* src, ULONG dstLen, ULONG* dst, const ULONG* exceptions); ULONG canonical(ULONG srcLen, const USHORT* src, ULONG dstLen, ULONG* dst, const ULONG* exceptions);
private: private:
template <typename T>
class ArrayComparator
{
public:
static bool greaterThan(const Firebird::Array<T>& i1, const Firebird::Array<T>& i2)
{
FB_SIZE_T minCount = MIN(i1.getCount(), i2.getCount());
int cmp = memcmp(i1.begin(), i2.begin(), minCount * sizeof(T));
if (cmp != 0)
return cmp > 0;
return i1.getCount() > i2.getCount();
}
static bool greaterThan(const Firebird::Array<T>* i1, const Firebird::Array<T>* i2)
{
return greaterThan(*i1, *i2);
}
};
typedef Firebird::SortedObjectsArray<
Firebird::Array<UCHAR>,
Firebird::InlineStorage<Firebird::Array<UCHAR>*, 3>,
Firebird::Array<UCHAR>,
Firebird::DefaultKeyValue<const Firebird::Array<UCHAR>*>,
ArrayComparator<UCHAR>
> SortKeyArray;
typedef Firebird::GenericMap<
Firebird::Pair<
Firebird::Full<
Firebird::Array<USHORT>, // UTF-16 string
SortKeyArray // sort keys
>
>,
ArrayComparator<USHORT>
> ContractionsPrefixMap;
static ICU* loadICU(const Firebird::string& collVersion, const Firebird::string& locale, static ICU* loadICU(const Firebird::string& collVersion, const Firebird::string& locale,
const Firebird::string& configInfo); const Firebird::string& configInfo);
@ -196,7 +237,7 @@ public:
UCollator* compareCollator; UCollator* compareCollator;
UCollator* partialCollator; UCollator* partialCollator;
UCollator* sortCollator; UCollator* sortCollator;
Firebird::SortedObjectsArray<Firebird::string> contractionsPrefix; // UTF-16 string ContractionsPrefixMap contractionsPrefix;
unsigned maxContractionsPrefixLength; // number of characters unsigned maxContractionsPrefixLength; // number of characters
bool numericSort; bool numericSort;
}; };

View File

@ -523,8 +523,6 @@ USHORT famasc_string_to_key(texttype* obj, USHORT iInLen, const BYTE* pInChar, U
{ {
fb_assert(pOutChar != NULL); fb_assert(pOutChar != NULL);
fb_assert(pInChar != NULL); fb_assert(pInChar != NULL);
fb_assert(iInLen <= LANGASCII_MAX_KEY);
fb_assert(iOutLen <= LANGASCII_MAX_KEY);
fb_assert(iOutLen >= famasc_key_length(obj, iInLen)); fb_assert(iOutLen >= famasc_key_length(obj, iInLen));
// point inbuff at last character // point inbuff at last character

View File

@ -139,8 +139,6 @@ static USHORT LCKSC_string_to_key(texttype* obj, USHORT iInLen, const BYTE* pInC
{ {
fb_assert(pOutChar != NULL); fb_assert(pOutChar != NULL);
fb_assert(pInChar != NULL); fb_assert(pInChar != NULL);
fb_assert(iInLen <= LANGKSC_MAX_KEY);
fb_assert(iOutLen <= LANGKSC_MAX_KEY);
fb_assert(iOutLen >= LCKSC_key_length(obj, iInLen)); fb_assert(iOutLen >= LCKSC_key_length(obj, iInLen));
const BYTE* inbuff = pInChar + iInLen - 1; const BYTE* inbuff = pInChar + iInLen - 1;

View File

@ -168,8 +168,6 @@ USHORT LC_NARROW_string_to_key(texttype* obj, USHORT iInLen, const BYTE* pInChar
{ {
fb_assert(pOutChar != NULL); fb_assert(pOutChar != NULL);
fb_assert(pInChar != NULL); fb_assert(pInChar != NULL);
// fb_assert (iInLen <= LANGFAM2_MAX_KEY);
fb_assert(iOutLen <= LANGFAM2_MAX_KEY);
fb_assert(iOutLen >= LC_NARROW_key_length(obj, iInLen)); fb_assert(iOutLen >= LC_NARROW_key_length(obj, iInLen));
TextTypeImpl* impl = static_cast<TextTypeImpl*>(obj->texttype_impl); TextTypeImpl* impl = static_cast<TextTypeImpl*>(obj->texttype_impl);

View File

@ -229,7 +229,8 @@ IndexScratch::IndexScratch(MemoryPool& p, thread_db* tdbb, index_desc* ix,
lowerCount = 0; lowerCount = 0;
upperCount = 0; upperCount = 0;
nonFullMatchedSegments = 0; nonFullMatchedSegments = 0;
fuzzy = false; usePartialKey = false;
useMultiStartingKeys = false;
segments.grow(idx->idx_count); segments.grow(idx->idx_count);
@ -278,7 +279,8 @@ IndexScratch::IndexScratch(MemoryPool& p, const IndexScratch& scratch) :
lowerCount = scratch.lowerCount; lowerCount = scratch.lowerCount;
upperCount = scratch.upperCount; upperCount = scratch.upperCount;
nonFullMatchedSegments = scratch.nonFullMatchedSegments; nonFullMatchedSegments = scratch.nonFullMatchedSegments;
fuzzy = scratch.fuzzy; usePartialKey = scratch.usePartialKey;
useMultiStartingKeys = scratch.useMultiStartingKeys;
idx = scratch.idx; idx = scratch.idx;
// Allocate needed segments // Allocate needed segments
@ -987,7 +989,8 @@ void OptimizerRetrieval::getInversionCandidates(InversionCandidateList* inversio
scratch.lowerCount = 0; scratch.lowerCount = 0;
scratch.upperCount = 0; scratch.upperCount = 0;
scratch.nonFullMatchedSegments = MAX_INDEX_SEGMENTS + 1; scratch.nonFullMatchedSegments = MAX_INDEX_SEGMENTS + 1;
scratch.fuzzy = false; scratch.usePartialKey = false;
scratch.useMultiStartingKeys = false;
if (scratch.candidate) if (scratch.candidate)
{ {
@ -1003,26 +1006,35 @@ void OptimizerRetrieval::getInversionCandidates(InversionCandidateList* inversio
if (segment->scope == scope) if (segment->scope == scope)
scratch.scopeCandidate = true; scratch.scopeCandidate = true;
if (segment->scanType != segmentScanMissing && !(scratch.idx->idx_flags & idx_unique)) const USHORT iType = scratch.idx->idx_rpt[j].idx_itype;
if (iType >= idx_first_intl_string)
{ {
const USHORT iType = scratch.idx->idx_rpt[j].idx_itype; const USHORT iType = scratch.idx->idx_rpt[j].idx_itype;
TextType* textType = INTL_texttype_lookup(tdbb, INTL_INDEX_TO_TEXT(iType));
if (iType >= idx_first_intl_string) if (segment->scanType != segmentScanMissing && !(scratch.idx->idx_flags & idx_unique))
{ {
TextType* textType = INTL_texttype_lookup(tdbb, INTL_INDEX_TO_TEXT(iType));
if (textType->getFlags() & TEXTTYPE_SEPARATE_UNIQUE) if (textType->getFlags() & TEXTTYPE_SEPARATE_UNIQUE)
{ {
// ASF: Order is more precise than equivalence class. // ASF: Order is more precise than equivalence class.
// We can't use the next segments, and we'll need to use // We can't use the next segments, and we'll need to use
// INTL_KEY_PARTIAL to construct the last segment's key. // INTL_KEY_PARTIAL to construct the last segment's key.
scratch.fuzzy = true; scratch.usePartialKey = true;
} }
} }
if (segment->scanType == segmentScanStarting)
{
if (textType->getFlags() & TEXTTYPE_MULTI_STARTING_KEY)
scratch.useMultiStartingKeys = true; // use INTL_KEY_MULTI_STARTING
else
scratch.usePartialKey = true;
}
} }
// Check if this is the last usable segment // Check if this is the last usable segment
if (!scratch.fuzzy && if (!scratch.usePartialKey &&
(segment->scanType == segmentScanEqual || (segment->scanType == segmentScanEqual ||
segment->scanType == segmentScanEquivalent || segment->scanType == segmentScanEquivalent ||
segment->scanType == segmentScanMissing)) segment->scanType == segmentScanMissing))
@ -1296,9 +1308,12 @@ InversionNode* OptimizerRetrieval::makeIndexScanNode(IndexScratch* indexScratch)
retrieval->irb_generic |= irb_exclude_upper; retrieval->irb_generic |= irb_exclude_upper;
} }
if (indexScratch->fuzzy) if (indexScratch->usePartialKey)
retrieval->irb_generic |= irb_starting; // Flag the need to use INTL_KEY_PARTIAL in btr. retrieval->irb_generic |= irb_starting; // Flag the need to use INTL_KEY_PARTIAL in btr.
if (indexScratch->useMultiStartingKeys)
retrieval->irb_generic |= irb_multi_starting; // Flag the need to use INTL_KEY_MULTI_STARTING in btr.
// This index is never used for IS NULL, thus we can ignore NULLs // This index is never used for IS NULL, thus we can ignore NULLs
// already at index scan. But this rule doesn't apply to nod_equiv // already at index scan. But this rule doesn't apply to nod_equiv
// which requires NULLs to be found in the index. // which requires NULLs to be found in the index.

View File

@ -122,7 +122,8 @@ public:
int lowerCount; // int lowerCount; //
int upperCount; // int upperCount; //
int nonFullMatchedSegments; // int nonFullMatchedSegments; //
bool fuzzy; // Need to use INTL_KEY_PARTIAL in btr lookups bool usePartialKey; // Use INTL_KEY_PARTIAL
bool useMultiStartingKeys; // Use INTL_KEY_MULTI_STARTING
double cardinality; // Estimated cardinality when using the whole index double cardinality; // Estimated cardinality when using the whole index
Firebird::Array<IndexScratchSegment*> segments; Firebird::Array<IndexScratchSegment*> segments;

View File

@ -369,10 +369,13 @@ void BTR_complement_key(temporary_key* key)
* Negate a key for descending index. * Negate a key for descending index.
* *
**************************************/ **************************************/
UCHAR* p = key->key_data; do
for (const UCHAR* const end = p + key->key_length; p < end; p++) { {
*p ^= -1; UCHAR* p = key->key_data;
} for (const UCHAR* const end = p + key->key_length; p < end; p++) {
*p ^= -1;
}
} while (key = key->key_next.get());
} }
@ -679,152 +682,161 @@ void BTR_evaluate(thread_db* tdbb, const IndexRetrieval* retrieval, RecordBitmap
index_desc idx; index_desc idx;
RelationPages* relPages = retrieval->irb_relation->getPages(tdbb); RelationPages* relPages = retrieval->irb_relation->getPages(tdbb);
WIN window(relPages->rel_pg_space_id, -1); WIN window(relPages->rel_pg_space_id, -1);
temporary_key lower, upper; temporary_key lowerKey, upperKey;
lower.key_flags = 0; lowerKey.key_flags = 0;
lower.key_length = 0; lowerKey.key_length = 0;
upper.key_flags = 0; upperKey.key_flags = 0;
upper.key_length = 0; upperKey.key_length = 0;
btree_page* page = BTR_find_page(tdbb, retrieval, &window, &idx, &lower, &upper);
const bool descending = (idx.idx_flags & idx_descending); temporary_key* lower = &lowerKey;
bool skipLowerKey = (retrieval->irb_generic & irb_exclude_lower); temporary_key* upper = &upperKey;
const bool partLower = (retrieval->irb_lower_count < idx.idx_count); bool first = true;
// If there is a starting descriptor, search down index to starting position. do
// This may involve sibling buckets if splits are in progress. If there
// isn't a starting descriptor, walk down the left side of the index.
USHORT prefix;
UCHAR* pointer;
if (retrieval->irb_lower_count)
{ {
while (!(pointer = find_node_start_point(page, &lower, 0, &prefix, btree_page* page = BTR_find_page(tdbb, retrieval, &window, &idx, lower, upper, first);
idx.idx_flags & idx_descending, (retrieval->irb_generic & (irb_starting | irb_partial))))) first = false;
{
page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
}
// Compute the number of matching characters in lower and upper bounds const bool descending = (idx.idx_flags & idx_descending);
if (retrieval->irb_upper_count) bool skipLowerKey = (retrieval->irb_generic & irb_exclude_lower);
{ const bool partLower = (retrieval->irb_lower_count < idx.idx_count);
prefix = IndexNode::computePrefix(upper.key_data, upper.key_length,
lower.key_data, lower.key_length);
}
if (skipLowerKey) // If there is a starting descriptor, search down index to starting position.
// This may involve sibling buckets if splits are in progress. If there
// isn't a starting descriptor, walk down the left side of the index.
USHORT prefix;
UCHAR* pointer;
if (retrieval->irb_lower_count)
{ {
IndexNode node; while (!(pointer = find_node_start_point(page, lower, 0, &prefix,
node.readNode(pointer, true); idx.idx_flags & idx_descending, (retrieval->irb_generic & (irb_starting | irb_partial)))))
if ((lower.key_length == node.prefix + node.length) ||
(lower.key_length <= node.prefix + node.length) && partLower)
{ {
const UCHAR* p = node.data, *q = lower.key_data + node.prefix; page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
const UCHAR* const end = lower.key_data + lower.key_length;
while (q < end)
{
if (*p++ != *q++)
{
skipLowerKey = false;
break;
}
}
if ((q >= end) && (p < node.data + node.length) && skipLowerKey && partLower)
{
// since key length always is multiplier of (STUFF_COUNT + 1) (for partial
// compound keys) and we passed lower key completely then p pointed
// us to the next segment number and we can use this fact to calculate
// how many segments is equal to lower key
const USHORT segnum = idx.idx_count - (UCHAR) (descending ? ((*p) ^ -1) : *p);
if (segnum < retrieval->irb_lower_count) {
skipLowerKey = false;
}
}
} }
else {
skipLowerKey = false; // Compute the number of matching characters in lower and upper bounds
if (retrieval->irb_upper_count)
{
prefix = IndexNode::computePrefix(upper->key_data, upper->key_length,
lower->key_data, lower->key_length);
}
if (skipLowerKey)
{
IndexNode node;
node.readNode(pointer, true);
if ((lower->key_length == node.prefix + node.length) ||
(lower->key_length <= node.prefix + node.length) && partLower)
{
const UCHAR* p = node.data, *q = lower->key_data + node.prefix;
const UCHAR* const end = lower->key_data + lower->key_length;
while (q < end)
{
if (*p++ != *q++)
{
skipLowerKey = false;
break;
}
}
if ((q >= end) && (p < node.data + node.length) && skipLowerKey && partLower)
{
// since key length always is multiplier of (STUFF_COUNT + 1) (for partial
// compound keys) and we passed lower key completely then p pointed
// us to the next segment number and we can use this fact to calculate
// how many segments is equal to lower key
const USHORT segnum = idx.idx_count - (UCHAR) (descending ? ((*p) ^ -1) : *p);
if (segnum < retrieval->irb_lower_count) {
skipLowerKey = false;
}
}
}
else {
skipLowerKey = false;
}
} }
} }
} else
else
{
pointer = page->btr_nodes + page->btr_jump_size;
prefix = 0;
skipLowerKey = false;
}
// if there is an upper bound, scan the index pages looking for it
if (retrieval->irb_upper_count)
{
while (scan(tdbb, pointer, bitmap, bitmap_and, &idx, retrieval, prefix, &upper,
skipLowerKey, lower))
{ {
page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
pointer = page->btr_nodes + page->btr_jump_size; pointer = page->btr_nodes + page->btr_jump_size;
prefix = 0; prefix = 0;
} skipLowerKey = false;
}
else
{
// if there isn't an upper bound, just walk the index to the end of the level
const UCHAR* endPointer = (UCHAR*)page + page->btr_length;
const bool ignoreNulls =
(retrieval->irb_generic & irb_ignore_null_value_key) && (idx.idx_count == 1);
IndexNode node;
pointer = node.readNode(pointer, true);
// Check if pointer is still valid
if (pointer > endPointer) {
BUGCHECK(204); // msg 204 index inconsistent
} }
while (true) // if there is an upper bound, scan the index pages looking for it
if (retrieval->irb_upper_count)
{ {
if (node.isEndLevel) { while (scan(tdbb, pointer, bitmap, bitmap_and, &idx, retrieval, prefix, upper,
break; skipLowerKey, *lower))
}
if (!node.isEndBucket)
{ {
// If we're walking in a descending index and we need to ignore NULLs page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
// then stop at the first NULL we see (only for single segment!) pointer = page->btr_nodes + page->btr_jump_size;
if (descending && ignoreNulls && node.prefix == 0 && prefix = 0;
node.length >= 1 && node.data[0] == 255)
{
break;
}
if (skipLowerKey)
{
checkForLowerKeySkip(skipLowerKey, partLower, node, lower, idx, retrieval);
}
if (!skipLowerKey)
{
if (!bitmap_and || bitmap_and->test(node.recordNumber.getValue()))
RBM_SET(tdbb->getDefaultPool(), bitmap, node.recordNumber.getValue());
}
pointer = node.readNode(pointer, true);
// Check if pointer is still valid
if (pointer > endPointer) {
BUGCHECK(204); // msg 204 index inconsistent
}
continue;
} }
}
else
{
// if there isn't an upper bound, just walk the index to the end of the level
const UCHAR* endPointer = (UCHAR*)page + page->btr_length;
const bool ignoreNulls =
(retrieval->irb_generic & irb_ignore_null_value_key) && (idx.idx_count == 1);
page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index); IndexNode node;
endPointer = (UCHAR*) page + page->btr_length;
pointer = page->btr_nodes + page->btr_jump_size;
pointer = node.readNode(pointer, true); pointer = node.readNode(pointer, true);
// Check if pointer is still valid // Check if pointer is still valid
if (pointer > endPointer) { if (pointer > endPointer) {
BUGCHECK(204); // msg 204 index inconsistent BUGCHECK(204); // msg 204 index inconsistent
} }
}
}
CCH_RELEASE(tdbb, &window); while (true)
{
if (node.isEndLevel) {
break;
}
if (!node.isEndBucket)
{
// If we're walking in a descending index and we need to ignore NULLs
// then stop at the first NULL we see (only for single segment!)
if (descending && ignoreNulls && node.prefix == 0 &&
node.length >= 1 && node.data[0] == 255)
{
break;
}
if (skipLowerKey)
{
checkForLowerKeySkip(skipLowerKey, partLower, node, *lower, idx, retrieval);
}
if (!skipLowerKey)
{
if (!bitmap_and || bitmap_and->test(node.recordNumber.getValue()))
RBM_SET(tdbb->getDefaultPool(), bitmap, node.recordNumber.getValue());
}
pointer = node.readNode(pointer, true);
// Check if pointer is still valid
if (pointer > endPointer) {
BUGCHECK(204); // msg 204 index inconsistent
}
continue;
}
page = (btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
endPointer = (UCHAR*) page + page->btr_length;
pointer = page->btr_nodes + page->btr_jump_size;
pointer = node.readNode(pointer, true);
// Check if pointer is still valid
if (pointer > endPointer) {
BUGCHECK(204); // msg 204 index inconsistent
}
}
}
CCH_RELEASE(tdbb, &window);
} while ((lower = lower->key_next.get()) && (upper = upper->key_next.get()));
} }
@ -852,7 +864,8 @@ btree_page* BTR_find_page(thread_db* tdbb,
WIN* window, WIN* window,
index_desc* idx, index_desc* idx,
temporary_key* lower, temporary_key* lower,
temporary_key* upper) temporary_key* upper,
bool makeKeys)
{ {
/************************************** /**************************************
* *
@ -872,19 +885,26 @@ btree_page* BTR_find_page(thread_db* tdbb,
// are looking for an equality // are looking for an equality
if (retrieval->irb_key) if (retrieval->irb_key)
{ {
fb_assert(makeKeys);
copy_key(retrieval->irb_key, lower); copy_key(retrieval->irb_key, lower);
copy_key(retrieval->irb_key, upper); copy_key(retrieval->irb_key, upper);
} }
else else if (makeKeys)
{ {
idx_e errorCode = idx_e_ok; idx_e errorCode = idx_e_ok;
const USHORT keyType =
(retrieval->irb_generic & irb_multi_starting) ? INTL_KEY_MULTI_STARTING :
(retrieval->irb_generic & irb_starting) ? INTL_KEY_PARTIAL :
(retrieval->irb_desc.idx_flags & idx_unique) ? INTL_KEY_UNIQUE :
INTL_KEY_SORT;
if (retrieval->irb_upper_count) if (retrieval->irb_upper_count)
{ {
errorCode = BTR_make_key(tdbb, retrieval->irb_upper_count, errorCode = BTR_make_key(tdbb, retrieval->irb_upper_count,
retrieval->irb_value + retrieval->irb_desc.idx_count, retrieval->irb_value + retrieval->irb_desc.idx_count,
&retrieval->irb_desc, upper, &retrieval->irb_desc, upper,
(retrieval->irb_generic & irb_starting) != 0); keyType);
} }
if (errorCode == idx_e_ok) if (errorCode == idx_e_ok)
@ -893,7 +913,7 @@ btree_page* BTR_find_page(thread_db* tdbb,
{ {
errorCode = BTR_make_key(tdbb, retrieval->irb_lower_count, errorCode = BTR_make_key(tdbb, retrieval->irb_lower_count,
retrieval->irb_value, &retrieval->irb_desc, lower, retrieval->irb_value, &retrieval->irb_desc, lower,
(retrieval->irb_generic & irb_starting) != 0); keyType);
} }
} }
@ -1071,7 +1091,11 @@ void BTR_insert(thread_db* tdbb, WIN* root_window, index_insertion* insertion)
window.win_page = root->irt_rpt[idx->idx_id].getRoot(); window.win_page = root->irt_rpt[idx->idx_id].getRoot();
bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_write, pag_index); bucket = (btree_page*) CCH_FETCH(tdbb, &window, LCK_write, pag_index);
key = ret_key; key.key_length = ret_key.key_length;
memcpy(key.key_data, ret_key.key_data, ret_key.key_length);
key.key_flags = ret_key.key_flags;
key.key_nulls = ret_key.key_nulls;
key.key_next.reset(ret_key.key_next.release());
} }
// the original page was marked as not garbage-collectable, but // the original page was marked as not garbage-collectable, but
@ -1160,7 +1184,7 @@ void BTR_insert(thread_db* tdbb, WIN* root_window, index_insertion* insertion)
idx_e BTR_key(thread_db* tdbb, jrd_rel* relation, Record* record, index_desc* idx, idx_e BTR_key(thread_db* tdbb, jrd_rel* relation, Record* record, index_desc* idx,
temporary_key* key, const bool fuzzy, USHORT count) temporary_key* key, const USHORT keyType, USHORT count)
{ {
/************************************** /**************************************
* *
@ -1197,11 +1221,6 @@ idx_e BTR_key(thread_db* tdbb, jrd_rel* relation, Record* record, index_desc* id
const USHORT maxKeyLength = dbb->getMaxIndexKeyLength(); const USHORT maxKeyLength = dbb->getMaxIndexKeyLength();
try { try {
const USHORT keyType = fuzzy ?
INTL_KEY_PARTIAL :
((idx->idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT);
// Special case single segment indices // Special case single segment indices
if (idx->idx_count == 1) if (idx->idx_count == 1)
@ -1471,7 +1490,7 @@ idx_e BTR_make_key(thread_db* tdbb,
const ValueExprNode* const* exprs, const ValueExprNode* const* exprs,
const index_desc* idx, const index_desc* idx,
temporary_key* key, temporary_key* key,
bool fuzzy) USHORT keyType)
{ {
/************************************** /**************************************
* *
@ -1500,13 +1519,11 @@ idx_e BTR_make_key(thread_db* tdbb,
key->key_flags = 0; key->key_flags = 0;
key->key_nulls = 0; key->key_nulls = 0;
const bool fuzzy = keyType == INTL_KEY_PARTIAL || keyType == INTL_KEY_MULTI_STARTING;
const bool descending = (idx->idx_flags & idx_descending); const bool descending = (idx->idx_flags & idx_descending);
const index_desc::idx_repeat* tail = idx->idx_rpt; const index_desc::idx_repeat* tail = idx->idx_rpt;
const USHORT keyType = fuzzy ?
INTL_KEY_PARTIAL : ((idx->idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT);
const USHORT maxKeyLength = dbb->getMaxIndexKeyLength(); const USHORT maxKeyLength = dbb->getMaxIndexKeyLength();
// If the index is a single segment index, don't sweat the compound stuff // If the index is a single segment index, don't sweat the compound stuff
@ -1522,7 +1539,10 @@ idx_e BTR_make_key(thread_db* tdbb,
compress(tdbb, desc, key, tail->idx_itype, isNull, descending, keyType); compress(tdbb, desc, key, tail->idx_itype, isNull, descending, keyType);
if (fuzzy && (key->key_flags & key_empty)) if (fuzzy && (key->key_flags & key_empty))
{
key->key_length = 0; key->key_length = 0;
key->key_next.reset();
}
} }
else else
{ {
@ -1559,37 +1579,64 @@ idx_e BTR_make_key(thread_db* tdbb,
prior_length = (p - key->key_data); prior_length = (p - key->key_data);
const UCHAR* q = temp.key_data; fb_assert(n == count - 1 || !temp.key_next);
for (USHORT l = temp.key_length; l; --l, --stuff_count)
{
if (stuff_count == 0)
{
*p++ = idx->idx_count - n;
stuff_count = STUFF_COUNT;
if (p - key->key_data >= maxKeyLength) SSHORT save_stuff_count = stuff_count;
temporary_key* current_key = key;
temporary_key* temp_ptr = &temp;
do
{
const UCHAR* q = temp_ptr->key_data;
for (USHORT l = temp_ptr->key_length; l; --l, --stuff_count)
{
if (stuff_count == 0)
{
*p++ = idx->idx_count - n;
stuff_count = STUFF_COUNT;
if (p - current_key->key_data >= maxKeyLength)
return idx_e_keytoobig;
}
*p++ = *q++;
if (p - current_key->key_data >= maxKeyLength)
return idx_e_keytoobig; return idx_e_keytoobig;
} }
*p++ = *q++; // AB: Fix bug SF #1242982
// Equality search on first segment (integer) in compound indexes resulted
// in more scans on specific values (2^n, f.e. 131072) than needed.
if (!fuzzy && count != idx->idx_count && n == count - 1)
{
for (; stuff_count; --stuff_count)
{
*p++ = 0;
if (p - key->key_data >= maxKeyLength) if (p - current_key->key_data >= maxKeyLength)
return idx_e_keytoobig; return idx_e_keytoobig;
} }
} }
// AB: Fix bug SF #1242982 current_key->key_length = p - current_key->key_data;
// Equality search on first segment (integer) in compound indexes resulted
// in more scans on specific values (2^n, f.e. 131072) than needed.
if (!fuzzy && (n != idx->idx_count))
{
for (; stuff_count; --stuff_count)
{
*p++ = 0;
if (p - key->key_data >= maxKeyLength) if ((temp_ptr = temp_ptr->key_next.get()))
return idx_e_keytoobig; {
} temporary_key* next_key = FB_NEW_POOL(*tdbb->getDefaultPool()) temporary_key();
next_key->key_length = 0;
next_key->key_flags = key->key_flags;
next_key->key_nulls = key->key_nulls;
memcpy(next_key->key_data, key->key_data, prior_length);
current_key->key_next = next_key;
current_key = next_key;
p = current_key->key_data + prior_length;
stuff_count = save_stuff_count;
}
} while (temp_ptr);
} }
// dimitr: If the search is fuzzy and the last segment is empty, // dimitr: If the search is fuzzy and the last segment is empty,
@ -1597,8 +1644,6 @@ idx_e BTR_make_key(thread_db* tdbb,
// the rule that every string starts with an empty string. // the rule that every string starts with an empty string.
if (fuzzy && (temp.key_flags & key_empty)) if (fuzzy && (temp.key_flags & key_empty))
key->key_length = prior_length; key->key_length = prior_length;
else
key->key_length = (p - key->key_data);
if (is_key_empty) if (is_key_empty)
{ {
@ -1659,7 +1704,7 @@ void BTR_make_null_key(thread_db* tdbb, const index_desc* idx, temporary_key* ke
// If the index is a single segment index, don't sweat the compound stuff // If the index is a single segment index, don't sweat the compound stuff
if ((idx->idx_count == 1) || (idx->idx_flags & idx_expressn)) if ((idx->idx_count == 1) || (idx->idx_flags & idx_expressn))
{ {
compress(tdbb, &null_desc, key, tail->idx_itype, true, descending, false); compress(tdbb, &null_desc, key, tail->idx_itype, true, descending, INTL_KEY_SORT);
} }
else else
{ {
@ -1673,7 +1718,7 @@ void BTR_make_null_key(thread_db* tdbb, const index_desc* idx, temporary_key* ke
for (; stuff_count; --stuff_count) for (; stuff_count; --stuff_count)
*p++ = 0; *p++ = 0;
compress(tdbb, &null_desc, &temp, tail->idx_itype, true, descending, false); compress(tdbb, &null_desc, &temp, tail->idx_itype, true, descending, INTL_KEY_SORT);
const UCHAR* q = temp.key_data; const UCHAR* q = temp.key_data;
for (USHORT l = temp.key_length; l; --l, --stuff_count) for (USHORT l = temp.key_length; l; --l, --stuff_count)
@ -2401,16 +2446,24 @@ static void compress(thread_db* tdbb,
* Compress a data value into an index key. * Compress a data value into an index key.
* *
**************************************/ **************************************/
union { if (isNull)
INT64_KEY temp_int64_key; {
double temp_double; const UCHAR pad = 0;
ULONG temp_ulong; key->key_flags &= ~key_empty;
SLONG temp_slong; // AB: NULL should be threated as lowest value possible.
SINT64 temp_sint64; // Therefore don't complement pad when we have an ascending index.
UCHAR temp_char[sizeof(INT64_KEY)]; if (descending)
} temp; {
bool temp_is_negative = false; // DESC NULLs are stored as 1 byte
bool int64_key_op = false; key->key_data[0] = pad;
key->key_length = 1;
}
else
key->key_length = 0; // ASC NULLs are stored with no data
fb_assert(!key->key_next);
return;
}
// For descending index and new index structure we insert 0xFE at the beginning. // For descending index and new index structure we insert 0xFE at the beginning.
// This is only done for values which begin with 0xFE (254) or 0xFF (255) and // This is only done for values which begin with 0xFE (254) or 0xFF (255) and
@ -2421,90 +2474,145 @@ static void compress(thread_db* tdbb,
const UCHAR desc_end_value_check = 0x00; // ~0xFF; const UCHAR desc_end_value_check = 0x00; // ~0xFF;
const Database* dbb = tdbb->getDatabase(); const Database* dbb = tdbb->getDatabase();
bool first_key = true;
VaryStr<MAX_KEY * 4> buffer;
size_t multiKeyLength;
UCHAR* ptr;
UCHAR* p = key->key_data; UCHAR* p = key->key_data;
if (isNull)
{
const UCHAR pad = 0;
key->key_flags &= ~key_empty;
// AB: NULL should be threated as lowest value possible.
// Therefore don't complement pad when we have an ascending index.
if (descending)
{
// DESC NULLs are stored as 1 byte
*p++ = pad;
key->key_length = (p - key->key_data);
}
else
key->key_length = 0; // ASC NULLs are stored with no data
return;
}
if (itype == idx_string || itype == idx_byte_array || itype == idx_metadata || if (itype == idx_string || itype == idx_byte_array || itype == idx_metadata ||
itype >= idx_first_intl_string) itype >= idx_first_intl_string)
{ {
VaryStr<MAX_KEY> buffer; temporary_key* root_key = key;
const UCHAR pad = (itype == idx_string) ? ' ' : 0; bool has_next;
UCHAR* ptr;
size_t length; do
if (itype >= idx_first_intl_string || itype == idx_metadata)
{ {
DSC to; size_t length;
// convert to an international byte array has_next = false;
to.dsc_dtype = dtype_text;
to.dsc_flags = 0;
to.dsc_sub_type = 0;
to.dsc_scale = 0;
to.dsc_ttype() = ttype_sort_key;
to.dsc_length = MIN(MAX_KEY, sizeof(buffer));
ptr = to.dsc_address = reinterpret_cast<UCHAR*>(buffer.vary_string);
length = INTL_string_to_key(tdbb, itype, desc, &to, key_type);
}
else
length = MOV_get_string(desc, &ptr, &buffer, MAX_KEY);
if (length) if (first_key)
{
// clear key_empty flag, because length is >= 1
key->key_flags &= ~key_empty;
if (length > sizeof(key->key_data))
length = sizeof(key->key_data);
if (descending && ((*ptr == desc_end_value_prefix) || (*ptr == desc_end_value_check)))
{ {
*p++ = desc_end_value_prefix; first_key = false;
if ((length + 1) > sizeof(key->key_data))
length = sizeof(key->key_data) - 1; if (itype >= idx_first_intl_string || itype == idx_metadata)
{
DSC to;
// convert to an international byte array
to.dsc_dtype = dtype_text;
to.dsc_flags = 0;
to.dsc_sub_type = 0;
to.dsc_scale = 0;
to.dsc_ttype() = ttype_sort_key;
to.dsc_length = MAX_KEY * 4;
ptr = to.dsc_address = reinterpret_cast<UCHAR*>(buffer.vary_string);
multiKeyLength = length = INTL_string_to_key(tdbb, itype, desc, &to, key_type);
}
else
length = MOV_get_string(desc, &ptr, &buffer, MAX_KEY);
} }
memcpy(p, ptr, length); if (key_type == INTL_KEY_MULTI_STARTING && multiKeyLength != 0)
p += length; {
} fb_assert(ptr < (UCHAR*) buffer.vary_string + multiKeyLength);
else
{
// Leave key_empty flag, because the string is an empty string
if (descending && ((pad == desc_end_value_prefix) || (pad == desc_end_value_check)))
*p++ = desc_end_value_prefix;
*p++ = pad; length = ptr[0] + ptr[1] * 256;
} ptr += 2;
while (p > key->key_data) has_next = ptr + length < (UCHAR*) buffer.vary_string + multiKeyLength;
{
if (*--p != pad) if (descending)
break; {
} if (has_next)
{
temporary_key* new_key = FB_NEW_POOL(*tdbb->getDefaultPool()) temporary_key();
new_key->key_length = 0;
new_key->key_flags = 0;
new_key->key_nulls = 0;
new_key->key_next = key == root_key ? NULL : key;
key = new_key;
}
else if (key != root_key)
{
root_key->key_next = key;
key = root_key;
}
p = key->key_data;
}
}
const UCHAR pad = (itype == idx_string) ? ' ' : 0;
if (length)
{
// clear key_empty flag, because length is >= 1
key->key_flags &= ~key_empty;
if (length > sizeof(key->key_data))
length = sizeof(key->key_data);
if (descending && ((*ptr == desc_end_value_prefix) || (*ptr == desc_end_value_check)))
{
*p++ = desc_end_value_prefix;
if ((length + 1) > sizeof(key->key_data))
length = sizeof(key->key_data) - 1;
}
memcpy(p, ptr, length);
p += length;
}
else
{
// Leave key_empty flag, because the string is an empty string
if (descending && ((pad == desc_end_value_prefix) || (pad == desc_end_value_check)))
*p++ = desc_end_value_prefix;
*p++ = pad;
}
while (p > key->key_data)
{
if (*--p != pad)
break;
}
key->key_length = p + 1 - key->key_data;
if (has_next && !descending)
{
temporary_key* new_key = FB_NEW_POOL(*tdbb->getDefaultPool()) temporary_key();
new_key->key_length = 0;
new_key->key_flags = 0;
new_key->key_nulls = 0;
key->key_next = new_key;
key = new_key;
p = key->key_data;
}
ptr += length;
} while (has_next);
key->key_length = p + 1 - key->key_data;
return; return;
} }
p = key->key_data;
union {
INT64_KEY temp_int64_key;
double temp_double;
ULONG temp_ulong;
SLONG temp_slong;
SINT64 temp_sint64;
UCHAR temp_char[sizeof(INT64_KEY)];
} temp;
bool temp_is_negative = false;
bool int64_key_op = false;
// The index is numeric. // The index is numeric.
// For idx_numeric... // For idx_numeric...
// Convert the value to a double precision number, // Convert the value to a double precision number,
@ -3408,9 +3516,9 @@ static ULONG fast_load(thread_db* tdbb,
// Detect the case when set of duplicate keys contains more then one key // Detect the case when set of duplicate keys contains more then one key
// from primary record version. It breaks the unique constraint and must // from primary record version. It breaks the unique constraint and must
// be rejected. Note, it is not always could be detected while sorting. // be rejected. Note, it is not always could be detected while sorting.
// Set to true when primary record version is found in current set of // Set to true when primary record version is found in current set of
// duplicate keys. // duplicate keys.
bool primarySeen = false; bool primarySeen = false;
while (!error) while (!error)

View File

@ -151,6 +151,7 @@ struct temporary_key
UCHAR key_flags; UCHAR key_flags;
USHORT key_nulls; // bitmap of encountered null segments, USHORT key_nulls; // bitmap of encountered null segments,
// USHORT is enough to store MAX_INDEX_SEGMENTS bits // USHORT is enough to store MAX_INDEX_SEGMENTS bits
Firebird::AutoPtr<temporary_key> key_next; // next key (INTL_KEY_MULTI_STARTING)
}; };
@ -224,6 +225,7 @@ const int irb_ignore_null_value_key = 8; // if lower bound is specified and upp
const int irb_descending = 16; // Base index uses descending order const int irb_descending = 16; // Base index uses descending order
const int irb_exclude_lower = 32; // exclude lower bound keys while scanning index const int irb_exclude_lower = 32; // exclude lower bound keys while scanning index
const int irb_exclude_upper = 64; // exclude upper bound keys while scanning index const int irb_exclude_upper = 64; // exclude upper bound keys while scanning index
const int irb_multi_starting = 128; // Use INTL_KEY_MULTI_STARTING
typedef Firebird::HalfStaticArray<float, 4> SelectivityList; typedef Firebird::HalfStaticArray<float, 4> SelectivityList;

View File

@ -38,15 +38,15 @@ DSC* BTR_eval_expression(Jrd::thread_db*, Jrd::index_desc*, Jrd::Record*, bool&)
void BTR_evaluate(Jrd::thread_db*, const Jrd::IndexRetrieval*, Jrd::RecordBitmap**, Jrd::RecordBitmap*); void BTR_evaluate(Jrd::thread_db*, const Jrd::IndexRetrieval*, Jrd::RecordBitmap**, Jrd::RecordBitmap*);
UCHAR* BTR_find_leaf(Ods::btree_page*, Jrd::temporary_key*, UCHAR*, USHORT*, bool, bool); UCHAR* BTR_find_leaf(Ods::btree_page*, Jrd::temporary_key*, UCHAR*, USHORT*, bool, bool);
Ods::btree_page* BTR_find_page(Jrd::thread_db*, const Jrd::IndexRetrieval*, Jrd::win*, Jrd::index_desc*, Ods::btree_page* BTR_find_page(Jrd::thread_db*, const Jrd::IndexRetrieval*, Jrd::win*, Jrd::index_desc*,
Jrd::temporary_key*, Jrd::temporary_key*); Jrd::temporary_key*, Jrd::temporary_key*, bool = true);
void BTR_insert(Jrd::thread_db*, Jrd::win*, Jrd::index_insertion*); void BTR_insert(Jrd::thread_db*, Jrd::win*, Jrd::index_insertion*);
Jrd::idx_e BTR_key(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::Record*, Jrd::index_desc*, Jrd::temporary_key*, Jrd::idx_e BTR_key(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::Record*, Jrd::index_desc*, Jrd::temporary_key*,
const bool, USHORT = 0); const USHORT, USHORT = 0);
USHORT BTR_key_length(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::index_desc*); USHORT BTR_key_length(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::index_desc*);
Ods::btree_page* BTR_left_handoff(Jrd::thread_db*, Jrd::win*, Ods::btree_page*, SSHORT); Ods::btree_page* BTR_left_handoff(Jrd::thread_db*, Jrd::win*, Ods::btree_page*, SSHORT);
bool BTR_lookup(Jrd::thread_db*, Jrd::jrd_rel*, USHORT, Jrd::index_desc*, Jrd::RelationPages*); bool BTR_lookup(Jrd::thread_db*, Jrd::jrd_rel*, USHORT, Jrd::index_desc*, Jrd::RelationPages*);
Jrd::idx_e BTR_make_key(Jrd::thread_db*, USHORT, const Jrd::ValueExprNode* const*, const Jrd::index_desc*, Jrd::idx_e BTR_make_key(Jrd::thread_db*, USHORT, const Jrd::ValueExprNode* const*, const Jrd::index_desc*,
Jrd::temporary_key*, bool); Jrd::temporary_key*, USHORT);
void BTR_make_null_key(Jrd::thread_db*, const Jrd::index_desc*, Jrd::temporary_key*); void BTR_make_null_key(Jrd::thread_db*, const Jrd::index_desc*, Jrd::temporary_key*);
bool BTR_next_index(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::jrd_tra*, Jrd::index_desc*, Jrd::win*); bool BTR_next_index(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::jrd_tra*, Jrd::index_desc*, Jrd::win*);
void BTR_remove(Jrd::thread_db*, Jrd::win*, Jrd::index_insertion*); void BTR_remove(Jrd::thread_db*, Jrd::win*, Jrd::index_insertion*);

View File

@ -208,7 +208,7 @@ bool IDX_check_master_types(thread_db* tdbb, index_desc& idx, jrd_rel* partner_r
// get the description of the partner index // get the description of the partner index
const bool ok = BTR_description(tdbb, partner_relation, root, &partner_idx, idx.idx_primary_index); const bool ok = BTR_description(tdbb, partner_relation, root, &partner_idx, idx.idx_primary_index);
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
if (!ok) if (!ok)
BUGCHECK(175); // msg 175 partner index description not found BUGCHECK(175); // msg 175 partner index description not found
@ -399,7 +399,8 @@ void IDX_create_index(thread_db* tdbb,
{ {
Record* record = stack.pop(); Record* record = stack.pop();
result = BTR_key(tdbb, relation, record, idx, &key, false); result = BTR_key(tdbb, relation, record, idx, &key,
((idx->idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
if (result == idx_e_ok) if (result == idx_e_ok)
{ {
@ -743,7 +744,9 @@ void IDX_garbage_collect(thread_db* tdbb, record_param* rpb, RecordStack& going,
{ {
Record* const rec1 = stack1.object(); Record* const rec1 = stack1.object();
idx_e result = BTR_key(tdbb, rpb->rpb_relation, rec1, &idx, &key1, false); idx_e result = BTR_key(tdbb, rpb->rpb_relation, rec1, &idx, &key1,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
if (result != idx_e_ok) if (result != idx_e_ok)
{ {
if (result == idx_e_conversion) if (result == idx_e_conversion)
@ -760,7 +763,9 @@ void IDX_garbage_collect(thread_db* tdbb, record_param* rpb, RecordStack& going,
{ {
Record* const rec2 = stack2.object(); Record* const rec2 = stack2.object();
result = BTR_key(tdbb, rpb->rpb_relation, rec2, &idx, &key2, false); result = BTR_key(tdbb, rpb->rpb_relation, rec2, &idx, &key2,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
if (result != idx_e_ok) if (result != idx_e_ok)
{ {
if (result == idx_e_conversion) if (result == idx_e_conversion)
@ -783,7 +788,9 @@ void IDX_garbage_collect(thread_db* tdbb, record_param* rpb, RecordStack& going,
{ {
Record* const rec3 = stack3.object(); Record* const rec3 = stack3.object();
result = BTR_key(tdbb, rpb->rpb_relation, rec3, &idx, &key2, false); result = BTR_key(tdbb, rpb->rpb_relation, rec3, &idx, &key2,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
if (result != idx_e_ok) if (result != idx_e_ok)
{ {
if (result == idx_e_conversion) if (result == idx_e_conversion)
@ -855,14 +862,16 @@ void IDX_modify(thread_db* tdbb,
idx_e error_code; idx_e error_code;
if ((error_code = BTR_key(tdbb, new_rpb->rpb_relation, if ((error_code = BTR_key(tdbb, new_rpb->rpb_relation,
new_rpb->rpb_record, &idx, &key1, false))) new_rpb->rpb_record, &idx, &key1,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT))))
{ {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
context.raise(tdbb, error_code, new_rpb->rpb_record); context.raise(tdbb, error_code, new_rpb->rpb_record);
} }
if ((error_code = BTR_key(tdbb, org_rpb->rpb_relation, if ((error_code = BTR_key(tdbb, org_rpb->rpb_relation,
org_rpb->rpb_record, &idx, &key2, false))) org_rpb->rpb_record, &idx, &key2,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT))))
{ {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
context.raise(tdbb, error_code, org_rpb->rpb_record); context.raise(tdbb, error_code, org_rpb->rpb_record);
@ -929,14 +938,16 @@ void IDX_modify_check_constraints(thread_db* tdbb,
idx_e error_code; idx_e error_code;
if ((error_code = BTR_key(tdbb, new_rpb->rpb_relation, if ((error_code = BTR_key(tdbb, new_rpb->rpb_relation,
new_rpb->rpb_record, &idx, &key1, false))) new_rpb->rpb_record, &idx, &key1,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT))))
{ {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
context.raise(tdbb, error_code, new_rpb->rpb_record); context.raise(tdbb, error_code, new_rpb->rpb_record);
} }
if ((error_code = BTR_key(tdbb, org_rpb->rpb_relation, if ((error_code = BTR_key(tdbb, org_rpb->rpb_relation,
org_rpb->rpb_record, &idx, &key2, false))) org_rpb->rpb_record, &idx, &key2,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT))))
{ {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
context.raise(tdbb, error_code, org_rpb->rpb_record); context.raise(tdbb, error_code, org_rpb->rpb_record);
@ -1012,7 +1023,8 @@ void IDX_store(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
IndexErrorContext context(rpb->rpb_relation, &idx); IndexErrorContext context(rpb->rpb_relation, &idx);
idx_e error_code; idx_e error_code;
if ( (error_code = BTR_key(tdbb, rpb->rpb_relation, rpb->rpb_record, &idx, &key, false)) ) if ((error_code = BTR_key(tdbb, rpb->rpb_relation, rpb->rpb_record, &idx, &key,
((idx.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT))))
{ {
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
context.raise(tdbb, error_code, rpb->rpb_record); context.raise(tdbb, error_code, rpb->rpb_record);
@ -1148,7 +1160,7 @@ static idx_e check_duplicates(thread_db* tdbb,
record, relation_2, record_idx)) record, relation_2, record_idx))
{ {
// When check foreign keys in snapshot transaction, ensure that // When check foreign keys in snapshot transaction, ensure that
// master record is visible in transaction context and still // master record is visible in transaction context and still
// satisfy foreign key constraint. // satisfy foreign key constraint.
if (is_fk && !(transaction->tra_flags & TRA_read_committed)) if (is_fk && !(transaction->tra_flags & TRA_read_committed))
@ -1342,7 +1354,12 @@ static idx_e check_partner_index(thread_db* tdbb,
// tmpIndex.idx_flags |= idx_unique; // tmpIndex.idx_flags |= idx_unique;
tmpIndex.idx_flags = (tmpIndex.idx_flags & ~idx_unique) | (partner_idx.idx_flags & idx_unique); tmpIndex.idx_flags = (tmpIndex.idx_flags & ~idx_unique) | (partner_idx.idx_flags & idx_unique);
temporary_key key; temporary_key key;
result = BTR_key(tdbb, relation, record, &tmpIndex, &key, starting, segment);
const USHORT keyType = starting ?
INTL_KEY_PARTIAL :
(tmpIndex.idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT;
result = BTR_key(tdbb, relation, record, &tmpIndex, &key, keyType, segment);
CCH_RELEASE(tdbb, &window); CCH_RELEASE(tdbb, &window);
// now check for current duplicates // now check for current duplicates
@ -1531,7 +1548,9 @@ static idx_e insert_key(thread_db* tdbb,
// don't bother to check the primary key. // don't bother to check the primary key.
CCH_FETCH(tdbb, window_ptr, LCK_read, pag_root); CCH_FETCH(tdbb, window_ptr, LCK_read, pag_root);
temporary_key key; temporary_key key;
result = BTR_key(tdbb, relation, record, idx, &key, false); result = BTR_key(tdbb, relation, record, idx, &key,
((idx->idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
CCH_RELEASE(tdbb, window_ptr); CCH_RELEASE(tdbb, window_ptr);
if (result == idx_e_ok && key.key_nulls == 0) if (result == idx_e_ok && key.key_nulls == 0)
{ {

View File

@ -1298,6 +1298,7 @@ USHORT INTL_string_to_key(thread_db* tdbb,
case ttype_binary: case ttype_binary:
case ttype_ascii: case ttype_ascii:
case ttype_none: case ttype_none:
fb_assert(key_type != INTL_KEY_MULTI_STARTING);
while (len-- && destLen-- > 0) while (len-- && destLen-- > 0)
*dest++ = *src++; *dest++ = *src++;
// strip off ending pad characters // strip off ending pad characters
@ -1312,6 +1313,7 @@ USHORT INTL_string_to_key(thread_db* tdbb,
break; break;
default: default:
TextType* obj = INTL_texttype_lookup(tdbb, ttype); TextType* obj = INTL_texttype_lookup(tdbb, ttype);
fb_assert(key_type != INTL_KEY_MULTI_STARTING || (obj->getFlags() & TEXTTYPE_MULTI_STARTING_KEY));
outlen = obj->string_to_key(len, src, pByte->dsc_length, dest, key_type); outlen = obj->string_to_key(len, src, pByte->dsc_length, dest, key_type);
break; break;
} }

View File

@ -49,7 +49,7 @@ IndexTableScan::IndexTableScan(CompilerScratch* csb, const string& alias,
{ {
fb_assert(m_index); fb_assert(m_index);
// Reserve one excess byte for the upper key - in case when length of // Reserve one excess byte for the upper key - in case when length of
// upper key at retrieval is greater than declared index key length. // upper key at retrieval is greater than declared index key length.
// See also comments at openStream(). // See also comments at openStream().
FB_SIZE_T size = sizeof(Impure) + 2u * m_length + 1u; FB_SIZE_T size = sizeof(Impure) + 2u * m_length + 1u;
@ -71,6 +71,12 @@ void IndexTableScan::open(thread_db* tdbb) const
RLCK_reserve_relation(tdbb, request->req_transaction, m_relation, false); RLCK_reserve_relation(tdbb, request->req_transaction, m_relation, false);
rpb->rpb_number.setValue(BOF_NUMBER); rpb->rpb_number.setValue(BOF_NUMBER);
fb_assert(!impure->irsb_nav_lower);
impure->irsb_nav_current_lower = impure->irsb_nav_lower = FB_NEW_POOL(*tdbb->getDefaultPool()) temporary_key;
fb_assert(!impure->irsb_nav_upper);
impure->irsb_nav_current_upper = impure->irsb_nav_upper = FB_NEW_POOL(*tdbb->getDefaultPool()) temporary_key;
} }
void IndexTableScan::close(thread_db* tdbb) const void IndexTableScan::close(thread_db* tdbb) const
@ -106,6 +112,18 @@ void IndexTableScan::close(thread_db* tdbb) const
impure->irsb_nav_page = 0; impure->irsb_nav_page = 0;
} }
if (impure->irsb_nav_lower)
{
delete impure->irsb_nav_lower;
impure->irsb_nav_current_lower = impure->irsb_nav_lower = NULL;
}
if (impure->irsb_nav_upper)
{
delete impure->irsb_nav_upper;
impure->irsb_nav_current_upper = impure->irsb_nav_upper = NULL;
}
} }
} }
@ -130,120 +148,137 @@ bool IndexTableScan::getRecord(thread_db* tdbb) const
setPage(tdbb, impure, NULL); setPage(tdbb, impure, NULL);
} }
index_desc* const idx = (index_desc*) ((SCHAR*) impure + m_offset); // If this is the first time, start at the beginning
if (!impure->irsb_nav_page)
// find the last fetched position from the index
const USHORT pageSpaceID = m_relation->getPages(tdbb)->rel_pg_space_id;
win window(pageSpaceID, impure->irsb_nav_page);
UCHAR* nextPointer = getPosition(tdbb, impure, &window);
if (!nextPointer)
{ {
rpb->rpb_number.setValid(false); // initialize for a retrieval
return false; if (!setupBitmaps(tdbb, impure))
}
temporary_key key;
memcpy(key.key_data, impure->irsb_nav_data, impure->irsb_nav_length);
const IndexRetrieval* const retrieval = m_index->retrieval;
const USHORT flags = retrieval->irb_generic & (irb_descending | irb_partial | irb_starting);
// set the upper (or lower) limit for navigational retrieval
temporary_key upper;
if (retrieval->irb_upper_count)
{
upper.key_length = impure->irsb_nav_upper_length;
memcpy(upper.key_data, impure->irsb_nav_data + m_length, upper.key_length);
}
// Find the next interesting node. If necessary, skip to the next page.
RecordNumber number;
IndexNode node;
while (true)
{
Ods::btree_page* page = (Ods::btree_page*) window.win_buffer;
UCHAR* pointer = nextPointer;
if (pointer)
{
node.readNode(pointer, true);
number = node.recordNumber;
}
if (node.isEndLevel)
break;
if (node.isEndBucket)
{
page = (Ods::btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
nextPointer = page->btr_nodes + page->btr_jump_size;
continue;
}
// Build the current key value from the prefix and current node data.
memcpy(key.key_data + node.prefix, node.data, node.length);
key.key_length = node.length + node.prefix;
// Make sure we haven't hit the upper (or lower) limit.
if (retrieval->irb_upper_count &&
compareKeys(idx, key.key_data, key.key_length, &upper, flags) > 0)
{
break;
}
// skip this record if:
// 1) there is an inversion tree for this index and this record
// is not in the bitmap for the inversion, or
// 2) the record has already been visited
if ((!(impure->irsb_flags & irsb_mustread) &&
(!impure->irsb_nav_bitmap ||
!RecordBitmap::test(*impure->irsb_nav_bitmap, number.getValue()))) ||
RecordBitmap::test(impure->irsb_nav_records_visited, number.getValue()))
{
nextPointer = node.readNode(pointer, true);
continue;
}
// reset the current navigational position in the index
rpb->rpb_number = number;
setPosition(tdbb, impure, rpb, &window, pointer, key);
CCH_RELEASE(tdbb, &window);
if (VIO_get(tdbb, rpb, request->req_transaction, request->req_pool))
{
temporary_key value;
const idx_e result = BTR_key(tdbb, m_relation, rpb->rpb_record, idx, &value, false);
if (result != idx_e_ok)
{
IndexErrorContext context(m_relation, idx);
context.raise(tdbb, result, rpb->rpb_record);
}
if (!compareKeys(idx, key.key_data, key.key_length, &value, 0))
{
// mark in the navigational bitmap that we have visited this record
RBM_SET(tdbb->getDefaultPool(), &impure->irsb_nav_records_visited,
rpb->rpb_number.getValue());
rpb->rpb_number.setValid(true);
return true;
}
}
nextPointer = getPosition(tdbb, impure, &window);
if (!nextPointer)
{ {
rpb->rpb_number.setValid(false); rpb->rpb_number.setValid(false);
return false; return false;
} }
} }
CCH_RELEASE(tdbb, &window); index_desc* const idx = (index_desc*) ((SCHAR*) impure + m_offset);
// find the last fetched position from the index
const USHORT pageSpaceID = m_relation->getPages(tdbb)->rel_pg_space_id;
win window(pageSpaceID, impure->irsb_nav_page);
const IndexRetrieval* const retrieval = m_index->retrieval;
const USHORT flags = retrieval->irb_generic & (irb_descending | irb_partial | irb_starting);
do
{
UCHAR* nextPointer = getPosition(tdbb, impure, &window);
if (!nextPointer)
{
rpb->rpb_number.setValid(false);
return false;
}
temporary_key key;
memcpy(key.key_data, impure->irsb_nav_data, impure->irsb_nav_length);
// set the upper (or lower) limit for navigational retrieval
temporary_key upper;
if (retrieval->irb_upper_count)
{
upper.key_length = impure->irsb_nav_upper_length;
memcpy(upper.key_data, impure->irsb_nav_data + m_length, upper.key_length);
}
// Find the next interesting node. If necessary, skip to the next page.
RecordNumber number;
IndexNode node;
while (true)
{
Ods::btree_page* page = (Ods::btree_page*) window.win_buffer;
UCHAR* pointer = nextPointer;
if (pointer)
{
node.readNode(pointer, true);
number = node.recordNumber;
}
if (node.isEndLevel)
break;
if (node.isEndBucket)
{
page = (Ods::btree_page*) CCH_HANDOFF(tdbb, &window, page->btr_sibling, LCK_read, pag_index);
nextPointer = page->btr_nodes + page->btr_jump_size;
continue;
}
// Build the current key value from the prefix and current node data.
memcpy(key.key_data + node.prefix, node.data, node.length);
key.key_length = node.length + node.prefix;
// Make sure we haven't hit the upper (or lower) limit.
if (retrieval->irb_upper_count &&
compareKeys(idx, key.key_data, key.key_length, &upper, flags) > 0)
{
break;
}
// skip this record if:
// 1) there is an inversion tree for this index and this record
// is not in the bitmap for the inversion, or
// 2) the record has already been visited
if ((!(impure->irsb_flags & irsb_mustread) &&
(!impure->irsb_nav_bitmap ||
!RecordBitmap::test(*impure->irsb_nav_bitmap, number.getValue()))) ||
RecordBitmap::test(impure->irsb_nav_records_visited, number.getValue()))
{
nextPointer = node.readNode(pointer, true);
continue;
}
// reset the current navigational position in the index
rpb->rpb_number = number;
setPosition(tdbb, impure, rpb, &window, pointer, key);
CCH_RELEASE(tdbb, &window);
if (VIO_get(tdbb, rpb, request->req_transaction, request->req_pool))
{
temporary_key value;
const idx_e result = BTR_key(tdbb, m_relation, rpb->rpb_record, idx, &value,
((idx->idx_flags & idx_unique) ? INTL_KEY_UNIQUE : INTL_KEY_SORT));
if (result != idx_e_ok)
{
IndexErrorContext context(m_relation, idx);
context.raise(tdbb, result, rpb->rpb_record);
}
if (!compareKeys(idx, key.key_data, key.key_length, &value, 0))
{
// mark in the navigational bitmap that we have visited this record
RBM_SET(tdbb->getDefaultPool(), &impure->irsb_nav_records_visited,
rpb->rpb_number.getValue());
rpb->rpb_number.setValid(true);
return true;
}
}
nextPointer = getPosition(tdbb, impure, &window);
if (!nextPointer)
{
rpb->rpb_number.setValid(false);
return false;
}
}
CCH_RELEASE(tdbb, &window);
advanceStream(tdbb, impure, &window);
} while (true);
// bof or eof must have been set at this point // bof or eof must have been set at this point
rpb->rpb_number.setValid(false); rpb->rpb_number.setValid(false);
@ -448,14 +483,34 @@ bool IndexTableScan::findSavedNode(thread_db* tdbb, Impure* impure, win* window,
} }
} }
void IndexTableScan::advanceStream(thread_db* tdbb, Impure* impure, win* window) const
{
impure->irsb_nav_current_lower = impure->irsb_nav_current_lower->key_next.get();
impure->irsb_nav_current_upper = impure->irsb_nav_current_upper->key_next.get();
setPage(tdbb, impure, NULL);
window->win_page = 0;
}
UCHAR* IndexTableScan::getPosition(thread_db* tdbb, Impure* impure, win* window) const UCHAR* IndexTableScan::getPosition(thread_db* tdbb, Impure* impure, win* window) const
{ {
// If this is the first time, start at the beginning while (impure->irsb_nav_current_lower)
if (!window->win_page.getPageNum())
{ {
return openStream(tdbb, impure, window); UCHAR* position = getStreamPosition(tdbb, impure, window);
if (position)
return position;
advanceStream(tdbb, impure, window);
} }
return NULL;
}
UCHAR* IndexTableScan::getStreamPosition(thread_db* tdbb, Impure* impure, win* window) const
{
if (!window->win_page.getPageNum())
return openStream(tdbb, impure, window);
// Re-fetch page and get incarnation counter // Re-fetch page and get incarnation counter
Ods::btree_page* page = (Ods::btree_page*) CCH_FETCH(tdbb, window, LCK_read, pag_index); Ods::btree_page* page = (Ods::btree_page*) CCH_FETCH(tdbb, window, LCK_read, pag_index);
@ -492,9 +547,9 @@ UCHAR* IndexTableScan::getPosition(thread_db* tdbb, Impure* impure, win* window)
UCHAR* IndexTableScan::openStream(thread_db* tdbb, Impure* impure, win* window) const UCHAR* IndexTableScan::openStream(thread_db* tdbb, Impure* impure, win* window) const
{ {
// initialize for a retrieval temporary_key* lower = impure->irsb_nav_current_lower;
if (!setupBitmaps(tdbb, impure)) temporary_key* upper = impure->irsb_nav_current_upper;
return NULL; const bool firstKeys = lower == impure->irsb_nav_lower;
setPage(tdbb, impure, NULL); setPage(tdbb, impure, NULL);
impure->irsb_nav_length = 0; impure->irsb_nav_length = 0;
@ -502,23 +557,22 @@ UCHAR* IndexTableScan::openStream(thread_db* tdbb, Impure* impure, win* window)
// Find the starting leaf page // Find the starting leaf page
const IndexRetrieval* const retrieval = m_index->retrieval; const IndexRetrieval* const retrieval = m_index->retrieval;
index_desc* const idx = (index_desc*) ((SCHAR*) impure + m_offset); index_desc* const idx = (index_desc*) ((SCHAR*) impure + m_offset);
temporary_key lower, upper; Ods::btree_page* page = BTR_find_page(tdbb, retrieval, window, idx, lower, upper, firstKeys);
Ods::btree_page* page = BTR_find_page(tdbb, retrieval, window, idx, &lower, &upper);
setPage(tdbb, impure, window); setPage(tdbb, impure, window);
// find the upper limit for the search // find the upper limit for the search
temporary_key* limit_ptr = NULL; temporary_key* limit_ptr = NULL;
if (retrieval->irb_upper_count) if (retrieval->irb_upper_count)
{ {
// If upper key length is greater than declared key length, we need // If upper key length is greater than declared key length, we need
// one "excess" byte for correct comparison. Without it there could // one "excess" byte for correct comparison. Without it there could
// be false equality hits. // be false equality hits.
impure->irsb_nav_upper_length = MIN(m_length + 1, upper.key_length); impure->irsb_nav_upper_length = MIN(m_length + 1, upper->key_length);
memcpy(impure->irsb_nav_data + m_length, upper.key_data, impure->irsb_nav_upper_length); memcpy(impure->irsb_nav_data + m_length, upper->key_data, impure->irsb_nav_upper_length);
} }
if (retrieval->irb_lower_count) if (retrieval->irb_lower_count)
limit_ptr = &lower; limit_ptr = lower;
// If there is a starting descriptor, search down index to starting position. // If there is a starting descriptor, search down index to starting position.
// This may involve sibling buckets if splits are in progress. If there // This may involve sibling buckets if splits are in progress. If there

View File

@ -194,6 +194,10 @@ namespace Jrd
RecordBitmap** irsb_nav_bitmap; // bitmap for inversion tree RecordBitmap** irsb_nav_bitmap; // bitmap for inversion tree
RecordBitmap* irsb_nav_records_visited; // bitmap of records already retrieved RecordBitmap* irsb_nav_records_visited; // bitmap of records already retrieved
BtrPageGCLock* irsb_nav_btr_gc_lock; // lock to prevent removal of currently walked index page BtrPageGCLock* irsb_nav_btr_gc_lock; // lock to prevent removal of currently walked index page
temporary_key* irsb_nav_lower; // lower (possible multiple) key
temporary_key* irsb_nav_upper; // upper (possible multiple) key
temporary_key* irsb_nav_current_lower; // current lower key
temporary_key* irsb_nav_current_upper; // current upper key
USHORT irsb_nav_offset; // page offset of current index node USHORT irsb_nav_offset; // page offset of current index node
USHORT irsb_nav_upper_length; // length of upper key value USHORT irsb_nav_upper_length; // length of upper key value
USHORT irsb_nav_length; // length of expanded key USHORT irsb_nav_length; // length of expanded key
@ -223,7 +227,9 @@ namespace Jrd
private: private:
int compareKeys(const index_desc*, const UCHAR*, USHORT, const temporary_key*, USHORT) const; int compareKeys(const index_desc*, const UCHAR*, USHORT, const temporary_key*, USHORT) const;
bool findSavedNode(thread_db* tdbb, Impure* impure, win* window, UCHAR**) const; bool findSavedNode(thread_db* tdbb, Impure* impure, win* window, UCHAR**) const;
void advanceStream(thread_db* tdbb, Impure* impure, win* window) const;
UCHAR* getPosition(thread_db* tdbb, Impure* impure, win* window) const; UCHAR* getPosition(thread_db* tdbb, Impure* impure, win* window) const;
UCHAR* getStreamPosition(thread_db* tdbb, Impure* impure, win* window) const;
UCHAR* openStream(thread_db* tdbb, Impure* impure, win* window) const; UCHAR* openStream(thread_db* tdbb, Impure* impure, win* window) const;
void setPage(thread_db* tdbb, Impure* impure, win* window) const; void setPage(thread_db* tdbb, Impure* impure, win* window) const;
void setPosition(thread_db* tdbb, Impure* impure, record_param*, void setPosition(thread_db* tdbb, Impure* impure, record_param*,