From 0ce6fb958c740cdf369c7983a49a4e69841759c6 Mon Sep 17 00:00:00 2001 From: asfernandes Date: Tue, 16 Feb 2010 00:26:53 +0000 Subject: [PATCH] Feature CORE-2869 - Window functions: LAG and LEAD. --- src/dsql/Nodes.h | 3 +- src/dsql/WinNodes.cpp | 166 +++++++++++++++++++++-- src/dsql/WinNodes.h | 53 +++++++- src/dsql/keywords.cpp | 2 + src/dsql/parse.y | 12 ++ src/jrd/recsrc/AggregatedStream.cpp | 181 +++++++++++++++++++++---- src/jrd/recsrc/RecordSource.h | 37 ++++- src/jrd/recsrc/WindowedStream.cpp | 202 +++++++++------------------- 8 files changed, 475 insertions(+), 181 deletions(-) diff --git a/src/dsql/Nodes.h b/src/dsql/Nodes.h index c95218ce44..7b373e95a8 100644 --- a/src/dsql/Nodes.h +++ b/src/dsql/Nodes.h @@ -36,6 +36,7 @@ class CompilerScratch; class dsql_nod; class ExprNode; class jrd_nod; +class SlidingWindow; class TypeClause; @@ -410,7 +411,7 @@ public: return false; } - virtual dsc* winPass(thread_db* tdbb, jrd_req* request) const + virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const { return NULL; } diff --git a/src/dsql/WinNodes.cpp b/src/dsql/WinNodes.cpp index f4dd9e2307..db89ab1209 100644 --- a/src/dsql/WinNodes.cpp +++ b/src/dsql/WinNodes.cpp @@ -23,10 +23,13 @@ #include "firebird.h" #include "../jrd/common.h" #include "../dsql/WinNodes.h" +#include "../dsql/make_proto.h" +#include "../dsql/pass1_proto.h" #include "../jrd/cmp_proto.h" #include "../jrd/evl_proto.h" #include "../jrd/mov_proto.h" #include "../jrd/par_proto.h" +#include "../jrd/recsrc/RecordSource.h" using namespace Firebird; using namespace Jrd; @@ -50,20 +53,35 @@ DmlNode* WinFuncNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* MetaName name; PAR_name(csb, name); - UCHAR count = csb->csb_blr_reader.getByte(); - - fb_assert(count == 0); // Arguments not yet supported here. - if (count != 0) - PAR_error(csb, Arg::Gds(isc_funmismat) << name); + WinFuncNode* node = NULL; for (const Factory* factory = factories; factory; factory = factory->next) { if (name == factory->name) - return factory->newInstance(pool); + { + node = factory->newInstance(pool); + break; + } } - PAR_error(csb, Arg::Gds(isc_funmismat) << name); - return NULL; // silence + if (!node) + PAR_error(csb, Arg::Gds(isc_funnotdef) << name); + + UCHAR count = csb->csb_blr_reader.getByte(); + + if (count != node->jrdChildNodes.getCount()) + PAR_error(csb, Arg::Gds(isc_funmismat) << name); + + if (count != 0) + { + jrd_nod*** arg = node->jrdChildNodes.begin(); + do + { + **arg++ = PAR_parse_node(tdbb, csb, VALUE); + } while (--count); + } + + return node; } @@ -75,6 +93,9 @@ static WinFuncNode::Register denseRankWinInfo("DENSE_RANK"); DenseRankWinNode::DenseRankWinNode(MemoryPool& pool) : WinFuncNode(pool, denseRankWinInfo) { + fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1); + dsqlChildNodes.clear(); + jrdChildNodes.clear(); } void DenseRankWinNode::make(dsc* desc, dsql_nod* nullReplacement) @@ -127,6 +148,9 @@ RankWinNode::RankWinNode(MemoryPool& pool) : WinFuncNode(pool, rankWinInfo), tempImpure(0) { + fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1); + dsqlChildNodes.clear(); + jrdChildNodes.clear(); } void RankWinNode::make(dsc* desc, dsql_nod* nullReplacement) @@ -196,6 +220,9 @@ static WinFuncNode::Register rowNumberWinInfo("ROW_NUMBER"); RowNumberWinNode::RowNumberWinNode(MemoryPool& pool) : WinFuncNode(pool, rowNumberWinInfo) { + fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1); + dsqlChildNodes.clear(); + jrdChildNodes.clear(); } void RowNumberWinNode::make(dsc* desc, dsql_nod* nullReplacement) @@ -232,7 +259,7 @@ dsc* RowNumberWinNode::aggExecute(thread_db* tdbb, jrd_req* request) const return &impure->vlu_desc; } -dsc* RowNumberWinNode::winPass(thread_db* tdbb, jrd_req* request) const +dsc* RowNumberWinNode::winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const { impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure); ++impure->vlu_misc.vlu_int64; @@ -245,4 +272,125 @@ AggNode* RowNumberWinNode::dsqlCopy() const } +//-------------------- + + +// A direction of -1 is LAG, and 1 is LEAD. +LagLeadWinNode::LagLeadWinNode(MemoryPool& pool, const AggInfo& aAggInfo, int aDirection, + dsql_nod* aArg, dsql_nod* aRows) + : WinFuncNode(pool, aAggInfo, aArg), + direction(aDirection), + dsqlRows(aRows), + rows(NULL) +{ + fb_assert(direction == -1 || direction == 1); + + dsqlChildNodes.add(&dsqlRows); + jrdChildNodes.add(&rows); +} + +void LagLeadWinNode::make(dsc* desc, dsql_nod* nullReplacement) +{ + MAKE_desc(dsqlScratch, desc, dsqlArg, nullReplacement); + desc->setNullable(true); +} + +void LagLeadWinNode::getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc) +{ + CMP_get_desc(tdbb, csb, arg, desc); +} + +void LagLeadWinNode::aggInit(thread_db* tdbb, jrd_req* request) const +{ + AggNode::aggInit(tdbb, request); + + impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure); + impure->make_int64(0, 0); +} + +void LagLeadWinNode::aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const +{ +} + +dsc* LagLeadWinNode::aggExecute(thread_db* tdbb, jrd_req* request) const +{ + return NULL; +} + +dsc* LagLeadWinNode::winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const +{ + impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure); + + dsc* desc = EVL_expr(tdbb, rows); + SINT64 records; + + if (!desc || (request->req_flags & req_null) || (records = MOV_get_int64(desc, 0)) < 0) + { + status_exception::raise(Arg::Gds(isc_sysf_argnmustbe_nonneg) << + Arg::Num(2) << Arg::Str(aggInfo.name)); + } + + if (!window->move(records * direction)) + return NULL; + + desc = EVL_expr(tdbb, arg); + if (!desc || (request->req_flags & req_null)) + return NULL; + + return desc; +} + + +//-------------------- + + +static WinFuncNode::Register lagWinInfo("LAG"); + +LagWinNode::LagWinNode(MemoryPool& pool, dsql_nod* aArg, dsql_nod* aRows) + : LagLeadWinNode(pool, lagWinInfo, -1, aArg, aRows) +{ +} + +ExprNode* LagWinNode::copy(thread_db* tdbb, NodeCopier& copier) const +{ + LagWinNode* node = FB_NEW(*tdbb->getDefaultPool()) LagWinNode(*tdbb->getDefaultPool()); + node->arg = copier.copy(tdbb, arg); + node->rows = copier.copy(tdbb, rows); + return node; +} + +AggNode* LagWinNode::dsqlCopy() const +{ + return FB_NEW(getPool()) LagWinNode(getPool(), + PASS1_node(dsqlScratch, dsqlArg), + PASS1_node(dsqlScratch, dsqlRows)); +} + + +//-------------------- + + +static WinFuncNode::Register leadWinInfo("LEAD"); + +LeadWinNode::LeadWinNode(MemoryPool& pool, dsql_nod* aArg, dsql_nod* aRows) + : LagLeadWinNode(pool, leadWinInfo, 1, aArg, aRows) +{ +} + +ExprNode* LeadWinNode::copy(thread_db* tdbb, NodeCopier& copier) const +{ + LeadWinNode* node = FB_NEW(*tdbb->getDefaultPool()) LeadWinNode(*tdbb->getDefaultPool()); + node->arg = copier.copy(tdbb, arg); + node->rows = copier.copy(tdbb, rows); + return node; +} + +AggNode* LeadWinNode::dsqlCopy() const +{ + return FB_NEW(getPool()) LeadWinNode(getPool(), + PASS1_node(dsqlScratch, dsqlArg), + PASS1_node(dsqlScratch, dsqlRows)); +} + + } // namespace Jrd diff --git a/src/dsql/WinNodes.h b/src/dsql/WinNodes.h index ce9e5553d5..e2d220e7a3 100644 --- a/src/dsql/WinNodes.h +++ b/src/dsql/WinNodes.h @@ -89,7 +89,58 @@ public: return true; } - virtual dsc* winPass(thread_db* tdbb, jrd_req* request) const; + virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const; + +protected: + virtual AggNode* dsqlCopy() const; +}; + +// LAG/LEAD function. +class LagLeadWinNode : public WinFuncNode +{ +public: + explicit LagLeadWinNode(MemoryPool& pool, const AggInfo& aAggInfo, int aDirection, + dsql_nod* aArg = NULL, dsql_nod* aRows = NULL); + + virtual void make(dsc* desc, dsql_nod* nullReplacement); + virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc); + + virtual void aggInit(thread_db* tdbb, jrd_req* request) const; + virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const; + virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const; + + virtual bool shouldCallWinPass() const + { + return true; + } + + virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const; + +protected: + const int direction; + dsql_nod* dsqlRows; + jrd_nod* rows; +}; + +// LAG function. +class LagWinNode : public LagLeadWinNode +{ +public: + explicit LagWinNode(MemoryPool& pool, dsql_nod* aArg = NULL, dsql_nod* aRows = NULL); + + virtual ExprNode* copy(thread_db* tdbb, NodeCopier& copier) const; + +protected: + virtual AggNode* dsqlCopy() const; +}; + +// LEAD function. +class LeadWinNode : public LagLeadWinNode +{ +public: + explicit LeadWinNode(MemoryPool& pool, dsql_nod* aArg = NULL, dsql_nod* aRows = NULL); + + virtual ExprNode* copy(thread_db* tdbb, NodeCopier& copier) const; protected: virtual AggNode* dsqlCopy() const; diff --git a/src/dsql/keywords.cpp b/src/dsql/keywords.cpp index 4a51228a24..04d95d638a 100644 --- a/src/dsql/keywords.cpp +++ b/src/dsql/keywords.cpp @@ -230,8 +230,10 @@ static const TOK tokens[] = {ISOLATION, "ISOLATION", 1, false}, {JOIN, "JOIN", 1, false}, {KEY, "KEY", 1, false}, + {LAG, "LAG", 2, false}, {LAST, "LAST", 2, true}, {LASTNAME, "LASTNAME", 2, false}, + {LEAD, "LEAD", 2, false}, {LEADING, "LEADING", 2, false}, {LEAVE, "LEAVE", 2, true}, {LEFT, "LEFT", 1, false}, diff --git a/src/dsql/parse.y b/src/dsql/parse.y index f66a0d364a..7c9d4f083f 100644 --- a/src/dsql/parse.y +++ b/src/dsql/parse.y @@ -576,6 +576,8 @@ inline void check_copy_incr(char*& to, const char ch, const char* const string) %token DETERMINISTIC %token IDENTITY %token DENSE_RANK +%token LAG +%token LEAD %token RANK %token ROW_NUMBER @@ -5328,6 +5330,14 @@ window_function { $$ = FB_NEW(getPool()) RankWinNode(getPool()); } | ROW_NUMBER '(' ')' { $$ = FB_NEW(getPool()) RowNumberWinNode(getPool()); } + | LAG '(' value ',' value ')' + { $$ = FB_NEW(getPool()) LagWinNode(getPool(), $3, $5); } + | LAG '(' value ')' + { $$ = FB_NEW(getPool()) LagWinNode(getPool(), $3, MAKE_const_slong(1)); } + | LEAD '(' value ',' value ')' + { $$ = FB_NEW(getPool()) LeadWinNode(getPool(), $3, $5); } + | LEAD '(' value ')' + { $$ = FB_NEW(getPool()) LeadWinNode(getPool(), $3, MAKE_const_slong(1)); } ; aggregate_window_function @@ -5959,6 +5969,8 @@ non_reserved_word : | RDB_SET_CONTEXT | KW_RELATIVE | DENSE_RANK + | LAG + | LEAD | RANK | ROW_NUMBER ; diff --git a/src/jrd/recsrc/AggregatedStream.cpp b/src/jrd/recsrc/AggregatedStream.cpp index 5e292b7cf3..338f2ef628 100644 --- a/src/jrd/recsrc/AggregatedStream.cpp +++ b/src/jrd/recsrc/AggregatedStream.cpp @@ -40,9 +40,9 @@ using namespace Jrd; // Note that we can have NULL order here, in case of window function with shouldCallWinPass // returning true, with partition, and without order. Example: ROW_NUMBER() OVER (PARTITION BY N). AggregatedStream::AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* group, - jrd_nod* const map, RecordSource* next, jrd_nod* order) + jrd_nod* const map, BaseBufferedStream* next, jrd_nod* order) : RecordStream(csb, stream), - m_bufferedStream(FB_NEW(csb->csb_pool) BufferedStream(csb, next)), + m_bufferedStream(next), m_next(m_bufferedStream), m_group(group), m_map(map), @@ -109,10 +109,10 @@ bool AggregatedStream::getRecord(thread_db* tdbb) if (m_bufferedStream) { + FB_UINT64 position = m_bufferedStream->getPosition(request); + if (impure->pending == 0) { - FB_UINT64 position = m_bufferedStream->getPosition(request); - if (impure->state == STATE_PENDING) m_bufferedStream->getRecord(tdbb); @@ -129,29 +129,48 @@ bool AggregatedStream::getRecord(thread_db* tdbb) m_bufferedStream->locate(tdbb, position); } + if (m_winPassMap.hasData()) + { + SlidingWindow window(tdbb, m_bufferedStream, m_group, request); + dsc* desc; + + for (const jrd_nod* const* ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr) + { + jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; + fb_assert(from->nod_type == nod_class_exprnode_jrd); + const AggNode* aggNode = reinterpret_cast(from->nod_arg[0]); + + jrd_nod* field = (*ptr)->nod_arg[e_asgn_to]; + const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id]; + Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record; + + desc = aggNode->winPass(tdbb, request, &window); + + if (!desc) + SET_NULL(record, id); + else + { + MOV_move(tdbb, desc, EVL_assign_to(tdbb, field)); + CLEAR_NULL(record, id); + } + } + } + if (impure->pending > 0) --impure->pending; + m_bufferedStream->getRecord(tdbb); - dsc* desc; - - for (const jrd_nod* const* ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr) + // If there is no group, we should reassign the map items. + if (!m_group) { - jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; - fb_assert(from->nod_type == nod_class_exprnode_jrd); - const AggNode* aggNode = reinterpret_cast(from->nod_arg[0]); - - jrd_nod* field = (*ptr)->nod_arg[e_asgn_to]; - const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id]; - Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record; - - desc = aggNode->winPass(tdbb, request); - if (!desc) - SET_NULL(record, id); - else + for (jrd_nod** ptr = m_map->nod_arg, **end = ptr + m_map->nod_count; ptr < end; ptr++) { - MOV_move(tdbb, desc, EVL_assign_to(tdbb, field)); - CLEAR_NULL(record, id); + jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; + const AggNode* aggNode = ExprNode::as(from); + + if (!aggNode) + EXE_assignment(tdbb, *ptr); } } } @@ -299,7 +318,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega for (ptr = m_group->nod_arg, end = ptr + m_group->nod_count; ptr < end; ptr++) { jrd_nod* from = *ptr; - impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure); + impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure); desc = EVL_expr(tdbb, from); if (request->req_flags & req_null) impure->vlu_desc.dsc_address = NULL; @@ -313,7 +332,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega for (ptr = m_order->nod_arg, end = ptr + m_order->nod_count; ptr < end; ptr++) { jrd_nod* from = *ptr; - impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure); + impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure); desc = EVL_expr(tdbb, from); if (request->req_flags & req_null) impure->vlu_desc.dsc_address = NULL; @@ -342,7 +361,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega for (ptr = m_group->nod_arg, end = ptr + m_group->nod_count; ptr < end; ptr++) { jrd_nod* from = *ptr; - impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure); + impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure); if (impure->vlu_desc.dsc_address) EVL_make_value(tdbb, &impure->vlu_desc, &vtemp); @@ -379,7 +398,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega for (ptr = m_order->nod_arg, end = ptr + m_order->nod_count; ptr < end; ptr++) { jrd_nod* from = *ptr; - impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure); + impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure); if (impure->vlu_desc.dsc_address) EVL_make_value(tdbb, &impure->vlu_desc, &vtemp); @@ -482,3 +501,115 @@ void AggregatedStream::finiDistinct(thread_db* tdbb, jrd_req* request) aggNode->aggFinish(tdbb, request); } } + + +SlidingWindow::SlidingWindow(thread_db* aTdbb, BaseBufferedStream* aStream, jrd_nod* aGroup, + jrd_req* aRequest) + : tdbb(aTdbb), // Note: instanciate the class only as local variable + stream(aStream), + group(aGroup), + request(aRequest) +{ + savedPosition = stream->getPosition(request); +} + +SlidingWindow::~SlidingWindow() +{ + if (!moved) + return; + + for (impure_value* impure = partitionKeys.begin(); impure != partitionKeys.end(); ++impure) + delete impure->vlu_string; + + // Position the stream where we received it. + stream->locate(tdbb, savedPosition); +} + +// Move in the window without pass partition boundaries. +bool SlidingWindow::move(SINT64 delta) +{ + SINT64 newPosition = SINT64(savedPosition) + delta; + + // If we try to go out of bounds, no need to check the partition. + if (newPosition < 0 || newPosition >= (SINT64) stream->getCount(request)) + return false; + + if (!group) + { + // No partition, we may go everywhere. + + moved = true; + + stream->locate(tdbb, newPosition); + + if (!stream->getRecord(tdbb)) + { + fb_assert(false); + return false; + } + + return true; + } + + if (!moved) + { + // This is our first move. We should cache the partition values, so subsequente moves didn't + // need to evaluate them again. + + if (!stream->getRecord(tdbb)) + { + fb_assert(false); + return false; + } + + impure_value* impure = partitionKeys.getBuffer(group->nod_count); + memset(impure, 0, sizeof(impure_value) * group->nod_count); + + dsc* desc; + + for (jrd_nod** ptr = group->nod_arg, **end = ptr + group->nod_count; ptr < end; + ++ptr, ++impure) + { + jrd_nod* from = *ptr; + desc = EVL_expr(tdbb, from); + if (request->req_flags & req_null) + impure->vlu_desc.dsc_address = NULL; + else + EVL_make_value(tdbb, desc, impure); + } + + moved = true; + } + + stream->locate(tdbb, newPosition); + + if (!stream->getRecord(tdbb)) + { + fb_assert(false); + return false; + } + + // Verify if we're still inside the same partition. + + impure_value* impure = partitionKeys.begin(); + dsc* desc; + + for (jrd_nod** ptr = group->nod_arg, **end = ptr + group->nod_count; ptr < end; ++ptr, ++impure) + { + jrd_nod* from = *ptr; + desc = EVL_expr(tdbb, from); + + if (request->req_flags & req_null) + { + if (impure->vlu_desc.dsc_address) + return false; + } + else + { + if (!impure->vlu_desc.dsc_address || MOV_compare(&impure->vlu_desc, desc) != 0) + return false; + } + } + + return true; +} diff --git a/src/jrd/recsrc/RecordSource.h b/src/jrd/recsrc/RecordSource.h index 3a24ff644a..1478393afa 100644 --- a/src/jrd/recsrc/RecordSource.h +++ b/src/jrd/recsrc/RecordSource.h @@ -50,6 +50,7 @@ namespace Jrd struct sort_context; struct temporary_key; struct win; + class BaseBufferedStream; class BufferedStream; typedef Firebird::HalfStaticArray StreamsArray; @@ -559,6 +560,25 @@ namespace Jrd SortMap* const m_map; }; + // Make moves in a window without going out of partition boundaries. + class SlidingWindow + { + public: + SlidingWindow(thread_db* aTdbb, BaseBufferedStream* aStream, jrd_nod* aGroup, jrd_req* aRequest); + ~SlidingWindow(); + + bool move(SINT64 delta); + + private: + thread_db* tdbb; + BaseBufferedStream* stream; + jrd_nod* group; + jrd_req* request; + Firebird::Array partitionKeys; + bool moved; + FB_UINT64 savedPosition; + }; + class AggregatedStream : public RecordStream { enum State @@ -577,7 +597,7 @@ namespace Jrd public: AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* const group, - jrd_nod* const map, RecordSource* next, jrd_nod* order); + jrd_nod* const map, BaseBufferedStream* next, jrd_nod* order); AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* const group, jrd_nod* const map, RecordSource* next); @@ -602,7 +622,7 @@ namespace Jrd State evaluateGroup(thread_db* tdbb, State state); void finiDistinct(thread_db* tdbb, jrd_req* request); - BufferedStream* m_bufferedStream; + BaseBufferedStream* m_bufferedStream; RecordSource* const m_next; jrd_nod* const m_group; jrd_nod* const m_map; @@ -633,13 +653,20 @@ namespace Jrd void restoreRecords(thread_db* tdbb); private: - jrd_nod* m_mainMap; BufferedStream* m_next; RecordSource* m_joinedStream; - Firebird::Array m_winPassMap; }; - class BufferedStream : public RecordSource + // Abstract class for different implementations of buffered streams. + class BaseBufferedStream : public RecordSource + { + public: + virtual void locate(thread_db* tdbb, FB_UINT64 position) = 0; + virtual FB_UINT64 getCount(jrd_req* request) const = 0; + virtual FB_UINT64 getPosition(jrd_req* request) const = 0; + }; + + class BufferedStream : public BaseBufferedStream { struct FieldMap { diff --git a/src/jrd/recsrc/WindowedStream.cpp b/src/jrd/recsrc/WindowedStream.cpp index 5db0d022a9..da03b92232 100644 --- a/src/jrd/recsrc/WindowedStream.cpp +++ b/src/jrd/recsrc/WindowedStream.cpp @@ -41,7 +41,7 @@ namespace { // This stream makes possible to reuse a BufferedStream, so each usage maintains a different // cursor position. - class BufferedStreamWindow : public RecordSource + class BufferedStreamWindow : public BaseBufferedStream { struct Impure : public RecordSource::Impure { @@ -68,6 +68,24 @@ namespace void saveRecords(thread_db* tdbb); void restoreRecords(thread_db* tdbb); + void locate(thread_db* tdbb, FB_UINT64 position) + { + jrd_req* const request = tdbb->getRequest(); + Impure* const impure = (Impure*) ((UCHAR*) request + m_impure); + impure->irsb_position = position; + } + + FB_UINT64 getCount(jrd_req* request) const + { + return m_next->getCount(request); + } + + FB_UINT64 getPosition(jrd_req* request) const + { + Impure* const impure = (Impure*) ((UCHAR*) request + m_impure); + return impure->irsb_position; + } + public: BufferedStream* m_next; }; @@ -153,8 +171,12 @@ namespace if (!(impure->irsb_flags & irsb_open)) return false; - m_next->locate(tdbb, impure->irsb_position++); - return m_next->getRecord(tdbb); + m_next->locate(tdbb, impure->irsb_position); + if (!m_next->getRecord(tdbb)) + return false; + + ++impure->irsb_position; + return true; } bool BufferedStreamWindow::refetchRecord(thread_db* tdbb) @@ -399,88 +421,13 @@ namespace // ------------------------------ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows, RecordSource* next) - : m_mainMap(NULL), - m_winPassMap(csb->csb_pool) + : m_joinedStream(NULL) { thread_db* tdbb = JRD_get_thread_data(); m_next = FB_NEW(csb->csb_pool) BufferedStream(csb, next); - m_joinedStream = FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next); - AggregatedStream* mainWindow = NULL; - StreamsArray streams; - m_next->findUsedStreams(streams); - streams.insert(0, streams.getCount()); - - Array noAggregatedStreams; - noAggregatedStreams.resize(nodWindows->nod_count); - - // Process unordered partitions. - - for (unsigned i = 0; i < nodWindows->nod_count; ++i) - { - jrd_nod* const nodWindow = nodWindows->nod_arg[i]; - jrd_nod* const partition = nodWindow->nod_arg[e_part_group]; - jrd_nod* const partitionMap = nodWindow->nod_arg[e_part_map]; - jrd_nod* const repartition = nodWindow->nod_arg[e_part_regroup]; - jrd_nod* const order = nodWindow->nod_arg[e_part_order]; - const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream]; - - noAggregatedStreams[i] = false; - - if (order) - noAggregatedStreams[i] = true; - else if (partition) - { - // In the case of an item needs winPass call, we should process the partition as an - // ordered one. - - jrd_nod* const* ptr = partitionMap->nod_arg; - for (const jrd_nod* const* end = ptr + partitionMap->nod_count; ptr < end; ptr++) - { - jrd_nod* const from = (*ptr)->nod_arg[e_asgn_from]; - const AggNode* aggNode = ExprNode::as(from); - - if (aggNode && aggNode->shouldCallWinPass()) - { - noAggregatedStreams[i] = true; - break; - } - } - } - - if (noAggregatedStreams[i]) - continue; - - if (!partition) - { - // This is the main window. It has special processing. - - fb_assert(!m_mainMap); - m_mainMap = partitionMap; - - fb_assert(!mainWindow); - mainWindow = FB_NEW(csb->csb_pool) AggregatedStream( - csb, stream, NULL, m_mainMap, - FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next)); - - OPT_gen_aggregate_distincts(tdbb, csb, m_mainMap); - - continue; - } - - SortedStream* const sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL, - FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next), partition, false); - AggregatedStream* const aggStream = FB_NEW(csb->csb_pool) AggregatedStream( - csb, stream, partition, partitionMap, sortedStream); - - OPT_gen_aggregate_distincts(tdbb, csb, partitionMap); - - m_joinedStream = FB_NEW(csb->csb_pool) WindowJoin(csb, m_joinedStream, aggStream, - partition, repartition); - } - - // Process ordered partitions. + // Process the unpartioned and unordered map, if existent. for (unsigned i = 0; i < nodWindows->nod_count; ++i) { @@ -490,10 +437,7 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows, jrd_nod* const order = nodWindow->nod_arg[e_part_order]; const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream]; - if (!noAggregatedStreams[i]) - continue; - - // Verify not supported functions/clauses. + // While here, verify not supported functions/clauses. const jrd_nod* const* ptr = partitionMap->nod_arg; for (const jrd_nod* const* const end = ptr + partitionMap->nod_count; ptr < end; ++ptr) @@ -501,10 +445,36 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows, jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; const AggNode* aggNode = ExprNode::as(from); - if (aggNode) + if (order && aggNode) aggNode->checkOrderedWindowCapable(); } + if (!partition && !order) + { + fb_assert(!m_joinedStream); + + m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, NULL, + partitionMap, FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next), NULL); + + OPT_gen_aggregate_distincts(tdbb, csb, partitionMap); + } + } + + if (!m_joinedStream) + m_joinedStream = FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next); + + // Process ordered partitions. + + StreamsArray streams; + + for (unsigned i = 0; i < nodWindows->nod_count; ++i) + { + jrd_nod* const nodWindow = nodWindows->nod_arg[i]; + jrd_nod* const partition = nodWindow->nod_arg[e_part_group]; + jrd_nod* const partitionMap = nodWindow->nod_arg[e_part_map]; + jrd_nod* const order = nodWindow->nod_arg[e_part_order]; + const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream]; + // Refresh the stream list based on the last m_joinedStream. streams.clear(); m_joinedStream->findUsedStreams(streams); @@ -547,27 +517,15 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows, else partitionOrder = order; - SortedStream* sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL, - m_joinedStream, partitionOrder, false); - - m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, partition, - partitionMap, sortedStream, order); - } - - if (mainWindow) - { - // Make a cross join with the main window. - RecordSource* const rsbs[] = {mainWindow, m_joinedStream}; - m_joinedStream = FB_NEW(csb->csb_pool) NestedLoopJoin(csb, 2, rsbs); - - // Separate nodes that requires the winPass call. - for (jrd_nod** ptr = m_mainMap->nod_arg, **end = ptr + m_mainMap->nod_count; ptr < end; ptr++) + if (partitionOrder) { - jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; - const AggNode* aggNode = ExprNode::as(from); + SortedStream* sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL, + m_joinedStream, partitionOrder, false); - if (aggNode && aggNode->shouldCallWinPass()) - m_winPassMap.add(*ptr); + m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, partition, + partitionMap, FB_NEW(csb->csb_pool) BufferedStream(csb, sortedStream), order); + + OPT_gen_aggregate_distincts(tdbb, csb, partitionMap); } } } @@ -610,42 +568,6 @@ bool WindowedStream::getRecord(thread_db* tdbb) if (!m_joinedStream->getRecord(tdbb)) return false; - // Map the inner stream non-aggregate fields to the main partition. - if (m_mainMap) - { - dsc* desc; - - jrd_nod* const* ptr = m_mainMap->nod_arg; - for (const jrd_nod* const* end = ptr + m_mainMap->nod_count; ptr < end; ++ptr) - { - jrd_nod* const from = (*ptr)->nod_arg[e_asgn_from]; - const AggNode* aggNode = ExprNode::as(from); - - if (!aggNode) - EXE_assignment(tdbb, *ptr); - } - - for (ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr) - { - jrd_nod* from = (*ptr)->nod_arg[e_asgn_from]; - fb_assert(from->nod_type == nod_class_exprnode_jrd); - const AggNode* aggNode = reinterpret_cast(from->nod_arg[0]); - - jrd_nod* field = (*ptr)->nod_arg[e_asgn_to]; - const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id]; - Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record; - - desc = aggNode->winPass(tdbb, request); - if (!desc) - SET_NULL(record, id); - else - { - MOV_move(tdbb, desc, EVL_assign_to(tdbb, field)); - CLEAR_NULL(record, id); - } - } - } - return true; }