8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 21:23:03 +01:00

Feature CORE-2869 - Window functions: LAG and LEAD.

This commit is contained in:
asfernandes 2010-02-16 00:26:53 +00:00
parent 221de5850a
commit 0ce6fb958c
8 changed files with 475 additions and 181 deletions

View File

@ -36,6 +36,7 @@ class CompilerScratch;
class dsql_nod;
class ExprNode;
class jrd_nod;
class SlidingWindow;
class TypeClause;
@ -410,7 +411,7 @@ public:
return false;
}
virtual dsc* winPass(thread_db* tdbb, jrd_req* request) const
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const
{
return NULL;
}

View File

@ -23,10 +23,13 @@
#include "firebird.h"
#include "../jrd/common.h"
#include "../dsql/WinNodes.h"
#include "../dsql/make_proto.h"
#include "../dsql/pass1_proto.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/mov_proto.h"
#include "../jrd/par_proto.h"
#include "../jrd/recsrc/RecordSource.h"
using namespace Firebird;
using namespace Jrd;
@ -50,20 +53,35 @@ DmlNode* WinFuncNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch*
MetaName name;
PAR_name(csb, name);
UCHAR count = csb->csb_blr_reader.getByte();
fb_assert(count == 0); // Arguments not yet supported here.
if (count != 0)
PAR_error(csb, Arg::Gds(isc_funmismat) << name);
WinFuncNode* node = NULL;
for (const Factory* factory = factories; factory; factory = factory->next)
{
if (name == factory->name)
return factory->newInstance(pool);
{
node = factory->newInstance(pool);
break;
}
}
PAR_error(csb, Arg::Gds(isc_funmismat) << name);
return NULL; // silence
if (!node)
PAR_error(csb, Arg::Gds(isc_funnotdef) << name);
UCHAR count = csb->csb_blr_reader.getByte();
if (count != node->jrdChildNodes.getCount())
PAR_error(csb, Arg::Gds(isc_funmismat) << name);
if (count != 0)
{
jrd_nod*** arg = node->jrdChildNodes.begin();
do
{
**arg++ = PAR_parse_node(tdbb, csb, VALUE);
} while (--count);
}
return node;
}
@ -75,6 +93,9 @@ static WinFuncNode::Register<DenseRankWinNode> denseRankWinInfo("DENSE_RANK");
DenseRankWinNode::DenseRankWinNode(MemoryPool& pool)
: WinFuncNode(pool, denseRankWinInfo)
{
fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1);
dsqlChildNodes.clear();
jrdChildNodes.clear();
}
void DenseRankWinNode::make(dsc* desc, dsql_nod* nullReplacement)
@ -127,6 +148,9 @@ RankWinNode::RankWinNode(MemoryPool& pool)
: WinFuncNode(pool, rankWinInfo),
tempImpure(0)
{
fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1);
dsqlChildNodes.clear();
jrdChildNodes.clear();
}
void RankWinNode::make(dsc* desc, dsql_nod* nullReplacement)
@ -196,6 +220,9 @@ static WinFuncNode::Register<RowNumberWinNode> rowNumberWinInfo("ROW_NUMBER");
RowNumberWinNode::RowNumberWinNode(MemoryPool& pool)
: WinFuncNode(pool, rowNumberWinInfo)
{
fb_assert(dsqlChildNodes.getCount() == 1 && jrdChildNodes.getCount() == 1);
dsqlChildNodes.clear();
jrdChildNodes.clear();
}
void RowNumberWinNode::make(dsc* desc, dsql_nod* nullReplacement)
@ -232,7 +259,7 @@ dsc* RowNumberWinNode::aggExecute(thread_db* tdbb, jrd_req* request) const
return &impure->vlu_desc;
}
dsc* RowNumberWinNode::winPass(thread_db* tdbb, jrd_req* request) const
dsc* RowNumberWinNode::winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const
{
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure);
++impure->vlu_misc.vlu_int64;
@ -245,4 +272,125 @@ AggNode* RowNumberWinNode::dsqlCopy() const
}
//--------------------
// A direction of -1 is LAG, and 1 is LEAD.
LagLeadWinNode::LagLeadWinNode(MemoryPool& pool, const AggInfo& aAggInfo, int aDirection,
dsql_nod* aArg, dsql_nod* aRows)
: WinFuncNode(pool, aAggInfo, aArg),
direction(aDirection),
dsqlRows(aRows),
rows(NULL)
{
fb_assert(direction == -1 || direction == 1);
dsqlChildNodes.add(&dsqlRows);
jrdChildNodes.add(&rows);
}
void LagLeadWinNode::make(dsc* desc, dsql_nod* nullReplacement)
{
MAKE_desc(dsqlScratch, desc, dsqlArg, nullReplacement);
desc->setNullable(true);
}
void LagLeadWinNode::getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc)
{
CMP_get_desc(tdbb, csb, arg, desc);
}
void LagLeadWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
{
AggNode::aggInit(tdbb, request);
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure);
impure->make_int64(0, 0);
}
void LagLeadWinNode::aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const
{
}
dsc* LagLeadWinNode::aggExecute(thread_db* tdbb, jrd_req* request) const
{
return NULL;
}
dsc* LagLeadWinNode::winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const
{
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + node->nod_impure);
dsc* desc = EVL_expr(tdbb, rows);
SINT64 records;
if (!desc || (request->req_flags & req_null) || (records = MOV_get_int64(desc, 0)) < 0)
{
status_exception::raise(Arg::Gds(isc_sysf_argnmustbe_nonneg) <<
Arg::Num(2) << Arg::Str(aggInfo.name));
}
if (!window->move(records * direction))
return NULL;
desc = EVL_expr(tdbb, arg);
if (!desc || (request->req_flags & req_null))
return NULL;
return desc;
}
//--------------------
static WinFuncNode::Register<LagWinNode> lagWinInfo("LAG");
LagWinNode::LagWinNode(MemoryPool& pool, dsql_nod* aArg, dsql_nod* aRows)
: LagLeadWinNode(pool, lagWinInfo, -1, aArg, aRows)
{
}
ExprNode* LagWinNode::copy(thread_db* tdbb, NodeCopier& copier) const
{
LagWinNode* node = FB_NEW(*tdbb->getDefaultPool()) LagWinNode(*tdbb->getDefaultPool());
node->arg = copier.copy(tdbb, arg);
node->rows = copier.copy(tdbb, rows);
return node;
}
AggNode* LagWinNode::dsqlCopy() const
{
return FB_NEW(getPool()) LagWinNode(getPool(),
PASS1_node(dsqlScratch, dsqlArg),
PASS1_node(dsqlScratch, dsqlRows));
}
//--------------------
static WinFuncNode::Register<LeadWinNode> leadWinInfo("LEAD");
LeadWinNode::LeadWinNode(MemoryPool& pool, dsql_nod* aArg, dsql_nod* aRows)
: LagLeadWinNode(pool, leadWinInfo, 1, aArg, aRows)
{
}
ExprNode* LeadWinNode::copy(thread_db* tdbb, NodeCopier& copier) const
{
LeadWinNode* node = FB_NEW(*tdbb->getDefaultPool()) LeadWinNode(*tdbb->getDefaultPool());
node->arg = copier.copy(tdbb, arg);
node->rows = copier.copy(tdbb, rows);
return node;
}
AggNode* LeadWinNode::dsqlCopy() const
{
return FB_NEW(getPool()) LeadWinNode(getPool(),
PASS1_node(dsqlScratch, dsqlArg),
PASS1_node(dsqlScratch, dsqlRows));
}
} // namespace Jrd

View File

@ -89,7 +89,58 @@ public:
return true;
}
virtual dsc* winPass(thread_db* tdbb, jrd_req* request) const;
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
protected:
virtual AggNode* dsqlCopy() const;
};
// LAG/LEAD function.
class LagLeadWinNode : public WinFuncNode
{
public:
explicit LagLeadWinNode(MemoryPool& pool, const AggInfo& aAggInfo, int aDirection,
dsql_nod* aArg = NULL, dsql_nod* aRows = NULL);
virtual void make(dsc* desc, dsql_nod* nullReplacement);
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
virtual bool shouldCallWinPass() const
{
return true;
}
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
protected:
const int direction;
dsql_nod* dsqlRows;
jrd_nod* rows;
};
// LAG function.
class LagWinNode : public LagLeadWinNode
{
public:
explicit LagWinNode(MemoryPool& pool, dsql_nod* aArg = NULL, dsql_nod* aRows = NULL);
virtual ExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
protected:
virtual AggNode* dsqlCopy() const;
};
// LEAD function.
class LeadWinNode : public LagLeadWinNode
{
public:
explicit LeadWinNode(MemoryPool& pool, dsql_nod* aArg = NULL, dsql_nod* aRows = NULL);
virtual ExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
protected:
virtual AggNode* dsqlCopy() const;

View File

@ -230,8 +230,10 @@ static const TOK tokens[] =
{ISOLATION, "ISOLATION", 1, false},
{JOIN, "JOIN", 1, false},
{KEY, "KEY", 1, false},
{LAG, "LAG", 2, false},
{LAST, "LAST", 2, true},
{LASTNAME, "LASTNAME", 2, false},
{LEAD, "LEAD", 2, false},
{LEADING, "LEADING", 2, false},
{LEAVE, "LEAVE", 2, true},
{LEFT, "LEFT", 1, false},

View File

@ -576,6 +576,8 @@ inline void check_copy_incr(char*& to, const char ch, const char* const string)
%token <legacyNode> DETERMINISTIC
%token <legacyNode> IDENTITY
%token <legacyNode> DENSE_RANK
%token <legacyNode> LAG
%token <legacyNode> LEAD
%token <legacyNode> RANK
%token <legacyNode> ROW_NUMBER
@ -5328,6 +5330,14 @@ window_function
{ $$ = FB_NEW(getPool()) RankWinNode(getPool()); }
| ROW_NUMBER '(' ')'
{ $$ = FB_NEW(getPool()) RowNumberWinNode(getPool()); }
| LAG '(' value ',' value ')'
{ $$ = FB_NEW(getPool()) LagWinNode(getPool(), $3, $5); }
| LAG '(' value ')'
{ $$ = FB_NEW(getPool()) LagWinNode(getPool(), $3, MAKE_const_slong(1)); }
| LEAD '(' value ',' value ')'
{ $$ = FB_NEW(getPool()) LeadWinNode(getPool(), $3, $5); }
| LEAD '(' value ')'
{ $$ = FB_NEW(getPool()) LeadWinNode(getPool(), $3, MAKE_const_slong(1)); }
;
aggregate_window_function
@ -5959,6 +5969,8 @@ non_reserved_word :
| RDB_SET_CONTEXT
| KW_RELATIVE
| DENSE_RANK
| LAG
| LEAD
| RANK
| ROW_NUMBER
;

View File

@ -40,9 +40,9 @@ using namespace Jrd;
// Note that we can have NULL order here, in case of window function with shouldCallWinPass
// returning true, with partition, and without order. Example: ROW_NUMBER() OVER (PARTITION BY N).
AggregatedStream::AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* group,
jrd_nod* const map, RecordSource* next, jrd_nod* order)
jrd_nod* const map, BaseBufferedStream* next, jrd_nod* order)
: RecordStream(csb, stream),
m_bufferedStream(FB_NEW(csb->csb_pool) BufferedStream(csb, next)),
m_bufferedStream(next),
m_next(m_bufferedStream),
m_group(group),
m_map(map),
@ -109,10 +109,10 @@ bool AggregatedStream::getRecord(thread_db* tdbb)
if (m_bufferedStream)
{
FB_UINT64 position = m_bufferedStream->getPosition(request);
if (impure->pending == 0)
{
FB_UINT64 position = m_bufferedStream->getPosition(request);
if (impure->state == STATE_PENDING)
m_bufferedStream->getRecord(tdbb);
@ -129,29 +129,48 @@ bool AggregatedStream::getRecord(thread_db* tdbb)
m_bufferedStream->locate(tdbb, position);
}
if (m_winPassMap.hasData())
{
SlidingWindow window(tdbb, m_bufferedStream, m_group, request);
dsc* desc;
for (const jrd_nod* const* ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr)
{
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
fb_assert(from->nod_type == nod_class_exprnode_jrd);
const AggNode* aggNode = reinterpret_cast<const AggNode*>(from->nod_arg[0]);
jrd_nod* field = (*ptr)->nod_arg[e_asgn_to];
const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id];
Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record;
desc = aggNode->winPass(tdbb, request, &window);
if (!desc)
SET_NULL(record, id);
else
{
MOV_move(tdbb, desc, EVL_assign_to(tdbb, field));
CLEAR_NULL(record, id);
}
}
}
if (impure->pending > 0)
--impure->pending;
m_bufferedStream->getRecord(tdbb);
dsc* desc;
for (const jrd_nod* const* ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr)
// If there is no group, we should reassign the map items.
if (!m_group)
{
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
fb_assert(from->nod_type == nod_class_exprnode_jrd);
const AggNode* aggNode = reinterpret_cast<const AggNode*>(from->nod_arg[0]);
jrd_nod* field = (*ptr)->nod_arg[e_asgn_to];
const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id];
Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record;
desc = aggNode->winPass(tdbb, request);
if (!desc)
SET_NULL(record, id);
else
for (jrd_nod** ptr = m_map->nod_arg, **end = ptr + m_map->nod_count; ptr < end; ptr++)
{
MOV_move(tdbb, desc, EVL_assign_to(tdbb, field));
CLEAR_NULL(record, id);
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
const AggNode* aggNode = ExprNode::as<AggNode>(from);
if (!aggNode)
EXE_assignment(tdbb, *ptr);
}
}
}
@ -299,7 +318,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega
for (ptr = m_group->nod_arg, end = ptr + m_group->nod_count; ptr < end; ptr++)
{
jrd_nod* from = *ptr;
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure);
impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure);
desc = EVL_expr(tdbb, from);
if (request->req_flags & req_null)
impure->vlu_desc.dsc_address = NULL;
@ -313,7 +332,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega
for (ptr = m_order->nod_arg, end = ptr + m_order->nod_count; ptr < end; ptr++)
{
jrd_nod* from = *ptr;
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure);
impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure);
desc = EVL_expr(tdbb, from);
if (request->req_flags & req_null)
impure->vlu_desc.dsc_address = NULL;
@ -342,7 +361,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega
for (ptr = m_group->nod_arg, end = ptr + m_group->nod_count; ptr < end; ptr++)
{
jrd_nod* from = *ptr;
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure);
impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure);
if (impure->vlu_desc.dsc_address)
EVL_make_value(tdbb, &impure->vlu_desc, &vtemp);
@ -379,7 +398,7 @@ AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, Aggrega
for (ptr = m_order->nod_arg, end = ptr + m_order->nod_count; ptr < end; ptr++)
{
jrd_nod* from = *ptr;
impure_value_ex* impure = (impure_value_ex*) ((SCHAR*) request + from->nod_impure);
impure_value* impure = (impure_value*) ((SCHAR*) request + from->nod_impure);
if (impure->vlu_desc.dsc_address)
EVL_make_value(tdbb, &impure->vlu_desc, &vtemp);
@ -482,3 +501,115 @@ void AggregatedStream::finiDistinct(thread_db* tdbb, jrd_req* request)
aggNode->aggFinish(tdbb, request);
}
}
SlidingWindow::SlidingWindow(thread_db* aTdbb, BaseBufferedStream* aStream, jrd_nod* aGroup,
jrd_req* aRequest)
: tdbb(aTdbb), // Note: instanciate the class only as local variable
stream(aStream),
group(aGroup),
request(aRequest)
{
savedPosition = stream->getPosition(request);
}
SlidingWindow::~SlidingWindow()
{
if (!moved)
return;
for (impure_value* impure = partitionKeys.begin(); impure != partitionKeys.end(); ++impure)
delete impure->vlu_string;
// Position the stream where we received it.
stream->locate(tdbb, savedPosition);
}
// Move in the window without pass partition boundaries.
bool SlidingWindow::move(SINT64 delta)
{
SINT64 newPosition = SINT64(savedPosition) + delta;
// If we try to go out of bounds, no need to check the partition.
if (newPosition < 0 || newPosition >= (SINT64) stream->getCount(request))
return false;
if (!group)
{
// No partition, we may go everywhere.
moved = true;
stream->locate(tdbb, newPosition);
if (!stream->getRecord(tdbb))
{
fb_assert(false);
return false;
}
return true;
}
if (!moved)
{
// This is our first move. We should cache the partition values, so subsequente moves didn't
// need to evaluate them again.
if (!stream->getRecord(tdbb))
{
fb_assert(false);
return false;
}
impure_value* impure = partitionKeys.getBuffer(group->nod_count);
memset(impure, 0, sizeof(impure_value) * group->nod_count);
dsc* desc;
for (jrd_nod** ptr = group->nod_arg, **end = ptr + group->nod_count; ptr < end;
++ptr, ++impure)
{
jrd_nod* from = *ptr;
desc = EVL_expr(tdbb, from);
if (request->req_flags & req_null)
impure->vlu_desc.dsc_address = NULL;
else
EVL_make_value(tdbb, desc, impure);
}
moved = true;
}
stream->locate(tdbb, newPosition);
if (!stream->getRecord(tdbb))
{
fb_assert(false);
return false;
}
// Verify if we're still inside the same partition.
impure_value* impure = partitionKeys.begin();
dsc* desc;
for (jrd_nod** ptr = group->nod_arg, **end = ptr + group->nod_count; ptr < end; ++ptr, ++impure)
{
jrd_nod* from = *ptr;
desc = EVL_expr(tdbb, from);
if (request->req_flags & req_null)
{
if (impure->vlu_desc.dsc_address)
return false;
}
else
{
if (!impure->vlu_desc.dsc_address || MOV_compare(&impure->vlu_desc, desc) != 0)
return false;
}
}
return true;
}

View File

@ -50,6 +50,7 @@ namespace Jrd
struct sort_context;
struct temporary_key;
struct win;
class BaseBufferedStream;
class BufferedStream;
typedef Firebird::HalfStaticArray<UCHAR, OPT_STATIC_ITEMS> StreamsArray;
@ -559,6 +560,25 @@ namespace Jrd
SortMap* const m_map;
};
// Make moves in a window without going out of partition boundaries.
class SlidingWindow
{
public:
SlidingWindow(thread_db* aTdbb, BaseBufferedStream* aStream, jrd_nod* aGroup, jrd_req* aRequest);
~SlidingWindow();
bool move(SINT64 delta);
private:
thread_db* tdbb;
BaseBufferedStream* stream;
jrd_nod* group;
jrd_req* request;
Firebird::Array<impure_value> partitionKeys;
bool moved;
FB_UINT64 savedPosition;
};
class AggregatedStream : public RecordStream
{
enum State
@ -577,7 +597,7 @@ namespace Jrd
public:
AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* const group,
jrd_nod* const map, RecordSource* next, jrd_nod* order);
jrd_nod* const map, BaseBufferedStream* next, jrd_nod* order);
AggregatedStream(CompilerScratch* csb, UCHAR stream, jrd_nod* const group,
jrd_nod* const map, RecordSource* next);
@ -602,7 +622,7 @@ namespace Jrd
State evaluateGroup(thread_db* tdbb, State state);
void finiDistinct(thread_db* tdbb, jrd_req* request);
BufferedStream* m_bufferedStream;
BaseBufferedStream* m_bufferedStream;
RecordSource* const m_next;
jrd_nod* const m_group;
jrd_nod* const m_map;
@ -633,13 +653,20 @@ namespace Jrd
void restoreRecords(thread_db* tdbb);
private:
jrd_nod* m_mainMap;
BufferedStream* m_next;
RecordSource* m_joinedStream;
Firebird::Array<jrd_nod*> m_winPassMap;
};
class BufferedStream : public RecordSource
// Abstract class for different implementations of buffered streams.
class BaseBufferedStream : public RecordSource
{
public:
virtual void locate(thread_db* tdbb, FB_UINT64 position) = 0;
virtual FB_UINT64 getCount(jrd_req* request) const = 0;
virtual FB_UINT64 getPosition(jrd_req* request) const = 0;
};
class BufferedStream : public BaseBufferedStream
{
struct FieldMap
{

View File

@ -41,7 +41,7 @@ namespace
{
// This stream makes possible to reuse a BufferedStream, so each usage maintains a different
// cursor position.
class BufferedStreamWindow : public RecordSource
class BufferedStreamWindow : public BaseBufferedStream
{
struct Impure : public RecordSource::Impure
{
@ -68,6 +68,24 @@ namespace
void saveRecords(thread_db* tdbb);
void restoreRecords(thread_db* tdbb);
void locate(thread_db* tdbb, FB_UINT64 position)
{
jrd_req* const request = tdbb->getRequest();
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
impure->irsb_position = position;
}
FB_UINT64 getCount(jrd_req* request) const
{
return m_next->getCount(request);
}
FB_UINT64 getPosition(jrd_req* request) const
{
Impure* const impure = (Impure*) ((UCHAR*) request + m_impure);
return impure->irsb_position;
}
public:
BufferedStream* m_next;
};
@ -153,8 +171,12 @@ namespace
if (!(impure->irsb_flags & irsb_open))
return false;
m_next->locate(tdbb, impure->irsb_position++);
return m_next->getRecord(tdbb);
m_next->locate(tdbb, impure->irsb_position);
if (!m_next->getRecord(tdbb))
return false;
++impure->irsb_position;
return true;
}
bool BufferedStreamWindow::refetchRecord(thread_db* tdbb)
@ -399,88 +421,13 @@ namespace
// ------------------------------
WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows, RecordSource* next)
: m_mainMap(NULL),
m_winPassMap(csb->csb_pool)
: m_joinedStream(NULL)
{
thread_db* tdbb = JRD_get_thread_data();
m_next = FB_NEW(csb->csb_pool) BufferedStream(csb, next);
m_joinedStream = FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next);
AggregatedStream* mainWindow = NULL;
StreamsArray streams;
m_next->findUsedStreams(streams);
streams.insert(0, streams.getCount());
Array<bool> noAggregatedStreams;
noAggregatedStreams.resize(nodWindows->nod_count);
// Process unordered partitions.
for (unsigned i = 0; i < nodWindows->nod_count; ++i)
{
jrd_nod* const nodWindow = nodWindows->nod_arg[i];
jrd_nod* const partition = nodWindow->nod_arg[e_part_group];
jrd_nod* const partitionMap = nodWindow->nod_arg[e_part_map];
jrd_nod* const repartition = nodWindow->nod_arg[e_part_regroup];
jrd_nod* const order = nodWindow->nod_arg[e_part_order];
const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream];
noAggregatedStreams[i] = false;
if (order)
noAggregatedStreams[i] = true;
else if (partition)
{
// In the case of an item needs winPass call, we should process the partition as an
// ordered one.
jrd_nod* const* ptr = partitionMap->nod_arg;
for (const jrd_nod* const* end = ptr + partitionMap->nod_count; ptr < end; ptr++)
{
jrd_nod* const from = (*ptr)->nod_arg[e_asgn_from];
const AggNode* aggNode = ExprNode::as<AggNode>(from);
if (aggNode && aggNode->shouldCallWinPass())
{
noAggregatedStreams[i] = true;
break;
}
}
}
if (noAggregatedStreams[i])
continue;
if (!partition)
{
// This is the main window. It has special processing.
fb_assert(!m_mainMap);
m_mainMap = partitionMap;
fb_assert(!mainWindow);
mainWindow = FB_NEW(csb->csb_pool) AggregatedStream(
csb, stream, NULL, m_mainMap,
FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next));
OPT_gen_aggregate_distincts(tdbb, csb, m_mainMap);
continue;
}
SortedStream* const sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL,
FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next), partition, false);
AggregatedStream* const aggStream = FB_NEW(csb->csb_pool) AggregatedStream(
csb, stream, partition, partitionMap, sortedStream);
OPT_gen_aggregate_distincts(tdbb, csb, partitionMap);
m_joinedStream = FB_NEW(csb->csb_pool) WindowJoin(csb, m_joinedStream, aggStream,
partition, repartition);
}
// Process ordered partitions.
// Process the unpartioned and unordered map, if existent.
for (unsigned i = 0; i < nodWindows->nod_count; ++i)
{
@ -490,10 +437,7 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows,
jrd_nod* const order = nodWindow->nod_arg[e_part_order];
const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream];
if (!noAggregatedStreams[i])
continue;
// Verify not supported functions/clauses.
// While here, verify not supported functions/clauses.
const jrd_nod* const* ptr = partitionMap->nod_arg;
for (const jrd_nod* const* const end = ptr + partitionMap->nod_count; ptr < end; ++ptr)
@ -501,10 +445,36 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows,
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
const AggNode* aggNode = ExprNode::as<AggNode>(from);
if (aggNode)
if (order && aggNode)
aggNode->checkOrderedWindowCapable();
}
if (!partition && !order)
{
fb_assert(!m_joinedStream);
m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, NULL,
partitionMap, FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next), NULL);
OPT_gen_aggregate_distincts(tdbb, csb, partitionMap);
}
}
if (!m_joinedStream)
m_joinedStream = FB_NEW(csb->csb_pool) BufferedStreamWindow(csb, m_next);
// Process ordered partitions.
StreamsArray streams;
for (unsigned i = 0; i < nodWindows->nod_count; ++i)
{
jrd_nod* const nodWindow = nodWindows->nod_arg[i];
jrd_nod* const partition = nodWindow->nod_arg[e_part_group];
jrd_nod* const partitionMap = nodWindow->nod_arg[e_part_map];
jrd_nod* const order = nodWindow->nod_arg[e_part_order];
const USHORT stream = (USHORT)(IPTR) nodWindow->nod_arg[e_part_stream];
// Refresh the stream list based on the last m_joinedStream.
streams.clear();
m_joinedStream->findUsedStreams(streams);
@ -547,27 +517,15 @@ WindowedStream::WindowedStream(CompilerScratch* csb, const jrd_nod* nodWindows,
else
partitionOrder = order;
SortedStream* sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL,
m_joinedStream, partitionOrder, false);
m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, partition,
partitionMap, sortedStream, order);
}
if (mainWindow)
{
// Make a cross join with the main window.
RecordSource* const rsbs[] = {mainWindow, m_joinedStream};
m_joinedStream = FB_NEW(csb->csb_pool) NestedLoopJoin(csb, 2, rsbs);
// Separate nodes that requires the winPass call.
for (jrd_nod** ptr = m_mainMap->nod_arg, **end = ptr + m_mainMap->nod_count; ptr < end; ptr++)
if (partitionOrder)
{
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
const AggNode* aggNode = ExprNode::as<AggNode>(from);
SortedStream* sortedStream = OPT_gen_sort(tdbb, csb, streams.begin(), NULL,
m_joinedStream, partitionOrder, false);
if (aggNode && aggNode->shouldCallWinPass())
m_winPassMap.add(*ptr);
m_joinedStream = FB_NEW(csb->csb_pool) AggregatedStream(csb, stream, partition,
partitionMap, FB_NEW(csb->csb_pool) BufferedStream(csb, sortedStream), order);
OPT_gen_aggregate_distincts(tdbb, csb, partitionMap);
}
}
}
@ -610,42 +568,6 @@ bool WindowedStream::getRecord(thread_db* tdbb)
if (!m_joinedStream->getRecord(tdbb))
return false;
// Map the inner stream non-aggregate fields to the main partition.
if (m_mainMap)
{
dsc* desc;
jrd_nod* const* ptr = m_mainMap->nod_arg;
for (const jrd_nod* const* end = ptr + m_mainMap->nod_count; ptr < end; ++ptr)
{
jrd_nod* const from = (*ptr)->nod_arg[e_asgn_from];
const AggNode* aggNode = ExprNode::as<AggNode>(from);
if (!aggNode)
EXE_assignment(tdbb, *ptr);
}
for (ptr = m_winPassMap.begin(); ptr != m_winPassMap.end(); ++ptr)
{
jrd_nod* from = (*ptr)->nod_arg[e_asgn_from];
fb_assert(from->nod_type == nod_class_exprnode_jrd);
const AggNode* aggNode = reinterpret_cast<const AggNode*>(from->nod_arg[0]);
jrd_nod* field = (*ptr)->nod_arg[e_asgn_to];
const USHORT id = (USHORT)(IPTR) field->nod_arg[e_fld_id];
Record* record = request->req_rpb[(int) (IPTR) field->nod_arg[e_fld_stream]].rpb_record;
desc = aggNode->winPass(tdbb, request);
if (!desc)
SET_NULL(record, id);
else
{
MOV_move(tdbb, desc, EVL_assign_to(tdbb, field));
CLEAR_NULL(record, id);
}
}
}
return true;
}