diff --git a/src/jrd/recsrc/HashJoin.cpp b/src/jrd/recsrc/HashJoin.cpp index 700afe96b3..487567cc06 100644 --- a/src/jrd/recsrc/HashJoin.cpp +++ b/src/jrd/recsrc/HashJoin.cpp @@ -37,17 +37,140 @@ using namespace Jrd; // Data access: hash join // ---------------------- -HashJoin::HashJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner, - const jrd_nod* outerKeys, const jrd_nod* innerKeys) - : m_outer(outer), m_inner(FB_NEW(csb->csb_pool) BufferedStream(csb, inner)), - m_outerKeys(outerKeys), m_innerKeys(innerKeys), +class HashJoin::HashTable : public PermanentStorage +{ + static const size_t HASH_SIZE = 1009; + + typedef Firebird::Array CollisionList; + +public: + HashTable(MemoryPool& pool, size_t streamCount, size_t tableSize = HASH_SIZE) + : PermanentStorage(pool), m_streamCount(streamCount), m_tableSize(tableSize), m_slot(0) + { + m_collisions = FB_NEW(pool) CollisionList*[streamCount * tableSize]; + memset(m_collisions, 0, streamCount * tableSize * sizeof(CollisionList*)); + + m_iterators = FB_NEW(pool) size_t[streamCount]; + memset(m_iterators, 0, streamCount * sizeof(size_t)); + } + + ~HashTable() + { + for (size_t i = 0; i < m_streamCount * m_tableSize; i++) + { + delete m_collisions[i]; + } + + delete[] m_collisions; + delete[] m_iterators; + } + + size_t hash(const UCHAR* address, size_t length) + { + size_t hash_value = 0; + UCHAR* p = NULL; + const UCHAR* q = address; + for (size_t l = 0; l < length; l++) + { + if (!(l & 3)) + { + p = (UCHAR*) &hash_value; + } + + *p++ += *q++; + } + + return (hash_value % m_tableSize); + } + + void put(size_t stream, size_t slot, FB_UINT64 value) + { + fb_assert(stream < m_streamCount); + fb_assert(slot < m_tableSize); + + CollisionList* collisions = m_collisions[stream * m_streamCount + slot]; + + if (!collisions) + { + collisions = FB_NEW(getPool()) CollisionList(getPool()); + m_collisions[stream * m_streamCount + slot] = collisions; + } + + collisions->add(value); + } + + bool setup(size_t slot) + { + fb_assert(slot < m_tableSize); + + for (size_t i = 0; i < m_streamCount; i++) + { + if (!m_collisions[i * m_streamCount + slot]) + { + return false; + } + + reset(i); + } + + m_slot = slot; + return true; + } + + void reset(size_t stream) + { + fb_assert(stream < m_streamCount); + + m_iterators[stream] = 0; + } + + bool iterate(size_t stream, FB_UINT64& value) + { + fb_assert(stream < m_streamCount); + + CollisionList* const collisions = m_collisions[stream * m_streamCount + m_slot]; + size_t& iterator = m_iterators[stream]; + + if (iterator < collisions->getCount()) + { + value = (*collisions)[iterator++]; + return true; + } + + return false; + } + +private: + const size_t m_streamCount; + const size_t m_tableSize; + CollisionList** m_collisions; + size_t* m_iterators; + size_t m_slot; +}; + + +HashJoin::HashJoin(CompilerScratch* csb, size_t count, + RecordSource* const* args, jrd_nod* const* keys) + : m_args(csb->csb_pool, count - 1), m_keys(csb->csb_pool, count - 1), m_outerJoin(false), m_semiJoin(false), m_antiJoin(false) { - fb_assert(m_outer && m_inner); - fb_assert(m_outerKeys && m_outerKeys->nod_type == nod_list); - fb_assert(m_innerKeys && m_innerKeys->nod_type == nod_list); + fb_assert(count >= 2); m_impure = CMP_impure(csb, sizeof(Impure)); + + fb_assert(args[0]); + m_leader = args[0]; + fb_assert(keys[0]); + m_leaderKeys = keys[0]; + + for (size_t i = 1; i < count; i++) + { + fb_assert(args[i]); + m_args.add(FB_NEW(csb->csb_pool) BufferedStream(csb, args[i])); + + fb_assert(keys[i] && keys[i]->nod_type == nod_list); + m_keys.add(keys[i]); + } } void HashJoin::open(thread_db* tdbb) @@ -57,59 +180,48 @@ void HashJoin::open(thread_db* tdbb) impure->irsb_flags = irsb_open | irsb_mustread; - for (size_t i = 0; i < HASH_SIZE; i++) + delete impure->irsb_hash_table; + MemoryPool& pool = *tdbb->getDefaultPool(); + impure->irsb_hash_table = FB_NEW(pool) HashTable(pool, m_args.getCount()); + + for (size_t i = 0; i < m_args.getCount(); i++) { - delete impure->irsb_hash_table[i]; - impure->irsb_hash_table[i] = NULL; - } + // Read and cache the inner streams. While doing that, + // hash the join condition values and populate hash tables. - // Read and cache the inner stream. While doing that, - // hash the join condition values and populate a hash table. + m_args[i]->open(tdbb); - m_inner->open(tdbb); - FB_UINT64 position = 0; - - while (m_inner->getRecord(tdbb)) - { - const USHORT hash_slot = hashKeys(tdbb, request, false); - CollisionList* collisions = impure->irsb_hash_table[hash_slot]; - - if (!collisions) + FB_UINT64 position = 0; + while (m_args[i]->getRecord(tdbb)) { - MemoryPool& pool = *tdbb->getDefaultPool(); - collisions = FB_NEW(pool) CollisionList(pool); - impure->irsb_hash_table[hash_slot] = collisions; + const size_t hash_slot = hashKeys(tdbb, request, impure->irsb_hash_table, m_keys[i]); + impure->irsb_hash_table->put(i, hash_slot, position++); } - - collisions->add(position++); } - impure->irsb_hash_slot = 0; - impure->irsb_collision = 0; - - m_outer->open(tdbb); + m_leader->open(tdbb); } void HashJoin::close(thread_db* tdbb) { jrd_req* const request = tdbb->getRequest(); + Impure* const impure = (Impure*) ((UCHAR*) request + m_impure); invalidateRecords(request); - Impure* const impure = (Impure*) ((UCHAR*) request + m_impure); - if (impure->irsb_flags & irsb_open) { impure->irsb_flags &= ~irsb_open; - for (size_t i = 0; i < HASH_SIZE; i++) + delete impure->irsb_hash_table; + impure->irsb_hash_table = NULL; + + for (size_t i = 0; i < m_args.getCount(); i++) { - delete impure->irsb_hash_table[i]; - impure->irsb_hash_table[i] = NULL; + m_args[i]->close(tdbb); } - m_outer->close(tdbb); - m_inner->close(tdbb); + m_leader->close(tdbb); } } @@ -127,39 +239,51 @@ bool HashJoin::getRecord(thread_db* tdbb) { if (impure->irsb_flags & irsb_mustread) { - if (!m_outer->getRecord(tdbb)) + // Fetch the record from the leading stream + + if (!m_leader->getRecord(tdbb)) { return false; } - const USHORT hash_slot = hashKeys(tdbb, request, true); + // Hash the comparison keys - if (!impure->irsb_hash_table[hash_slot]) + const size_t hash_slot = hashKeys(tdbb, request, impure->irsb_hash_table, m_leaderKeys); + + // Ensure the every inner stream having matches for this hash slot. + // Setup the hash table for the iteration through collisions. + + if (!impure->irsb_hash_table->setup(hash_slot)) { continue; } - impure->irsb_hash_slot = hash_slot; - impure->irsb_collision = 0; impure->irsb_flags &= ~irsb_mustread; + impure->irsb_flags |= irsb_first; } - CollisionList* const collisions = impure->irsb_hash_table[impure->irsb_hash_slot]; + // Fetch collisions from the inner streams - if (impure->irsb_collision >= collisions->getCount()) + if (impure->irsb_flags & irsb_first) + { + for (size_t i = 0; i < m_args.getCount(); i++) + { + if (!fetchRecord(tdbb, impure->irsb_hash_table, i)) + { + fb_assert(false); + return false; + } + } + + impure->irsb_flags &= ~irsb_first; + } + else if (!fetchRecord(tdbb, impure->irsb_hash_table, m_args.getCount() - 1)) { impure->irsb_flags |= irsb_mustread; continue; } - const FB_UINT64 position = (*collisions)[impure->irsb_collision++]; - m_inner->locate(tdbb, position); - - if (!m_inner->getRecord(tdbb)) - { - fb_assert(false); - return false; - } + // Ensure that the comparison keys are really equal if (compareKeys(tdbb, request)) { @@ -188,63 +312,85 @@ void HashJoin::dump(thread_db* tdbb, UCharBuffer& buffer) buffer.add(isc_info_rsb_type); buffer.add(isc_info_rsb_hash); - buffer.add(2); + const size_t count = m_args.getCount() + 1; + // This place must be reviewed if we allow more than 255 joins + fb_assert(count <= USHORT(MAX_UCHAR)); + buffer.add((UCHAR) count); - m_outer->dump(tdbb, buffer); - m_inner->dump(tdbb, buffer); + m_leader->dump(tdbb, buffer); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->dump(tdbb, buffer); + } buffer.add(isc_info_rsb_end); } void HashJoin::markRecursive() { - m_outer->markRecursive(); - m_inner->markRecursive(); + m_leader->markRecursive(); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->markRecursive(); + } } void HashJoin::findUsedStreams(StreamsArray& streams) { - m_outer->findUsedStreams(streams); - m_inner->findUsedStreams(streams); + m_leader->findUsedStreams(streams); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->findUsedStreams(streams); + } } void HashJoin::invalidateRecords(jrd_req* request) { - m_outer->invalidateRecords(request); - m_inner->invalidateRecords(request); + m_leader->invalidateRecords(request); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->invalidateRecords(request); + } } void HashJoin::nullRecords(thread_db* tdbb) { - m_outer->nullRecords(tdbb); - m_inner->nullRecords(tdbb); + m_leader->nullRecords(tdbb); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->nullRecords(tdbb); + } } void HashJoin::saveRecords(thread_db* tdbb) { - m_outer->saveRecords(tdbb); - m_inner->saveRecords(tdbb); + m_leader->saveRecords(tdbb); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->saveRecords(tdbb); + } } void HashJoin::restoreRecords(thread_db* tdbb) { - m_outer->restoreRecords(tdbb); - m_inner->restoreRecords(tdbb); + m_leader->restoreRecords(tdbb); + for (size_t i = 0; i < m_args.getCount(); i++) + { + m_args[i]->restoreRecords(tdbb); + } } -USHORT HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, bool outer) +size_t HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, HashTable* table, jrd_nod* keys) { - const jrd_nod* const list = outer ? m_outerKeys : m_innerKeys; + size_t hash_slot = 0; - ULONG hash_value = 0; - - for (size_t i = 0; i < list->nod_count; i++) + for (size_t i = 0; i < keys->nod_count; i++) { - const dsc* const desc = EVL_expr(tdbb, list->nod_arg[i]); + const dsc* const desc = EVL_expr(tdbb, keys->nod_arg[i]); if (desc && !(request->req_flags & req_null)) { - USHORT length = desc->dsc_length; + size_t length = desc->dsc_length; const UCHAR* address = desc->dsc_address; if (desc->dsc_dtype == dtype_varying) @@ -258,43 +404,72 @@ USHORT HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, bool outer) length = strlen((char*) address); } - UCHAR* p = NULL; - const UCHAR* q = address; - for (USHORT l = 0; l < length; l++) - { - if (!(l & 3)) - { - p = (UCHAR*) &hash_value; - } - - *p++ += *q++; - } + hash_slot ^= table->hash(address, length); } } - return (USHORT) (hash_value % HASH_SIZE); + return hash_slot; } bool HashJoin::compareKeys(thread_db* tdbb, jrd_req* request) { - for (size_t i = 0; i < m_outerKeys->nod_count; i++) + for (size_t i = 0; i < m_leaderKeys->nod_count; i++) { - const dsc* const desc1 = EVL_expr(tdbb, m_outerKeys->nod_arg[i]); + const dsc* const desc1 = EVL_expr(tdbb, m_leaderKeys->nod_arg[i]); const bool null1 = (request->req_flags & req_null); - const dsc* const desc2 = EVL_expr(tdbb, m_innerKeys->nod_arg[i]); - const bool null2 = (request->req_flags & req_null); - - if (null1 != null2) + for (size_t j = 0; j < m_keys.getCount(); j++) { - return false; - } + const dsc* const desc2 = EVL_expr(tdbb, m_keys[j]->nod_arg[i]); + const bool null2 = (request->req_flags & req_null); - if (!null1 && !null2 && MOV_compare(desc1, desc2) != 0) - { - return false; + if (null1 != null2) + { + return false; + } + + if (!null1 && !null2 && MOV_compare(desc1, desc2) != 0) + { + return false; + } } } return true; } + +bool HashJoin::fetchRecord(thread_db* tdbb, HashTable* table, size_t stream) +{ + BufferedStream* const arg = m_args[stream]; + FB_UINT64 position; + + if (table->iterate(stream, position)) + { + arg->locate(tdbb, position); + + if (arg->getRecord(tdbb)) + { + return true; + } + } + + while (true) + { + if (stream == 0 || !fetchRecord(tdbb, table, stream - 1)) + { + return false; + } + + table->reset(stream); + + if (table->iterate(stream, position)) + { + arg->locate(tdbb, position); + + if (arg->getRecord(tdbb)) + { + return true; + } + } + } +} diff --git a/src/jrd/recsrc/RecordSource.h b/src/jrd/recsrc/RecordSource.h index 76f844a8b5..804f268702 100644 --- a/src/jrd/recsrc/RecordSource.h +++ b/src/jrd/recsrc/RecordSource.h @@ -750,19 +750,16 @@ namespace Jrd class HashJoin : public RecordSource { - static const size_t HASH_SIZE = 1009; - typedef Firebird::Array CollisionList; + class HashTable; struct Impure : public RecordSource::Impure { - CollisionList* irsb_hash_table[HASH_SIZE]; - USHORT irsb_hash_slot; - size_t irsb_collision; + HashTable* irsb_hash_table; }; public: - HashJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner, - const jrd_nod* outerKeys, const jrd_nod* innerKeys); + HashJoin(CompilerScratch* csb, size_t count, + RecordSource* const* args, jrd_nod* const* keys); void open(thread_db* tdbb); void close(thread_db* tdbb); @@ -782,13 +779,14 @@ namespace Jrd void restoreRecords(thread_db* tdbb); private: - USHORT hashKeys(thread_db* tdbb, jrd_req* request, bool outer); + size_t hashKeys(thread_db* tdbb, jrd_req* request, HashTable* table, jrd_nod* keys); bool compareKeys(thread_db* tdbb, jrd_req* request); + bool fetchRecord(thread_db* tdbb, HashTable* table, size_t stream); - RecordSource* const m_outer; - BufferedStream* const m_inner; - const jrd_nod* const m_outerKeys; - const jrd_nod* const m_innerKeys; + RecordSource* m_leader; + jrd_nod* m_leaderKeys; + Firebird::Array m_args; + Firebird::Array m_keys; const bool m_outerJoin; const bool m_semiJoin; const bool m_antiJoin;