/*
 *	The contents of this file are subject to the Initial
 *	Developer's Public License Version 1.0 (the "License");
 *	you may not use this file except in compliance with the
 *	License. You may obtain a copy of the License at
 *	http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
 *
 *	Software distributed under the License is distributed AS IS,
 *	WITHOUT WARRANTY OF ANY KIND, either express or implied.
 *	See the License for the specific language governing rights
 *	and limitations under the License.
 *
 *	The Original Code was created by Dmitry Yemanov
 *	for the Firebird Open Source RDBMS project.
 *
 *	Copyright (c) 2009 Dmitry Yemanov
 *	and all contributors signed below.
 *
 *	All Rights Reserved.
 *	Contributor(s): ______________________________________.
 */

#include "firebird.h"
#include "../jrd/common.h"
#include "../jrd/jrd.h"
#include "../jrd/btr.h"
#include "../jrd/req.h"
#include "../jrd/intl.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/mov_proto.h"
#include "../jrd/intl_proto.h"

#include "RecordSource.h"

using namespace Firebird;
using namespace Jrd;

// ----------------------
// Data access: hash join
// ----------------------

// Per-execution hash table used by HashJoin.
// For every inner stream and every hash slot, it keeps the list of record
// positions (as counted while buffering the stream in HashJoin::open) that
// hashed into that slot. It also keeps one iteration cursor per stream over
// the currently selected slot.
class HashJoin::HashTable : public PermanentStorage
{
	static const size_t HASH_SIZE = 1009;

	// Record positions inside one buffered inner stream
	typedef Firebird::Array<FB_UINT64> CollisionList;

public:
	HashTable(MemoryPool& pool, size_t streamCount, size_t tableSize = HASH_SIZE)
		: PermanentStorage(pool), m_streamCount(streamCount),
		  m_tableSize(tableSize), m_slot(0)
	{
		fb_assert(m_tableSize > 0);

		// One collision list pointer per (stream, slot) pair, laid out stream-major:
		// index = stream * m_tableSize + slot (see bucket())
		m_collisions = FB_NEW(pool) CollisionList*[streamCount * tableSize];
		memset(m_collisions, 0, streamCount * tableSize * sizeof(CollisionList*));

		m_iterators = FB_NEW(pool) size_t[streamCount];
		memset(m_iterators, 0, streamCount * sizeof(size_t));
	}

	~HashTable()
	{
		for (size_t i = 0; i < m_streamCount * m_tableSize; i++)
		{
			delete m_collisions[i];
		}

		delete[] m_collisions;
		delete[] m_iterators;
	}

	// Computes a raw hash value of a byte string. The value is NOT yet reduced
	// to a slot index; callers combine per-key values and then call getSlot().
	// Bytes are summed, cyclically, into the first four bytes of the result.
	size_t hash(const UCHAR* address, size_t length) const
	{
		size_t hash_value = 0;
		UCHAR* p = NULL;
		const UCHAR* q = address;

		for (size_t l = 0; l < length; l++)
		{
			if (!(l & 3))
			{
				p = (UCHAR*) &hash_value;
			}

			*p++ += *q++;
		}

		return hash_value;
	}

	// Maps an arbitrary hash value onto a valid slot index [0, m_tableSize)
	size_t getSlot(size_t hashValue) const
	{
		return hashValue % m_tableSize;
	}

	// Remembers that the record at `value` position of inner stream `stream`
	// hashed into `slot`
	void put(size_t stream, size_t slot, FB_UINT64 value)
	{
		fb_assert(stream < m_streamCount);
		fb_assert(slot < m_tableSize);

		CollisionList*& collisions = bucket(stream, slot);

		if (!collisions)
		{
			collisions = FB_NEW(getPool()) CollisionList(getPool());
		}

		collisions->add(value);
	}

	// Selects `slot` for iteration and rewinds all per-stream cursors.
	// Returns false if at least one stream has no records in this slot,
	// i.e. no combination of inner records can match.
	bool setup(size_t slot)
	{
		fb_assert(slot < m_tableSize);

		for (size_t i = 0; i < m_streamCount; i++)
		{
			if (!bucket(i, slot))
			{
				return false;
			}

			reset(i);
		}

		m_slot = slot;
		return true;
	}

	// Rewinds the cursor of the given stream to the first collision
	void reset(size_t stream)
	{
		fb_assert(stream < m_streamCount);

		m_iterators[stream] = 0;
	}

	// Returns the next record position of `stream` within the selected slot,
	// or false when that stream's collisions are exhausted.
	// Must only be called after a successful setup().
	bool iterate(size_t stream, FB_UINT64& value)
	{
		fb_assert(stream < m_streamCount);

		CollisionList* const collisions = bucket(stream, m_slot);
		fb_assert(collisions);

		size_t& iterator = m_iterators[stream];

		if (iterator < collisions->getCount())
		{
			value = (*collisions)[iterator++];
			return true;
		}

		return false;
	}

private:
	// The table owns raw arrays: copying would double-free them
	HashTable(const HashTable&);
	HashTable& operator=(const HashTable&);

	// Collision list slot for the (stream, slot) pair
	CollisionList*& bucket(size_t stream, size_t slot)
	{
		return m_collisions[stream * m_tableSize + slot];
	}

	const size_t m_streamCount;
	const size_t m_tableSize;
	CollisionList** m_collisions;
	size_t* m_iterators;
	size_t m_slot;
};


// args[0]/keys[0] describe the leading (probing) stream; args[1..count-1] and
// keys[1..count-1] describe the inner streams, which get buffered and hashed.
// Every keys[i] is a nod_list of join key expressions for the stream.
HashJoin::HashJoin(CompilerScratch* csb, size_t count,
				   RecordSource* const* args, jrd_nod* const* keys)
	: m_args(csb->csb_pool, count - 1), m_keys(csb->csb_pool, count - 1),
	  m_outerJoin(false), m_semiJoin(false), m_antiJoin(false)
{
	fb_assert(count >= 2);

	m_impure = CMP_impure(csb, sizeof(Impure));

	fb_assert(args[0]);
	m_leader = args[0];
	fb_assert(keys[0]);
	m_leaderKeys = keys[0];

	for (size_t i = 1; i < count; i++)
	{
		fb_assert(args[i]);
		// Inner streams are wrapped into BufferedStream so that their records
		// can later be re-read by position (see fetchRecord)
		m_args.add(FB_NEW(csb->csb_pool) BufferedStream(csb, args[i]));

		fb_assert(keys[i] && keys[i]->nod_type == nod_list);
		m_keys.add(keys[i]);
	}
}

// Builds a fresh hash table by reading every inner stream completely, then
// opens the leading stream.
void HashJoin::open(thread_db* tdbb)
{
	jrd_req* const request = tdbb->getRequest();
	Impure* const impure = request->getImpure<Impure>(m_impure);

	impure->irsb_flags = irsb_open | irsb_mustread;

	delete impure->irsb_hash_table;
	MemoryPool& pool = *tdbb->getDefaultPool();
	impure->irsb_hash_table = FB_NEW(pool) HashTable(pool, m_args.getCount());

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		// Read and cache the inner streams. While doing that,
		// hash the join condition values and populate hash tables.

		m_args[i]->open(tdbb);

		// Positions are assigned sequentially from zero, in read order
		FB_UINT64 position = 0;
		while (m_args[i]->getRecord(tdbb))
		{
			const size_t hash_slot =
				hashKeys(tdbb, request, impure->irsb_hash_table, m_keys[i]);
			impure->irsb_hash_table->put(i, hash_slot, position++);
		}
	}

	m_leader->open(tdbb);
}

// Releases the hash table and closes all underlying streams
void HashJoin::close(thread_db* tdbb)
{
	jrd_req* const request = tdbb->getRequest();
	Impure* const impure = request->getImpure<Impure>(m_impure);

	invalidateRecords(request);

	if (impure->irsb_flags & irsb_open)
	{
		impure->irsb_flags &= ~irsb_open;

		delete impure->irsb_hash_table;
		impure->irsb_hash_table = NULL;

		for (size_t i = 0; i < m_args.getCount(); i++)
		{
			m_args[i]->close(tdbb);
		}

		m_leader->close(tdbb);
	}
}

// Produces the next joined row. For each leader record, all combinations of
// inner records sharing its hash slot are enumerated (see fetchRecord), and
// only those whose keys really compare equal are returned.
bool HashJoin::getRecord(thread_db* tdbb)
{
	jrd_req* const request = tdbb->getRequest();
	Impure* const impure = request->getImpure<Impure>(m_impure);

	if (!(impure->irsb_flags & irsb_open))
	{
		return false;
	}

	while (true)
	{
		if (impure->irsb_flags & irsb_mustread)
		{
			// Fetch the record from the leading stream

			if (!m_leader->getRecord(tdbb))
			{
				return false;
			}

			// Hash the comparison keys

			const size_t hash_slot =
				hashKeys(tdbb, request, impure->irsb_hash_table, m_leaderKeys);

			// Ensure that every inner stream has matches for this hash slot.
			// Set up the hash table for the iteration through collisions.
			// If some stream has none, this leader record cannot join.

			if (!impure->irsb_hash_table->setup(hash_slot))
			{
				continue;
			}

			impure->irsb_flags &= ~irsb_mustread;
			impure->irsb_flags |= irsb_first;
		}

		// Fetch collisions from the inner streams

		if (impure->irsb_flags & irsb_first)
		{
			// First combination for this leader record: position every
			// inner stream on its first collision
			for (size_t i = 0; i < m_args.getCount(); i++)
			{
				if (!fetchRecord(tdbb, impure->irsb_hash_table, i))
				{
					fb_assert(false);
					return false;
				}
			}

			impure->irsb_flags &= ~irsb_first;
		}
		else if (!fetchRecord(tdbb, impure->irsb_hash_table, m_args.getCount() - 1))
		{
			// All combinations are exhausted, advance the leading stream
			impure->irsb_flags |= irsb_mustread;
			continue;
		}

		// Different keys may share a hash slot,
		// so ensure that the comparison keys are really equal

		if (compareKeys(tdbb, request))
		{
			break;
		}
	}

	return true;
}

bool HashJoin::refetchRecord(thread_db* tdbb)
{
	return true;
}

bool HashJoin::lockRecord(thread_db* tdbb)
{
	status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
	return false; // compiler silencer
}

// Appends the plan description: the leader first, then all inner streams
void HashJoin::dump(thread_db* tdbb, UCharBuffer& buffer)
{
	buffer.add(isc_info_rsb_begin);

	buffer.add(isc_info_rsb_type);
	buffer.add(isc_info_rsb_hash);

	const size_t count = m_args.getCount() + 1;

	// This place must be reviewed if we allow more than 255 joins
	fb_assert(count <= USHORT(MAX_UCHAR));
	buffer.add((UCHAR) count);

	m_leader->dump(tdbb, buffer);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->dump(tdbb, buffer);
	}

	buffer.add(isc_info_rsb_end);
}

void HashJoin::markRecursive()
{
	m_leader->markRecursive();

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->markRecursive();
	}
}

void HashJoin::findUsedStreams(StreamsArray& streams)
{
	m_leader->findUsedStreams(streams);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->findUsedStreams(streams);
	}
}

void HashJoin::invalidateRecords(jrd_req* request)
{
	m_leader->invalidateRecords(request);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->invalidateRecords(request);
	}
}

void HashJoin::nullRecords(thread_db* tdbb)
{
	m_leader->nullRecords(tdbb);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->nullRecords(tdbb);
	}
}

void HashJoin::saveRecords(thread_db* tdbb)
{
	m_leader->saveRecords(tdbb);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->saveRecords(tdbb);
	}
}

void HashJoin::restoreRecords(thread_db* tdbb)
{
	m_leader->restoreRecords(tdbb);

	for (size_t i = 0; i < m_args.getCount(); i++)
	{
		m_args[i]->restoreRecords(tdbb);
	}
}

// Evaluates the key expressions in `keys` against the current record(s) and
// returns the hash slot they map to. Every slot returned is < the table size.
// Text keys are normalized first (real length, trailing pad removed, or the
// collation sort key for INTL data) so that values comparing equal hash equally.
size_t HashJoin::hashKeys(thread_db* tdbb, jrd_req* request, HashTable* table, jrd_nod* keys)
{
	// Mixes per-key hashes order-sensitively. Plain XOR would cancel equal
	// column values (e.g. x == y) into the same slot for every row.
	const size_t HASH_MULTIPLIER = 31;

	size_t hash_value = 0;

	for (size_t i = 0; i < keys->nod_count; i++)
	{
		size_t key_hash = 0;	// NULL keys contribute a fixed zero

		const dsc* const desc = EVL_expr(tdbb, keys->nod_arg[i]);

		if (desc && !(request->req_flags & req_null))
		{
			fb_assert(!desc->isBlob());

			size_t length = desc->dsc_length;
			const UCHAR* address = desc->dsc_address;
			MoveBuffer buffer;

			if (desc->isText())
			{
				// Adjust the data length to the real string length

				if (desc->dsc_dtype == dtype_varying)
				{
					const vary* const string = (vary*) address;
					length = string->vary_length;
					address = (const UCHAR*) string->vary_string;
				}
				else if (desc->dsc_dtype == dtype_cstring)
				{
					length = strlen((char*) address);
				}

				if (IS_INTL_DATA(desc))
				{
					// Convert the INTL string into the binary comparable form
					TextType* const obj = INTL_texttype_lookup(tdbb, desc->getTextType());
					const USHORT key_length = obj->key_length(length);
					length = obj->string_to_key(length, address, key_length,
												buffer.getBuffer(key_length), INTL_KEY_UNIQUE);
					address = buffer.begin();
				}
				else
				{
					// Adjust the data length to ignore trailing spaces
					// (or trailing zero bytes for binary strings)
					CHARSET_ID charset = desc->getCharSet();
					if (charset == ttype_dynamic)
					{
						charset = tdbb->getCharSet();
					}

					const UCHAR space = (charset == ttype_binary) ? '\0' : ' ';
					const UCHAR* ptr = address + length;
					while (length && *--ptr == space)
					{
						length--;
					}
				}
			}

			// NOTE(review): non-text keys are hashed by their raw bytes, so keys of
			// different numeric types that MOV_compare treats as equal may hash
			// differently; presumably the optimizer only pairs identical types — confirm.
			key_hash = table->hash(address, length);
		}

		// Unsigned arithmetic wraps, which is fine for hashing
		hash_value = hash_value * HASH_MULTIPLIER + key_hash;
	}

	// Reduce only once, after combining, so the slot is always in range
	return table->getSlot(hash_value);
}

// Verifies that the leader's key values are equal to those of every inner
// stream's current record (hash slot equality alone is not sufficient).
// NOTE(review): two NULL keys are treated as equal here (IS NOT DISTINCT FROM
// semantics); confirm this matches what the optimizer expects for "=" joins.
bool HashJoin::compareKeys(thread_db* tdbb, jrd_req* request)
{
	for (size_t i = 0; i < m_leaderKeys->nod_count; i++)
	{
		const dsc* const desc1 = EVL_expr(tdbb, m_leaderKeys->nod_arg[i]);
		const bool null1 = (request->req_flags & req_null);

		for (size_t j = 0; j < m_keys.getCount(); j++)
		{
			const dsc* const desc2 = EVL_expr(tdbb, m_keys[j]->nod_arg[i]);
			const bool null2 = (request->req_flags & req_null);

			if (null1 != null2)
			{
				return false;
			}

			if (!null1 && !null2 && MOV_compare(desc1, desc2) != 0)
			{
				return false;
			}
		}
	}

	return true;
}

// Advances inner stream `stream` to its next collision record, like an odometer.
// When that stream is exhausted, the previous stream is advanced recursively and
// this one is rewound. Returns false once stream 0 itself is exhausted, i.e. all
// combinations for the current slot have been produced.
bool HashJoin::fetchRecord(thread_db* tdbb, HashTable* table, size_t stream)
{
	BufferedStream* const arg = m_args[stream];

	FB_UINT64 position;
	if (table->iterate(stream, position))
	{
		arg->locate(tdbb, position);

		if (arg->getRecord(tdbb))
		{
			return true;
		}
	}

	while (true)
	{
		if (stream == 0 || !fetchRecord(tdbb, table, stream - 1))
		{
			return false;
		}

		table->reset(stream);

		if (table->iterate(stream, position))
		{
			arg->locate(tdbb, position);

			if (arg->getRecord(tdbb))
			{
				return true;
			}
		}
	}
}