8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 21:23:04 +01:00

Fixed CORE-5598: block size exceeds implementation restriction error while inner joining large datasets with a long key using the HASH JOIN plan

This commit is contained in:
Dmitry Yemanov 2018-01-03 21:07:18 +03:00
parent 682bf54cd0
commit 4638dc1023
2 changed files with 97 additions and 248 deletions

View File

@ -23,19 +23,15 @@
#include "firebird.h"
#include "../common/classes/Hash.h"
#include "../jrd/jrd.h"
#include "../jrd/btr.h"
#include "../jrd/req.h"
#include "../jrd/intl.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/mov_proto.h"
#include "../jrd/intl_proto.h"
#include "../jrd/Collation.h"
#include "RecordSource.h"
#include <stdlib.h>
using namespace Firebird;
using namespace Jrd;
@ -43,196 +39,70 @@ using namespace Jrd;
// Data access: hash join
// ----------------------
namespace
{
typedef int (*qsort_compare_callback)(const void* a1, const void* a2, void* arg);
struct qsort_ctx_data
{
void* arg;
qsort_compare_callback compare;
};
#if defined(DARWIN) // || defined(FREEBSD)
#undef HAVE_QSORT_R
#endif
#ifndef HAVE_QSORT_R
#if defined(WIN_NT) || defined(DARWIN) || defined(FREEBSD)
int qsort_ctx_arg_swap(void* arg, const void* a1, const void* a2)
{
struct qsort_ctx_data* ss = (struct qsort_ctx_data*) arg;
return (ss->compare)(a1, a2, ss->arg);
}
#endif
#endif
#define USE_QSORT_CTX
void qsort_ctx(void* base, size_t count, size_t width, qsort_compare_callback compare, void* arg)
{
#ifdef HAVE_QSORT_R
qsort_r(base, count, width, compare, arg);
#else
#if defined(WIN_NT)
struct qsort_ctx_data tmp = {arg, compare};
qsort_s(base, count, width, &qsort_ctx_arg_swap, &tmp);
#elif defined(DARWIN) || defined(FREEBSD)
struct qsort_ctx_data tmp = {arg, compare};
qsort_r(base, count, width, &tmp, &qsort_ctx_arg_swap);
#else
#undef USE_QSORT_CTX
#endif
#endif
}
} // namespace
// NS: FIXME - Why use static hash table here??? Hash table shall support dynamic resizing
static const size_t HASH_SIZE = 1009;
static const size_t COLLISION_PREALLOCATE_SIZE = 32; // 256 KB
static const size_t KEYBUF_PREALLOCATE_SIZE = 64 * 1024; // 64 KB
static const size_t KEYBUF_SIZE_LIMIT = 1024 * 1024 * 1024; // 1 GB
static const ULONG HASH_SIZE = 1009;
static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per slot
class HashJoin::HashTable : public PermanentStorage
{
struct Collision
{
#ifdef USE_QSORT_CTX
Collision()
: offset(0), position(0)
{}
Collision(void* /*ctx*/, ULONG off, ULONG pos)
: offset(off), position(pos)
{}
#else
Collision()
: context(NULL), offset(0), position(0)
{}
Collision(void* ctx, ULONG off, ULONG pos)
: context(ctx), offset(off), position(pos)
{}
void* context;
#endif
ULONG offset;
ULONG position;
};
class CollisionList
{
static const FB_SIZE_T INVALID_ITERATOR = FB_SIZE_T(~0);
public:
CollisionList(MemoryPool& pool, const KeyBuffer* keyBuffer, ULONG itemLength)
: m_collisions(pool, COLLISION_PREALLOCATE_SIZE),
m_keyBuffer(keyBuffer), m_itemLength(itemLength), m_iterator(INVALID_ITERATOR)
{}
#ifdef USE_QSORT_CTX
static int compare(const void* p1, const void* p2, void* arg)
#else
static int compare(const void* p1, const void* p2)
#endif
struct Entry
{
const Collision* const c1 = static_cast<const Collision*>(p1);
const Collision* const c2 = static_cast<const Collision*>(p2);
Entry()
: hash(0), position(0)
{}
#ifndef USE_QSORT_CTX
fb_assert(c1->context == c2->context);
#endif
Entry(ULONG h, ULONG pos)
: hash(h), position(pos)
{}
const CollisionList* const collisions =
#ifdef USE_QSORT_CTX
static_cast<const CollisionList*>(arg);
#else
static_cast<const CollisionList*>(c1->context);
#endif
const UCHAR* const baseAddress = collisions->m_keyBuffer->begin();
static const ULONG generate(const Entry& item)
{
return item.hash;
}
const UCHAR* const ptr1 = baseAddress + c1->offset;
const UCHAR* const ptr2 = baseAddress + c2->offset;
ULONG hash;
ULONG position;
};
return memcmp(ptr1, ptr2, collisions->m_itemLength);
public:
CollisionList(MemoryPool& pool)
: m_collisions(pool, BUCKET_PREALLOCATE_SIZE),
m_iterator(INVALID_ITERATOR)
{
m_collisions.setSortMode(FB_ARRAY_SORT_MANUAL);
}
void sort()
{
Collision* const base = m_collisions.begin();
const size_t count = m_collisions.getCount();
const size_t width = sizeof(Collision);
#ifdef USE_QSORT_CTX
qsort_ctx(base, count, width, compare, this);
#else
qsort(base, count, width, compare);
#endif
m_collisions.sort();
}
void add(const Collision& collision)
void add(ULONG hash, ULONG position)
{
m_collisions.add(collision);
m_collisions.add(Entry(hash, position));
}
bool locate(ULONG length, const UCHAR* data)
bool locate(ULONG hash)
{
const UCHAR* const ptr1 = data;
const ULONG len1 = length;
const ULONG len2 = m_itemLength;
const ULONG minLen = MIN(len1, len2);
if (m_collisions.find(hash, m_iterator))
return true;
FB_SIZE_T highBound = m_collisions.getCount(), lowBound = 0;
const UCHAR* const baseAddress = m_keyBuffer->begin();
while (highBound > lowBound)
{
const FB_SIZE_T temp = (highBound + lowBound) >> 1;
const UCHAR* const ptr2 = baseAddress + m_collisions[temp].offset;
const int result = memcmp(ptr1, ptr2, minLen);
if (result > 0 || (!result && len1 > len2))
lowBound = temp + 1;
else
highBound = temp;
}
if (highBound >= m_collisions.getCount() ||
lowBound >= m_collisions.getCount())
{
m_iterator = INVALID_ITERATOR;
return false;
}
const UCHAR* const ptr2 = baseAddress + m_collisions[lowBound].offset;
if (memcmp(ptr1, ptr2, minLen))
{
m_iterator = INVALID_ITERATOR;
return false;
}
m_iterator = lowBound;
return true;
m_iterator = INVALID_ITERATOR;
return false;
}
bool iterate(ULONG length, const UCHAR* data, ULONG& position)
bool iterate(ULONG hash, ULONG& position)
{
if (m_iterator >= m_collisions.getCount())
return false;
const Collision& collision = m_collisions[m_iterator++];
const UCHAR* const baseAddress = m_keyBuffer->begin();
const Entry& collision = m_collisions[m_iterator++];
const UCHAR* const ptr1 = data;
const ULONG len1 = length;
const UCHAR* const ptr2 = baseAddress + collision.offset;
const ULONG len2 = m_itemLength;
const ULONG minLen = MIN(len1, len2);
if (memcmp(ptr1, ptr2, minLen))
if (hash != collision.hash)
{
m_iterator = INVALID_ITERATOR;
return false;
@ -243,14 +113,12 @@ class HashJoin::HashTable : public PermanentStorage
}
private:
Array<Collision> m_collisions;
const KeyBuffer* const m_keyBuffer;
const ULONG m_itemLength;
SortedArray<Entry, EmptyStorage<Entry>, ULONG, Entry> m_collisions;
FB_SIZE_T m_iterator;
};
public:
HashTable(MemoryPool& pool, size_t streamCount, unsigned int tableSize = HASH_SIZE)
HashTable(MemoryPool& pool, ULONG streamCount, ULONG tableSize = HASH_SIZE)
: PermanentStorage(pool), m_streamCount(streamCount),
m_tableSize(tableSize), m_slot(0)
{
@ -260,18 +128,15 @@ public:
~HashTable()
{
for (size_t i = 0; i < m_streamCount * m_tableSize; i++)
for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
delete m_collisions[i];
delete[] m_collisions;
}
void put(size_t stream,
ULONG keyLength, const KeyBuffer* keyBuffer,
ULONG offset, ULONG position)
void put(ULONG stream, ULONG hash, ULONG position)
{
const unsigned int slot =
InternalHash::hash(keyLength, keyBuffer->begin() + offset, m_tableSize);
const ULONG slot = hash % m_tableSize;
fb_assert(stream < m_streamCount);
fb_assert(slot < m_tableSize);
@ -280,25 +145,25 @@ public:
if (!collisions)
{
collisions = FB_NEW_POOL(getPool()) CollisionList(getPool(), keyBuffer, keyLength);
collisions = FB_NEW_POOL(getPool()) CollisionList(getPool());
m_collisions[stream * m_tableSize + slot] = collisions;
}
collisions->add(Collision(collisions, offset, position));
collisions->add(hash, position);
}
bool setup(ULONG length, const UCHAR* data)
bool setup(ULONG hash)
{
const unsigned int slot = InternalHash::hash(length, data, m_tableSize);
const ULONG slot = hash % m_tableSize;
for (size_t i = 0; i < m_streamCount; i++)
for (ULONG i = 0; i < m_streamCount; i++)
{
CollisionList* const collisions = m_collisions[i * m_tableSize + slot];
if (!collisions)
return false;
if (!collisions->locate(length, data))
if (!collisions->locate(hash))
return false;
}
@ -306,25 +171,25 @@ public:
return true;
}
void reset(size_t stream, ULONG length, const UCHAR* data)
void reset(ULONG stream, ULONG hash)
{
fb_assert(stream < m_streamCount);
CollisionList* const collisions = m_collisions[stream * m_tableSize + m_slot];
collisions->locate(length, data);
collisions->locate(hash);
}
bool iterate(size_t stream, ULONG length, const UCHAR* data, ULONG& position)
bool iterate(ULONG stream, ULONG hash, ULONG& position)
{
fb_assert(stream < m_streamCount);
CollisionList* const collisions = m_collisions[stream * m_tableSize + m_slot];
return collisions->iterate(length, data, position);
return collisions->iterate(hash, position);
}
void sort()
{
for (size_t i = 0; i < m_streamCount * m_tableSize; i++)
for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
{
CollisionList* const collisions = m_collisions[i];
@ -334,10 +199,10 @@ public:
}
private:
const size_t m_streamCount;
const unsigned int m_tableSize;
const ULONG m_streamCount;
const ULONG m_tableSize;
CollisionList** m_collisions;
size_t m_slot;
ULONG m_slot;
};
@ -351,11 +216,11 @@ HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
m_leader.source = args[0];
m_leader.keys = keys[0];
m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool)
KeyLengthArray(csb->csb_pool, m_leader.keys->getCount());
const FB_SIZE_T leaderKeyCount = m_leader.keys->getCount();
m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[leaderKeyCount];
m_leader.totalKeyLength = 0;
for (FB_SIZE_T j = 0; j < m_leader.keys->getCount(); j++)
for (FB_SIZE_T j = 0; j < leaderKeyCount; j++)
{
dsc desc;
(*m_leader.keys)[j]->getDesc(tdbb, csb, &desc);
@ -365,11 +230,11 @@ HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
if (IS_INTL_DATA(&desc))
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength);
m_leader.keyLengths->add(keyLength);
m_leader.keyLengths[j] = keyLength;
m_leader.totalKeyLength += keyLength;
}
for (size_t i = 1; i < count; i++)
for (FB_SIZE_T i = 1; i < count; i++)
{
RecordSource* const sub_rsb = args[i];
fb_assert(sub_rsb);
@ -377,11 +242,11 @@ HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
SubStream sub;
sub.buffer = FB_NEW_POOL(csb->csb_pool) BufferedStream(csb, sub_rsb);
sub.keys = keys[i];
sub.keyLengths = FB_NEW_POOL(csb->csb_pool)
KeyLengthArray(csb->csb_pool, sub.keys->getCount());
const FB_SIZE_T subKeyCount = sub.keys->getCount();
sub.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[subKeyCount];
sub.totalKeyLength = 0;
for (FB_SIZE_T j = 0; j < sub.keys->getCount(); j++)
for (FB_SIZE_T j = 0; j < subKeyCount; j++)
{
dsc desc;
(*sub.keys)[j]->getDesc(tdbb, csb, &desc);
@ -391,7 +256,7 @@ HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
if (IS_INTL_DATA(&desc))
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength);
sub.keyLengths->add(keyLength);
sub.keyLengths[j] = keyLength;
sub.totalKeyLength += keyLength;
}
@ -406,19 +271,17 @@ void HashJoin::open(thread_db* tdbb) const
impure->irsb_flags = irsb_open | irsb_mustread;
delete impure->irsb_arg_buffer;
delete impure->irsb_hash_table;
delete[] impure->irsb_leader_buffer;
delete[] impure->irsb_record_counts;
MemoryPool& pool = *tdbb->getDefaultPool();
const size_t argCount = m_args.getCount();
const FB_SIZE_T argCount = m_args.getCount();
impure->irsb_arg_buffer = FB_NEW_POOL(pool) KeyBuffer(pool, KEYBUF_PREALLOCATE_SIZE);
impure->irsb_hash_table = FB_NEW_POOL(pool) HashTable(pool, argCount);
impure->irsb_leader_buffer = FB_NEW_POOL(pool) UCHAR[m_leader.totalKeyLength];
impure->irsb_record_counts = FB_NEW_POOL(pool) ULONG[argCount];
UCharBuffer buffer(pool);
for (FB_SIZE_T i = 0; i < argCount; i++)
{
@ -427,24 +290,14 @@ void HashJoin::open(thread_db* tdbb) const
m_args[i].buffer->open(tdbb);
ULONG& counter = impure->irsb_record_counts[i];
counter = 0;
ULONG counter = 0;
UCHAR* const keyBuffer = buffer.getBuffer(m_args[i].totalKeyLength, false);
while (m_args[i].buffer->getRecord(tdbb))
{
const ULONG offset = (ULONG) impure->irsb_arg_buffer->getCount();
if (offset > KEYBUF_SIZE_LIMIT)
status_exception::raise(Arg::Gds(isc_imp_exc) << Arg::Gds(isc_blktoobig));
impure->irsb_arg_buffer->resize(offset + m_args[i].totalKeyLength);
UCHAR* const keys = impure->irsb_arg_buffer->begin() + offset;
computeKeys(tdbb, request, m_args[i], keys);
impure->irsb_hash_table->put(i, m_args[i].totalKeyLength,
impure->irsb_arg_buffer,
offset, counter++);
const ULONG hash = computeHash(tdbb, request, m_args[i], keyBuffer);
impure->irsb_hash_table->put(i, hash, counter++);
}
}
impure->irsb_hash_table->sort();
@ -466,15 +319,9 @@ void HashJoin::close(thread_db* tdbb) const
delete impure->irsb_hash_table;
impure->irsb_hash_table = NULL;
delete impure->irsb_arg_buffer;
impure->irsb_arg_buffer = NULL;
delete[] impure->irsb_leader_buffer;
impure->irsb_leader_buffer = NULL;
delete[] impure->irsb_record_counts;
impure->irsb_record_counts = NULL;
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
m_args[i].buffer->close(tdbb);
@ -493,9 +340,6 @@ bool HashJoin::getRecord(thread_db* tdbb) const
if (!(impure->irsb_flags & irsb_open))
return false;
const ULONG leaderKeyLength = m_leader.totalKeyLength;
UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer;
while (true)
{
if (impure->irsb_flags & irsb_mustread)
@ -507,13 +351,13 @@ bool HashJoin::getRecord(thread_db* tdbb) const
// Compute and hash the comparison keys
memset(leaderKeyBuffer, 0, leaderKeyLength);
computeKeys(tdbb, request, m_leader, leaderKeyBuffer);
impure->irsb_leader_hash =
computeHash(tdbb, request, m_leader, impure->irsb_leader_buffer);
// Ensure the every inner stream having matches for this hash slot.
// Setup the hash table for the iteration through collisions.
if (!impure->irsb_hash_table->setup(leaderKeyLength, leaderKeyBuffer))
if (!impure->irsb_hash_table->setup(impure->irsb_leader_hash))
continue;
impure->irsb_flags &= ~irsb_mustread;
@ -626,20 +470,24 @@ void HashJoin::nullRecords(thread_db* tdbb) const
m_args[i].source->nullRecords(tdbb);
}
void HashJoin::computeKeys(thread_db* tdbb, jrd_req* request,
const SubStream& sub, UCHAR* keyBuffer) const
ULONG HashJoin::computeHash(thread_db* tdbb,
jrd_req* request,
const SubStream& sub,
UCHAR* keyBuffer) const
{
UCHAR* keyPtr = keyBuffer;
for (FB_SIZE_T i = 0; i < sub.keys->getCount(); i++)
{
dsc* const desc = EVL_expr(tdbb, request, (*sub.keys)[i]);
const USHORT keyLength = (*sub.keyLengths)[i];
const USHORT keyLength = sub.keyLengths[i];
if (desc && !(request->req_flags & req_null))
{
if (desc->isText())
{
dsc to;
to.makeText(keyLength, desc->getTextType(), keyBuffer);
to.makeText(keyLength, desc->getTextType(), keyPtr);
if (IS_INTL_DATA(desc))
{
@ -658,25 +506,30 @@ void HashJoin::computeKeys(thread_db* tdbb, jrd_req* request,
// We don't enforce proper alignments inside the key buffer,
// so use plain byte copying instead of MOV_move() to avoid bus errors
fb_assert(keyLength == desc->dsc_length);
memcpy(keyBuffer, desc->dsc_address, keyLength);
memcpy(keyPtr, desc->dsc_address, keyLength);
}
}
else
{
memset(keyPtr, 0, keyLength);
}
keyBuffer += keyLength;
keyPtr += keyLength;
}
fb_assert(keyPtr - keyBuffer == sub.totalKeyLength);
return InternalHash::hash(sub.totalKeyLength, keyBuffer);
}
bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) const
bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, ULONG stream) const
{
HashTable* const hashTable = impure->irsb_hash_table;
const BufferedStream* const arg = m_args[stream].buffer;
const ULONG leaderKeyLength = m_leader.totalKeyLength;
const UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer;
ULONG position;
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position))
if (hashTable->iterate(stream, impure->irsb_leader_hash, position))
{
arg->locate(tdbb, position);
@ -689,9 +542,9 @@ bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co
if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1))
return false;
hashTable->reset(stream, leaderKeyLength, leaderKeyBuffer);
hashTable->reset(stream, impure->irsb_leader_hash);
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position))
if (hashTable->iterate(stream, impure->irsb_leader_hash, position))
{
arg->locate(tdbb, position);

View File

@ -1012,9 +1012,6 @@ namespace Jrd
{
class HashTable;
typedef Firebird::Array<USHORT> KeyLengthArray;
typedef Firebird::Array<UCHAR> KeyBuffer;
struct SubStream
{
union
@ -1024,16 +1021,15 @@ namespace Jrd
};
NestValueArray* keys;
KeyLengthArray* keyLengths;
ULONG* keyLengths;
ULONG totalKeyLength;
};
struct Impure : public RecordSource::Impure
{
KeyBuffer* irsb_arg_buffer;
HashTable* irsb_hash_table;
UCHAR* irsb_leader_buffer;
ULONG* irsb_record_counts;
ULONG irsb_leader_hash;
};
public:
@ -1057,8 +1053,8 @@ namespace Jrd
void nullRecords(thread_db* tdbb) const override;
private:
void computeKeys(thread_db* tdbb, jrd_req* request,
const SubStream& sub, UCHAR* buffer) const;
ULONG computeHash(thread_db* tdbb, jrd_req* request,
const SubStream& sub, UCHAR* buffer) const;
bool fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) const;
SubStream m_leader;