8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-24 00:03:03 +01:00

1) Extended hash joins to handle more than two streams at once.

2) Allowed the hash table size to be configurable at runtime.
This commit is contained in:
dimitr 2010-01-22 07:11:02 +00:00
parent 4a865b6887
commit 2c3d97fcbe
2 changed files with 283 additions and 110 deletions

View File

@ -37,17 +37,140 @@ using namespace Jrd;
// Data access: hash join
// ----------------------
HashJoin::HashJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
const jrd_nod* outerKeys, const jrd_nod* innerKeys)
: m_outer(outer), m_inner(FB_NEW(csb->csb_pool) BufferedStream(csb, inner)),
m_outerKeys(outerKeys), m_innerKeys(innerKeys),
class HashJoin::HashTable : public PermanentStorage
{
static const size_t HASH_SIZE = 1009;
typedef Firebird::Array<FB_UINT64> CollisionList;
public:
HashTable(MemoryPool& pool, size_t streamCount, size_t tableSize = HASH_SIZE)
: PermanentStorage(pool), m_streamCount(streamCount), m_tableSize(tableSize), m_slot(0)
{
m_collisions = FB_NEW(pool) CollisionList*[streamCount * tableSize];
memset(m_collisions, 0, streamCount * tableSize * sizeof(CollisionList*));
m_iterators = FB_NEW(pool) size_t[streamCount];
memset(m_iterators, 0, streamCount * sizeof(size_t));
}
~HashTable()
{
for (size_t i = 0; i < m_streamCount * m_tableSize; i++)
{
delete m_collisions[i];
}
delete[] m_collisions;
delete[] m_iterators;
}
size_t hash(const UCHAR* address, size_t length)
{
size_t hash_value = 0;
UCHAR* p = NULL;
const UCHAR* q = address;
for (size_t l = 0; l < length; l++)
{
if (!(l & 3))
{
p = (UCHAR*) &hash_value;
}
*p++ += *q++;
}
return (hash_value % m_tableSize);
}
void put(size_t stream, size_t slot, FB_UINT64 value)
{
fb_assert(stream < m_streamCount);
fb_assert(slot < m_tableSize);
CollisionList* collisions = m_collisions[stream * m_streamCount + slot];
if (!collisions)
{
collisions = FB_NEW(getPool()) CollisionList(getPool());
m_collisions[stream * m_streamCount + slot] = collisions;
}
collisions->add(value);
}
bool setup(size_t slot)
{
fb_assert(slot < m_tableSize);
for (size_t i = 0; i < m_streamCount; i++)
{
if (!m_collisions[i * m_streamCount + slot])
{
return false;
}
reset(i);
}
m_slot = slot;
return true;
}
void reset(size_t stream)
{
fb_assert(stream < m_streamCount);
m_iterators[stream] = 0;
}
bool iterate(size_t stream, FB_UINT64& value)
{
fb_assert(stream < m_streamCount);
CollisionList* const collisions = m_collisions[stream * m_streamCount + m_slot];
size_t& iterator = m_iterators[stream];
if (iterator < collisions->getCount())
{
value = (*collisions)[iterator++];
return true;
}
return false;
}
private:
const size_t m_streamCount;
const size_t m_tableSize;
CollisionList** m_collisions;
size_t* m_iterators;
size_t m_slot;
};
HashJoin::HashJoin(CompilerScratch* csb, size_t count,
RecordSource* const* args, jrd_nod* const* keys)
: m_args(csb->csb_pool, count - 1), m_keys(csb->csb_pool, count - 1),
m_outerJoin(false), m_semiJoin(false), m_antiJoin(false)
{
fb_assert(m_outer && m_inner);
fb_assert(m_outerKeys && m_outerKeys->nod_type == nod_list);
fb_assert(m_innerKeys && m_innerKeys->nod_type == nod_list);
fb_assert(count >= 2);
m_impure = CMP_impure(csb, sizeof(Impure));
fb_assert(args[0]);
m_leader = args[0];
fb_assert(keys[0]);
m_leaderKeys = keys[0];
for (size_t i = 1; i < count; i++)
{
fb_assert(args[i]);
m_args.add(FB_NEW(csb->csb_pool) BufferedStream(csb, args[i]));
fb_assert(keys[i] && keys[i]->nod_type == nod_list);
m_keys.add(keys[i]);
}
}
void HashJoin::open(thread_db* tdbb)
@ -57,59 +180,48 @@ void HashJoin::open(thread_db* tdbb)
impure->irsb_flags = irsb_open | irsb_mustread;
for (size_t i = 0; i < HASH_SIZE; i++)
delete impure->irsb_hash_table;
MemoryPool& pool = *tdbb->getDefaultPool();
impure->irsb_hash_table = FB_NEW(pool) HashTable(pool, m_args.getCount());
for (size_t i = 0; i < m_args.getCount(); i++)
{
delete impure->irsb_hash_table[i];
impure->irsb_hash_table[i] = NULL;
}
// Read and cache the inner streams. While doing that,
// hash the join condition values and populate hash tables.
// Read and cache the inner stream. While doing that,
// hash the join condition values and populate a hash table.
m_args[i]->open(tdbb);
m_inner->open(tdbb);
FB_UINT64 position = 0;
while (m_inner->getRecord(tdbb))
{
const USHORT hash_slot = hashKeys(tdbb, request, false);
CollisionList* collisions = impure->irsb_hash_table[hash_slot];
if (!collisions)
FB_UINT64 position = 0;
while (m_args[i]->getRecord(tdbb))
{
MemoryPool& pool = *tdbb->getDefaultPool();
collisions = FB_NEW(pool) CollisionList(pool);
impure->irsb_hash_table[hash_slot] = collisions;
const size_t hash_slot = hashKeys(tdbb, request, impure->irsb_hash_table, m_keys[i]);
impure->irsb_hash_table->put(i, hash_slot, position++);
}
collisions->add(position++);
}
impure->irsb_hash_slot = 0;
impure->irsb_collision = 0;
m_outer->open(tdbb);
m_leader->open(tdbb);
}
void HashJoin::close(thread_db* tdbb)
{
jrd_req* const request = tdbb->getRequest();
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
invalidateRecords(request);
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
if (impure->irsb_flags & irsb_open)
{
impure->irsb_flags &= ~irsb_open;
for (size_t i = 0; i < HASH_SIZE; i++)
delete impure->irsb_hash_table;
impure->irsb_hash_table = NULL;
for (size_t i = 0; i < m_args.getCount(); i++)
{
delete impure->irsb_hash_table[i];
impure->irsb_hash_table[i] = NULL;
m_args[i]->close(tdbb);
}
m_outer->close(tdbb);
m_inner->close(tdbb);
m_leader->close(tdbb);
}
}
@ -127,39 +239,51 @@ bool HashJoin::getRecord(thread_db* tdbb)
{
if (impure->irsb_flags & irsb_mustread)
{
if (!m_outer->getRecord(tdbb))
// Fetch the record from the leading stream
if (!m_leader->getRecord(tdbb))
{
return false;
}
const USHORT hash_slot = hashKeys(tdbb, request, true);
// Hash the comparison keys
if (!impure->irsb_hash_table[hash_slot])
const size_t hash_slot = hashKeys(tdbb, request, impure->irsb_hash_table, m_leaderKeys);
// Ensure the every inner stream having matches for this hash slot.
// Setup the hash table for the iteration through collisions.
if (!impure->irsb_hash_table->setup(hash_slot))
{
continue;
}
impure->irsb_hash_slot = hash_slot;
impure->irsb_collision = 0;
impure->irsb_flags &= ~irsb_mustread;
impure->irsb_flags |= irsb_first;
}
CollisionList* const collisions = impure->irsb_hash_table[impure->irsb_hash_slot];
// Fetch collisions from the inner streams
if (impure->irsb_collision >= collisions->getCount())
if (impure->irsb_flags & irsb_first)
{
for (size_t i = 0; i < m_args.getCount(); i++)
{
if (!fetchRecord(tdbb, impure->irsb_hash_table, i))
{
fb_assert(false);
return false;
}
}
impure->irsb_flags &= ~irsb_first;
}
else if (!fetchRecord(tdbb, impure->irsb_hash_table, m_args.getCount() - 1))
{
impure->irsb_flags |= irsb_mustread;
continue;
}
const FB_UINT64 position = (*collisions)[impure->irsb_collision++];
m_inner->locate(tdbb, position);
if (!m_inner->getRecord(tdbb))
{
fb_assert(false);
return false;
}
// Ensure that the comparison keys are really equal
if (compareKeys(tdbb, request))
{
@ -188,63 +312,85 @@ void HashJoin::dump(thread_db* tdbb, UCharBuffer& buffer)
buffer.add(isc_info_rsb_type);
buffer.add(isc_info_rsb_hash);
buffer.add(2);
const size_t count = m_args.getCount() + 1;
// This place must be reviewed if we allow more than 255 joins
fb_assert(count <= USHORT(MAX_UCHAR));
buffer.add((UCHAR) count);
m_outer->dump(tdbb, buffer);
m_inner->dump(tdbb, buffer);
m_leader->dump(tdbb, buffer);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->dump(tdbb, buffer);
}
buffer.add(isc_info_rsb_end);
}
void HashJoin::markRecursive()
{
m_outer->markRecursive();
m_inner->markRecursive();
m_leader->markRecursive();
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->markRecursive();
}
}
void HashJoin::findUsedStreams(StreamsArray& streams)
{
m_outer->findUsedStreams(streams);
m_inner->findUsedStreams(streams);
m_leader->findUsedStreams(streams);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->findUsedStreams(streams);
}
}
void HashJoin::invalidateRecords(jrd_req* request)
{
m_outer->invalidateRecords(request);
m_inner->invalidateRecords(request);
m_leader->invalidateRecords(request);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->invalidateRecords(request);
}
}
void HashJoin::nullRecords(thread_db* tdbb)
{
m_outer->nullRecords(tdbb);
m_inner->nullRecords(tdbb);
m_leader->nullRecords(tdbb);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->nullRecords(tdbb);
}
}
void HashJoin::saveRecords(thread_db* tdbb)
{
m_outer->saveRecords(tdbb);
m_inner->saveRecords(tdbb);
m_leader->saveRecords(tdbb);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->saveRecords(tdbb);
}
}
void HashJoin::restoreRecords(thread_db* tdbb)
{
m_outer->restoreRecords(tdbb);
m_inner->restoreRecords(tdbb);
m_leader->restoreRecords(tdbb);
for (size_t i = 0; i < m_args.getCount(); i++)
{
m_args[i]->restoreRecords(tdbb);
}
}
USHORT HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, bool outer)
size_t HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, HashTable* table, jrd_nod* keys)
{
const jrd_nod* const list = outer ? m_outerKeys : m_innerKeys;
size_t hash_slot = 0;
ULONG hash_value = 0;
for (size_t i = 0; i < list->nod_count; i++)
for (size_t i = 0; i < keys->nod_count; i++)
{
const dsc* const desc = EVL_expr(tdbb, list->nod_arg[i]);
const dsc* const desc = EVL_expr(tdbb, keys->nod_arg[i]);
if (desc && !(request->req_flags & req_null))
{
USHORT length = desc->dsc_length;
size_t length = desc->dsc_length;
const UCHAR* address = desc->dsc_address;
if (desc->dsc_dtype == dtype_varying)
@ -258,43 +404,72 @@ USHORT HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, bool outer)
length = strlen((char*) address);
}
UCHAR* p = NULL;
const UCHAR* q = address;
for (USHORT l = 0; l < length; l++)
{
if (!(l & 3))
{
p = (UCHAR*) &hash_value;
}
*p++ += *q++;
}
hash_slot ^= table->hash(address, length);
}
}
return (USHORT) (hash_value % HASH_SIZE);
return hash_slot;
}
bool HashJoin::compareKeys(thread_db* tdbb, jrd_req* request)
{
for (size_t i = 0; i < m_outerKeys->nod_count; i++)
for (size_t i = 0; i < m_leaderKeys->nod_count; i++)
{
const dsc* const desc1 = EVL_expr(tdbb, m_outerKeys->nod_arg[i]);
const dsc* const desc1 = EVL_expr(tdbb, m_leaderKeys->nod_arg[i]);
const bool null1 = (request->req_flags & req_null);
const dsc* const desc2 = EVL_expr(tdbb, m_innerKeys->nod_arg[i]);
const bool null2 = (request->req_flags & req_null);
if (null1 != null2)
for (size_t j = 0; j < m_keys.getCount(); j++)
{
return false;
}
const dsc* const desc2 = EVL_expr(tdbb, m_keys[j]->nod_arg[i]);
const bool null2 = (request->req_flags & req_null);
if (!null1 && !null2 && MOV_compare(desc1, desc2) != 0)
{
return false;
if (null1 != null2)
{
return false;
}
if (!null1 && !null2 && MOV_compare(desc1, desc2) != 0)
{
return false;
}
}
}
return true;
}
bool HashJoin::fetchRecord(thread_db* tdbb, HashTable* table, size_t stream)
{
BufferedStream* const arg = m_args[stream];
FB_UINT64 position;
if (table->iterate(stream, position))
{
arg->locate(tdbb, position);
if (arg->getRecord(tdbb))
{
return true;
}
}
while (true)
{
if (stream == 0 || !fetchRecord(tdbb, table, stream - 1))
{
return false;
}
table->reset(stream);
if (table->iterate(stream, position))
{
arg->locate(tdbb, position);
if (arg->getRecord(tdbb))
{
return true;
}
}
}
}

View File

@ -750,19 +750,16 @@ namespace Jrd
class HashJoin : public RecordSource
{
static const size_t HASH_SIZE = 1009;
typedef Firebird::Array<FB_UINT64> CollisionList;
class HashTable;
struct Impure : public RecordSource::Impure
{
CollisionList* irsb_hash_table[HASH_SIZE];
USHORT irsb_hash_slot;
size_t irsb_collision;
HashTable* irsb_hash_table;
};
public:
HashJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
const jrd_nod* outerKeys, const jrd_nod* innerKeys);
HashJoin(CompilerScratch* csb, size_t count,
RecordSource* const* args, jrd_nod* const* keys);
void open(thread_db* tdbb);
void close(thread_db* tdbb);
@ -782,13 +779,14 @@ namespace Jrd
void restoreRecords(thread_db* tdbb);
private:
USHORT hashKeys(thread_db* tdbb, jrd_req* request, bool outer);
size_t hashKeys(thread_db* tdbb, jrd_req* request, HashTable* table, jrd_nod* keys);
bool compareKeys(thread_db* tdbb, jrd_req* request);
bool fetchRecord(thread_db* tdbb, HashTable* table, size_t stream);
RecordSource* const m_outer;
BufferedStream* const m_inner;
const jrd_nod* const m_outerKeys;
const jrd_nod* const m_innerKeys;
RecordSource* m_leader;
jrd_nod* m_leaderKeys;
Firebird::Array<BufferedStream*> m_args;
Firebird::Array<jrd_nod*> m_keys;
const bool m_outerJoin;
const bool m_semiJoin;
const bool m_antiJoin;