mirror of
https://github.com/FirebirdSQL/firebird.git
synced 2025-01-24 00:03:03 +01:00
Replace the hash join by specialized join with the already sorted streams
This commit is contained in:
parent
25caf02fe1
commit
e9f377f81e
@ -26,6 +26,12 @@
|
||||
|
||||
#include "../jrd/intl_classes.h"
|
||||
|
||||
namespace Jrd
|
||||
{
|
||||
struct Item;
|
||||
struct ItemInfo;
|
||||
}
|
||||
|
||||
// Implemented in evl.cpp
|
||||
dsc* EVL_add(const dsc*, const Jrd::jrd_nod*, Jrd::impure_value*);
|
||||
dsc* EVL_add2(const dsc*, const Jrd::jrd_nod*, Jrd::impure_value*);
|
||||
|
@ -22,7 +22,9 @@
|
||||
|
||||
#include "firebird.h"
|
||||
#include "../jrd/common.h"
|
||||
#include "../jrd/mov_proto.h"
|
||||
#include "../jrd/opt_proto.h"
|
||||
#include "../jrd/evl_proto.h"
|
||||
#include "../jrd/exe_proto.h"
|
||||
#include "RecordSource.h"
|
||||
|
||||
@ -68,6 +70,52 @@ namespace
|
||||
BufferedStream* m_next;
|
||||
};
|
||||
|
||||
// Make join between outer stream and already sorted (aggregated) partition.
|
||||
class WindowJoin : public RecordSource
|
||||
{
|
||||
struct DscNull
|
||||
{
|
||||
dsc* desc;
|
||||
bool null;
|
||||
};
|
||||
|
||||
struct Impure : public RecordSource::Impure
|
||||
{
|
||||
FB_UINT64 innerRecordCount;
|
||||
};
|
||||
|
||||
public:
|
||||
WindowJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
|
||||
const jrd_nod* outerKeys, const jrd_nod* innerKeys);
|
||||
|
||||
void open(thread_db* tdbb);
|
||||
void close(thread_db* tdbb);
|
||||
|
||||
bool getRecord(thread_db* tdbb);
|
||||
bool refetchRecord(thread_db* tdbb);
|
||||
bool lockRecord(thread_db* tdbb);
|
||||
|
||||
void dump(thread_db* tdbb, Firebird::UCharBuffer& buffer);
|
||||
|
||||
void markRecursive();
|
||||
void invalidateRecords(jrd_req* request);
|
||||
|
||||
void findUsedStreams(StreamsArray& streams);
|
||||
void nullRecords(thread_db* tdbb);
|
||||
void saveRecords(thread_db* tdbb);
|
||||
void restoreRecords(thread_db* tdbb);
|
||||
|
||||
private:
|
||||
int compareKeys(thread_db* tdbb, jrd_req* request, DscNull* outerValues);
|
||||
|
||||
RecordSource* const m_outer;
|
||||
BufferedStream* const m_inner;
|
||||
const jrd_nod* const m_outerKeys;
|
||||
const jrd_nod* const m_innerKeys;
|
||||
};
|
||||
|
||||
// BufferedStreamWindow implementation
|
||||
|
||||
BufferedStreamWindow::BufferedStreamWindow(CompilerScratch* csb, BufferedStream* next)
|
||||
: m_next(next)
|
||||
{
|
||||
@ -151,6 +199,198 @@ namespace
|
||||
{
|
||||
m_next->restoreRecords(tdbb);
|
||||
}
|
||||
|
||||
// WindowJoin implementation
|
||||
|
||||
WindowJoin::WindowJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
|
||||
const jrd_nod* outerKeys, const jrd_nod* innerKeys)
|
||||
: m_outer(outer), m_inner(FB_NEW(csb->csb_pool) BufferedStream(csb, inner)),
|
||||
m_outerKeys(outerKeys), m_innerKeys(innerKeys)
|
||||
{
|
||||
fb_assert(m_outer && m_inner && m_innerKeys->nod_count == m_outerKeys->nod_count);
|
||||
fb_assert(m_outerKeys && m_outerKeys->nod_type == nod_list);
|
||||
fb_assert(m_innerKeys && m_innerKeys->nod_type == nod_list);
|
||||
|
||||
m_impure = CMP_impure(csb, sizeof(Impure));
|
||||
}
|
||||
|
||||
void WindowJoin::open(thread_db* tdbb)
|
||||
{
|
||||
jrd_req* const request = tdbb->getRequest();
|
||||
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
|
||||
|
||||
impure->irsb_flags = irsb_open;
|
||||
|
||||
// Read and cache the inner stream. Also gets its total number of records.
|
||||
|
||||
m_inner->open(tdbb);
|
||||
FB_UINT64 position = 0;
|
||||
|
||||
while (m_inner->getRecord(tdbb))
|
||||
++position;
|
||||
|
||||
impure->innerRecordCount = position;
|
||||
|
||||
m_outer->open(tdbb);
|
||||
}
|
||||
|
||||
void WindowJoin::close(thread_db* tdbb)
|
||||
{
|
||||
jrd_req* const request = tdbb->getRequest();
|
||||
|
||||
invalidateRecords(request);
|
||||
|
||||
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
|
||||
|
||||
if (impure->irsb_flags & irsb_open)
|
||||
{
|
||||
impure->irsb_flags &= ~irsb_open;
|
||||
|
||||
m_outer->close(tdbb);
|
||||
m_inner->close(tdbb);
|
||||
}
|
||||
}
|
||||
|
||||
bool WindowJoin::getRecord(thread_db* tdbb)
|
||||
{
|
||||
jrd_req* const request = tdbb->getRequest();
|
||||
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
|
||||
|
||||
if (!(impure->irsb_flags & irsb_open))
|
||||
return false;
|
||||
|
||||
if (!m_outer->getRecord(tdbb))
|
||||
return false;
|
||||
|
||||
// Evaluate the outer stream keys.
|
||||
|
||||
HalfStaticArray<DscNull, 8> outerValues;
|
||||
DscNull* outerValue = outerValues.getBuffer(m_outerKeys->nod_count);
|
||||
|
||||
for (unsigned i = 0; i < m_outerKeys->nod_count; ++i)
|
||||
{
|
||||
outerValue->desc = EVL_expr(tdbb, m_outerKeys->nod_arg[i]);
|
||||
outerValue->null = (request->req_flags & req_null);
|
||||
++outerValue;
|
||||
}
|
||||
|
||||
outerValue -= m_outerKeys->nod_count; // go back to begin
|
||||
|
||||
// Join the streams. That should be a 1-to-1 join.
|
||||
|
||||
SINT64 start = 0;
|
||||
SINT64 finish = impure->innerRecordCount;
|
||||
SINT64 pos = finish / 2;
|
||||
|
||||
while (pos >= start && pos < finish)
|
||||
{
|
||||
m_inner->locate(tdbb, pos);
|
||||
if (!m_inner->getRecord(tdbb))
|
||||
{
|
||||
fb_assert(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
int cmp = compareKeys(tdbb, request, outerValue);
|
||||
|
||||
if (cmp == 0)
|
||||
return true;
|
||||
else if (cmp < 0)
|
||||
{
|
||||
finish = pos;
|
||||
pos -= MAX(1, (finish - start) / 2);
|
||||
}
|
||||
else //if (cmp > 0)
|
||||
{
|
||||
start = pos;
|
||||
pos += MAX(1, (finish - start) / 2);
|
||||
}
|
||||
}
|
||||
|
||||
fb_assert(false);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool WindowJoin::refetchRecord(thread_db* tdbb)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WindowJoin::lockRecord(thread_db* tdbb)
|
||||
{
|
||||
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
|
||||
return false; // compiler silencer
|
||||
}
|
||||
|
||||
void WindowJoin::dump(thread_db* tdbb, UCharBuffer& buffer)
|
||||
{
|
||||
buffer.add(isc_info_rsb_begin);
|
||||
|
||||
m_outer->dump(tdbb, buffer);
|
||||
m_inner->dump(tdbb, buffer);
|
||||
|
||||
buffer.add(isc_info_rsb_end);
|
||||
}
|
||||
|
||||
void WindowJoin::markRecursive()
|
||||
{
|
||||
m_outer->markRecursive();
|
||||
m_inner->markRecursive();
|
||||
}
|
||||
|
||||
void WindowJoin::findUsedStreams(StreamsArray& streams)
|
||||
{
|
||||
m_outer->findUsedStreams(streams);
|
||||
m_inner->findUsedStreams(streams);
|
||||
}
|
||||
|
||||
void WindowJoin::invalidateRecords(jrd_req* request)
|
||||
{
|
||||
m_outer->invalidateRecords(request);
|
||||
m_inner->invalidateRecords(request);
|
||||
}
|
||||
|
||||
void WindowJoin::nullRecords(thread_db* tdbb)
|
||||
{
|
||||
m_outer->nullRecords(tdbb);
|
||||
m_inner->nullRecords(tdbb);
|
||||
}
|
||||
|
||||
void WindowJoin::saveRecords(thread_db* tdbb)
|
||||
{
|
||||
m_outer->saveRecords(tdbb);
|
||||
m_inner->saveRecords(tdbb);
|
||||
}
|
||||
|
||||
void WindowJoin::restoreRecords(thread_db* tdbb)
|
||||
{
|
||||
m_outer->restoreRecords(tdbb);
|
||||
m_inner->restoreRecords(tdbb);
|
||||
}
|
||||
|
||||
int WindowJoin::compareKeys(thread_db* tdbb, jrd_req* request, DscNull* outerValues)
|
||||
{
|
||||
int cmp;
|
||||
|
||||
for (size_t i = 0; i < m_innerKeys->nod_count; i++)
|
||||
{
|
||||
const DscNull& outerValue = outerValues[i];
|
||||
const dsc* const innerDesc = EVL_expr(tdbb, m_innerKeys->nod_arg[i]);
|
||||
const bool innerNull = (request->req_flags & req_null);
|
||||
|
||||
if (outerValue.null && !innerNull)
|
||||
return -1;
|
||||
|
||||
if (!outerValue.null && innerNull)
|
||||
return 1;
|
||||
|
||||
if (!outerValue.null && !innerNull && (cmp = MOV_compare(outerValue.desc, innerDesc)) != 0)
|
||||
return cmp;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// ------------------------------
|
||||
@ -203,7 +443,7 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows,
|
||||
|
||||
OPT_gen_aggregate_distincts(tdbb, csb, partitionMap);
|
||||
|
||||
m_joinedStream = FB_NEW(csb->csb_pool) HashJoin(csb, m_joinedStream, aggStream,
|
||||
m_joinedStream = FB_NEW(csb->csb_pool) WindowJoin(csb, m_joinedStream, aggStream,
|
||||
partition, repartition);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user