From e9f377f81e865c6da509c6a681750b2c1b8e1cd2 Mon Sep 17 00:00:00 2001
From: asfernandes
Date: Wed, 20 Jan 2010 00:02:58 +0000
Subject: [PATCH] Replace the hash join by specialized join with the already sorted streams

---
Reviewer notes (free text in this position is ignored by git am):

 What the patch does
  * Adds a file-local record source, WindowJoin, to WindowedStream.cpp.
    It wraps the inner stream in a BufferedStream, reads it completely in
    open() to learn how many records it holds, and then, for every outer
    record, evaluates the outer keys and binary-searches the buffered
    inner stream using compareKeys().
  * compareKeys() orders a NULL key below a non-NULL key and treats two
    NULL keys as equal; non-NULL keys are compared with MOV_compare().
  * The search relies on the inner stream being ordered on the join keys
    and on every outer record having a match; a miss trips fb_assert and
    getRecord() returns false.
  * WindowedStream's constructor now creates a WindowJoin where it used
    to create a HashJoin.
  * evl_proto.h gains forward declarations of Jrd::Item and Jrd::ItemInfo.

 Changes relative to the whitespace-collapsed text this was restored from
  * Line structure restored. Tab indentation and blank context lines are
    reconstructed so that every hunk matches its @@ counts; confirm them
    against the tree, or apply with --ignore-whitespace.
  * HalfStaticArray had lost its template argument list; restored as
    HalfStaticArray<DscNull, 8>. The inline capacity (8) is an assumption
    to be confirmed.
  * The constructor's assertions are reordered so the key lists are
    checked for null before their nod_count members are read.
  * Because of the two content changes above, the post-image blob id on
    the WindowedStream.cpp "index" line no longer matches.

 src/jrd/evl_proto.h               |   6 +
 src/jrd/recsrc/WindowedStream.cpp | 242 +++++++++++++++++++++++++++++-
 2 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/src/jrd/evl_proto.h b/src/jrd/evl_proto.h
index 6bdec66e4a..c74cfb1243 100644
--- a/src/jrd/evl_proto.h
+++ b/src/jrd/evl_proto.h
@@ -26,6 +26,12 @@
 
 #include "../jrd/intl_classes.h"
 
+namespace Jrd
+{
+	struct Item;
+	struct ItemInfo;
+}
+
 // Implemented in evl.cpp
 dsc* EVL_add(const dsc*, const Jrd::jrd_nod*, Jrd::impure_value*);
 dsc* EVL_add2(const dsc*, const Jrd::jrd_nod*, Jrd::impure_value*);
diff --git a/src/jrd/recsrc/WindowedStream.cpp b/src/jrd/recsrc/WindowedStream.cpp
index 1f034f97ff..facaa770a4 100644
--- a/src/jrd/recsrc/WindowedStream.cpp
+++ b/src/jrd/recsrc/WindowedStream.cpp
@@ -22,7 +22,9 @@
 
 #include "firebird.h"
 #include "../jrd/common.h"
+#include "../jrd/mov_proto.h"
 #include "../jrd/opt_proto.h"
+#include "../jrd/evl_proto.h"
 #include "../jrd/exe_proto.h"
 
 #include "RecordSource.h"
@@ -68,6 +70,52 @@ namespace
 		BufferedStream* m_next;
 	};
 
+	// Make join between outer stream and already sorted (aggregated) partition.
+	class WindowJoin : public RecordSource
+	{
+		struct DscNull
+		{
+			dsc* desc;
+			bool null;
+		};
+
+		struct Impure : public RecordSource::Impure
+		{
+			FB_UINT64 innerRecordCount;
+		};
+
+	public:
+		WindowJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
+			const jrd_nod* outerKeys, const jrd_nod* innerKeys);
+
+		void open(thread_db* tdbb);
+		void close(thread_db* tdbb);
+
+		bool getRecord(thread_db* tdbb);
+		bool refetchRecord(thread_db* tdbb);
+		bool lockRecord(thread_db* tdbb);
+
+		void dump(thread_db* tdbb, Firebird::UCharBuffer& buffer);
+
+		void markRecursive();
+		void invalidateRecords(jrd_req* request);
+
+		void findUsedStreams(StreamsArray& streams);
+		void nullRecords(thread_db* tdbb);
+		void saveRecords(thread_db* tdbb);
+		void restoreRecords(thread_db* tdbb);
+
+	private:
+		int compareKeys(thread_db* tdbb, jrd_req* request, DscNull* outerValues);
+
+		RecordSource* const m_outer;
+		BufferedStream* const m_inner;
+		const jrd_nod* const m_outerKeys;
+		const jrd_nod* const m_innerKeys;
+	};
+
+	// BufferedStreamWindow implementation
+
 	BufferedStreamWindow::BufferedStreamWindow(CompilerScratch* csb, BufferedStream* next)
 		: m_next(next)
 	{
@@ -151,6 +199,198 @@ namespace
 	{
 		m_next->restoreRecords(tdbb);
 	}
+
+	// WindowJoin implementation
+
+	WindowJoin::WindowJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
+			const jrd_nod* outerKeys, const jrd_nod* innerKeys)
+		: m_outer(outer), m_inner(FB_NEW(csb->csb_pool) BufferedStream(csb, inner)),
+		  m_outerKeys(outerKeys), m_innerKeys(innerKeys)
+	{
+		fb_assert(m_outerKeys && m_outerKeys->nod_type == nod_list);
+		fb_assert(m_innerKeys && m_innerKeys->nod_type == nod_list);
+		fb_assert(m_outer && m_inner && m_innerKeys->nod_count == m_outerKeys->nod_count);
+
+		m_impure = CMP_impure(csb, sizeof(Impure));
+	}
+
+	void WindowJoin::open(thread_db* tdbb)
+	{
+		jrd_req* const request = tdbb->getRequest();
+		Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
+
+		impure->irsb_flags = irsb_open;
+
+		// Read and cache the inner stream. Also gets its total number of records.
+
+		m_inner->open(tdbb);
+		FB_UINT64 position = 0;
+
+		while (m_inner->getRecord(tdbb))
+			++position;
+
+		impure->innerRecordCount = position;
+
+		m_outer->open(tdbb);
+	}
+
+	void WindowJoin::close(thread_db* tdbb)
+	{
+		jrd_req* const request = tdbb->getRequest();
+
+		invalidateRecords(request);
+
+		Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
+
+		if (impure->irsb_flags & irsb_open)
+		{
+			impure->irsb_flags &= ~irsb_open;
+
+			m_outer->close(tdbb);
+			m_inner->close(tdbb);
+		}
+	}
+
+	bool WindowJoin::getRecord(thread_db* tdbb)
+	{
+		jrd_req* const request = tdbb->getRequest();
+		Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
+
+		if (!(impure->irsb_flags & irsb_open))
+			return false;
+
+		if (!m_outer->getRecord(tdbb))
+			return false;
+
+		// Evaluate the outer stream keys.
+
+		HalfStaticArray<DscNull, 8> outerValues;
+		DscNull* outerValue = outerValues.getBuffer(m_outerKeys->nod_count);
+
+		for (unsigned i = 0; i < m_outerKeys->nod_count; ++i)
+		{
+			outerValue->desc = EVL_expr(tdbb, m_outerKeys->nod_arg[i]);
+			outerValue->null = (request->req_flags & req_null);
+			++outerValue;
+		}
+
+		outerValue -= m_outerKeys->nod_count; // go back to begin
+
+		// Join the streams. That should be a 1-to-1 join.
+
+		SINT64 start = 0;
+		SINT64 finish = impure->innerRecordCount;
+		SINT64 pos = finish / 2;
+
+		while (pos >= start && pos < finish)
+		{
+			m_inner->locate(tdbb, pos);
+			if (!m_inner->getRecord(tdbb))
+			{
+				fb_assert(false);
+				return false;
+			}
+
+			int cmp = compareKeys(tdbb, request, outerValue);
+
+			if (cmp == 0)
+				return true;
+			else if (cmp < 0)
+			{
+				finish = pos;
+				pos -= MAX(1, (finish - start) / 2);
+			}
+			else //if (cmp > 0)
+			{
+				start = pos;
+				pos += MAX(1, (finish - start) / 2);
+			}
+		}
+
+		fb_assert(false);
+
+		return false;
+	}
+
+	bool WindowJoin::refetchRecord(thread_db* tdbb)
+	{
+		return true;
+	}
+
+	bool WindowJoin::lockRecord(thread_db* tdbb)
+	{
+		status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
+		return false; // compiler silencer
+	}
+
+	void WindowJoin::dump(thread_db* tdbb, UCharBuffer& buffer)
+	{
+		buffer.add(isc_info_rsb_begin);
+
+		m_outer->dump(tdbb, buffer);
+		m_inner->dump(tdbb, buffer);
+
+		buffer.add(isc_info_rsb_end);
+	}
+
+	void WindowJoin::markRecursive()
+	{
+		m_outer->markRecursive();
+		m_inner->markRecursive();
+	}
+
+	void WindowJoin::findUsedStreams(StreamsArray& streams)
+	{
+		m_outer->findUsedStreams(streams);
+		m_inner->findUsedStreams(streams);
+	}
+
+	void WindowJoin::invalidateRecords(jrd_req* request)
+	{
+		m_outer->invalidateRecords(request);
+		m_inner->invalidateRecords(request);
+	}
+
+	void WindowJoin::nullRecords(thread_db* tdbb)
+	{
+		m_outer->nullRecords(tdbb);
+		m_inner->nullRecords(tdbb);
+	}
+
+	void WindowJoin::saveRecords(thread_db* tdbb)
+	{
+		m_outer->saveRecords(tdbb);
+		m_inner->saveRecords(tdbb);
+	}
+
+	void WindowJoin::restoreRecords(thread_db* tdbb)
+	{
+		m_outer->restoreRecords(tdbb);
+		m_inner->restoreRecords(tdbb);
+	}
+
+	int WindowJoin::compareKeys(thread_db* tdbb, jrd_req* request, DscNull* outerValues)
+	{
+		int cmp;
+
+		for (size_t i = 0; i < m_innerKeys->nod_count; i++)
+		{
+			const DscNull& outerValue = outerValues[i];
+			const dsc* const innerDesc = EVL_expr(tdbb, m_innerKeys->nod_arg[i]);
+			const bool innerNull = (request->req_flags & req_null);
+
+			if (outerValue.null && !innerNull)
+				return -1;
+
+			if (!outerValue.null && innerNull)
+				return 1;
+
+			if (!outerValue.null && !innerNull && (cmp = MOV_compare(outerValue.desc, innerDesc)) != 0)
+				return cmp;
+		}
+
+		return 0;
+	}
 } // namespace
 
 // ------------------------------
@@ -203,7 +443,7 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows,
 
 		OPT_gen_aggregate_distincts(tdbb, csb, partitionMap);
 
-		m_joinedStream = FB_NEW(csb->csb_pool) HashJoin(csb, m_joinedStream, aggStream,
+		m_joinedStream = FB_NEW(csb->csb_pool) WindowJoin(csb, m_joinedStream, aggStream,
 			partition, repartition);
 	}
 