mirror of
https://github.com/FirebirdSQL/firebird.git
synced 2025-02-02 10:00:38 +01:00
Refactor window functions to support new types of functions.
This commit is contained in:
parent
056164ab95
commit
3c9df93939
@ -60,7 +60,8 @@ AggNode::AggNode(MemoryPool& pool, const AggInfo& aAggInfo, bool aDistinct, bool
|
||||
dialect1(aDialect1),
|
||||
arg(aArg),
|
||||
asb(NULL),
|
||||
indexed(false)
|
||||
indexed(false),
|
||||
ordered(false)
|
||||
{
|
||||
addChildNode(arg, arg);
|
||||
}
|
||||
@ -336,10 +337,14 @@ AggNode* AggNode::pass2(thread_db* tdbb, CompilerScratch* csb)
|
||||
return this;
|
||||
}
|
||||
|
||||
void AggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void AggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->vlux_count = 0;
|
||||
|
||||
if (aggType == AGG_TYPE_GROUP)
|
||||
impure->vlux_count = 0;
|
||||
|
||||
impure->aggType = aggType;
|
||||
|
||||
if (distinct)
|
||||
{
|
||||
@ -622,9 +627,12 @@ string AvgAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "AvgAggNode";
|
||||
}
|
||||
|
||||
void AvgAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void AvgAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
@ -644,6 +652,10 @@ void AvgAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void AvgAggNode::aggPass(thread_db* /*tdbb*/, jrd_req* request, dsc* desc) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
++impure->vlux_count;
|
||||
|
||||
if (dialect1)
|
||||
@ -748,9 +760,12 @@ string ListAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "ListAggNode";
|
||||
}
|
||||
|
||||
void ListAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void ListAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
// We don't know here what should be the sub-type and text-type.
|
||||
// Defer blob creation for when first record is found.
|
||||
@ -763,6 +778,9 @@ void ListAggNode::aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
if (!impure->vlu_blob)
|
||||
{
|
||||
impure->vlu_blob = blb::create(tdbb, request->req_transaction,
|
||||
@ -886,9 +904,12 @@ string CountAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "CountAggNode";
|
||||
}
|
||||
|
||||
void CountAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void CountAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0);
|
||||
@ -898,6 +919,9 @@ void CountAggNode::aggPass(thread_db* /*tdbb*/, jrd_req* request, dsc* /*desc*/)
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
if (dialect1)
|
||||
++impure->vlu_misc.vlu_long;
|
||||
else
|
||||
@ -1114,9 +1138,12 @@ string SumAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "SumAggNode";
|
||||
}
|
||||
|
||||
void SumAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void SumAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
@ -1133,6 +1160,10 @@ void SumAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void SumAggNode::aggPass(thread_db* /*tdbb*/, jrd_req* request, dsc* desc) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
++impure->vlux_count;
|
||||
|
||||
if (dialect1)
|
||||
@ -1207,9 +1238,12 @@ string MaxMinAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "MaxMinAggNode";
|
||||
}
|
||||
|
||||
void MaxMinAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void MaxMinAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->vlu_desc.dsc_dtype = 0;
|
||||
@ -1219,6 +1253,9 @@ void MaxMinAggNode::aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
++impure->vlux_count;
|
||||
if (!impure->vlu_desc.dsc_dtype)
|
||||
{
|
||||
@ -1315,9 +1352,12 @@ string StdDevAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "StdDevAggNode";
|
||||
}
|
||||
|
||||
void StdDevAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void StdDevAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_double(0);
|
||||
@ -1329,6 +1369,10 @@ void StdDevAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void StdDevAggNode::aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return;
|
||||
|
||||
++impure->vlux_count;
|
||||
|
||||
const double d = MOV_get_double(desc);
|
||||
@ -1452,9 +1496,12 @@ string CorrAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "CorrAggNode";
|
||||
}
|
||||
|
||||
void CorrAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void CorrAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_double(0);
|
||||
@ -1465,6 +1512,11 @@ void CorrAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
|
||||
bool CorrAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return true;
|
||||
|
||||
dsc* desc = NULL;
|
||||
dsc* desc2 = NULL;
|
||||
|
||||
@ -1476,7 +1528,6 @@ bool CorrAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
if (request->req_flags & req_null)
|
||||
return false;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
++impure->vlux_count;
|
||||
|
||||
const double y = MOV_get_double(desc);
|
||||
@ -1637,9 +1688,12 @@ string RegrAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "RegrAggNode";
|
||||
}
|
||||
|
||||
void RegrAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void RegrAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_double(0);
|
||||
@ -1650,6 +1704,11 @@ void RegrAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
|
||||
bool RegrAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return true;
|
||||
|
||||
dsc* desc = NULL;
|
||||
dsc* desc2 = NULL;
|
||||
|
||||
@ -1661,7 +1720,6 @@ bool RegrAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
if (request->req_flags & req_null)
|
||||
return false;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
++impure->vlux_count;
|
||||
|
||||
const double y = MOV_get_double(desc);
|
||||
@ -1810,9 +1868,12 @@ string RegrCountAggNode::internalPrint(NodePrinter& printer) const
|
||||
return "RegrCountAggNode";
|
||||
}
|
||||
|
||||
void RegrCountAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void RegrCountAggNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0);
|
||||
@ -1820,6 +1881,11 @@ void RegrCountAggNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
|
||||
bool RegrCountAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
|
||||
if (ordered && impure->aggType != AGG_TYPE_ORDER)
|
||||
return true;
|
||||
|
||||
dsc* desc = EVL_expr(tdbb, request, arg);
|
||||
if (request->req_flags & req_null)
|
||||
return false;
|
||||
@ -1828,7 +1894,6 @@ bool RegrCountAggNode::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
if (request->req_flags & req_null)
|
||||
return false;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
++impure->vlu_misc.vlu_int64;
|
||||
|
||||
return true;
|
||||
|
@ -43,7 +43,7 @@ public:
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
virtual AggNode* pass2(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -77,7 +77,7 @@ public:
|
||||
"LIST is not supported in ordered windows");
|
||||
}
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -101,7 +101,7 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -121,7 +121,7 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -147,7 +147,7 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -184,7 +184,7 @@ public:
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
virtual AggNode* pass2(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -224,7 +224,7 @@ public:
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
virtual AggNode* pass2(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual bool aggPass(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
@ -271,7 +271,7 @@ public:
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
virtual AggNode* pass2(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual bool aggPass(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
@ -300,7 +300,7 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual bool aggPass(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
@ -969,9 +969,8 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* /*tdbb*/, jrd_req* /*request*/, SlidingWindow* /*window*/) const
|
||||
@ -979,7 +978,7 @@ public:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const = 0; // pure, but defined
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const = 0; // pure, but defined
|
||||
virtual void aggFinish(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual bool aggPass(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual dsc* execute(thread_db* tdbb, jrd_req* request) const;
|
||||
@ -1004,6 +1003,7 @@ public:
|
||||
NestConst<ValueExprNode> arg;
|
||||
const AggregateSort* asb;
|
||||
bool indexed;
|
||||
bool ordered;
|
||||
|
||||
private:
|
||||
static Factory* factories;
|
||||
|
@ -79,9 +79,12 @@ ValueExprNode* DenseRankWinNode::copy(thread_db* tdbb, NodeCopier& /*copier*/) c
|
||||
return FB_NEW_POOL(*tdbb->getDefaultPool()) DenseRankWinNode(*tdbb->getDefaultPool());
|
||||
}
|
||||
|
||||
void DenseRankWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void DenseRankWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0, 0);
|
||||
@ -94,7 +97,10 @@ void DenseRankWinNode::aggPass(thread_db* /*tdbb*/, jrd_req* /*request*/, dsc* /
|
||||
dsc* DenseRankWinNode::aggExecute(thread_db* /*tdbb*/, jrd_req* request) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
++impure->vlu_misc.vlu_int64;
|
||||
|
||||
if (!ordered || impure->aggType == AGG_TYPE_ORDER)
|
||||
++impure->vlu_misc.vlu_int64;
|
||||
|
||||
return &impure->vlu_desc;
|
||||
}
|
||||
|
||||
@ -152,9 +158,12 @@ AggNode* RankWinNode::pass2(thread_db* tdbb, CompilerScratch* csb)
|
||||
return this;
|
||||
}
|
||||
|
||||
void RankWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void RankWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(1, 0);
|
||||
@ -164,7 +173,9 @@ void RankWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void RankWinNode::aggPass(thread_db* /*tdbb*/, jrd_req* request, dsc* /*desc*/) const
|
||||
{
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
++impure->vlux_count;
|
||||
|
||||
if (!ordered || impure->aggType == AGG_TYPE_ORDER)
|
||||
++impure->vlux_count;
|
||||
}
|
||||
|
||||
dsc* RankWinNode::aggExecute(thread_db* tdbb, jrd_req* request) const
|
||||
@ -226,9 +237,12 @@ ValueExprNode* RowNumberWinNode::copy(thread_db* tdbb, NodeCopier& /*copier*/) c
|
||||
return FB_NEW_POOL(*tdbb->getDefaultPool()) RowNumberWinNode(*tdbb->getDefaultPool());
|
||||
}
|
||||
|
||||
void RowNumberWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void RowNumberWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0, 0);
|
||||
@ -296,9 +310,12 @@ ValueExprNode* FirstValueWinNode::copy(thread_db* tdbb, NodeCopier& copier) cons
|
||||
return node;
|
||||
}
|
||||
|
||||
void FirstValueWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void FirstValueWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0, 0);
|
||||
@ -376,9 +393,9 @@ ValueExprNode* LastValueWinNode::copy(thread_db* tdbb, NodeCopier& copier) const
|
||||
return node;
|
||||
}
|
||||
|
||||
void LastValueWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void LastValueWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
}
|
||||
|
||||
void LastValueWinNode::aggPass(thread_db* /*tdbb*/, jrd_req* /*request*/, dsc* /*desc*/) const
|
||||
@ -460,9 +477,12 @@ ValueExprNode* NthValueWinNode::copy(thread_db* tdbb, NodeCopier& copier) const
|
||||
return node;
|
||||
}
|
||||
|
||||
void NthValueWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void NthValueWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
|
||||
if (aggType != AGG_TYPE_GROUP)
|
||||
return;
|
||||
|
||||
impure_value_ex* impure = request->getImpure<impure_value_ex>(impureOffset);
|
||||
impure->make_int64(0, 0);
|
||||
@ -573,9 +593,9 @@ void LagLeadWinNode::getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc)
|
||||
arg->getDesc(tdbb, csb, desc);
|
||||
}
|
||||
|
||||
void LagLeadWinNode::aggInit(thread_db* tdbb, jrd_req* request) const
|
||||
void LagLeadWinNode::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
AggNode::aggInit(tdbb, request);
|
||||
AggNode::aggInit(tdbb, request, aggType);
|
||||
}
|
||||
|
||||
void LagLeadWinNode::aggPass(thread_db* /*tdbb*/, jrd_req* /*request*/, dsc* /*desc*/) const
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -61,7 +61,7 @@ public:
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
virtual AggNode* pass2(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
@ -83,13 +83,13 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return true;
|
||||
wantWinPass = true;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
|
||||
@ -109,13 +109,13 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return true;
|
||||
wantWinPass = true;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
|
||||
@ -137,13 +137,13 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return true;
|
||||
wantWinPass = true;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
|
||||
@ -173,13 +173,13 @@ public:
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
virtual ValueExprNode* copy(thread_db* tdbb, NodeCopier& copier) const;
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return true;
|
||||
wantWinPass = true;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
|
||||
@ -205,13 +205,13 @@ public:
|
||||
virtual void make(DsqlCompilerScratch* dsqlScratch, dsc* desc);
|
||||
virtual void getDesc(thread_db* tdbb, CompilerScratch* csb, dsc* desc);
|
||||
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request) const;
|
||||
virtual void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
virtual void aggPass(thread_db* tdbb, jrd_req* request, dsc* desc) const;
|
||||
virtual dsc* aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
|
||||
virtual bool shouldCallWinPass() const
|
||||
virtual void aggSetup(bool& wantWinPass) const
|
||||
{
|
||||
return true;
|
||||
wantWinPass = true;
|
||||
}
|
||||
|
||||
virtual dsc* winPass(thread_db* tdbb, jrd_req* request, SlidingWindow* window) const;
|
||||
|
@ -76,7 +76,11 @@ void AggregatedStream::open(thread_db* tdbb) const
|
||||
impure->irsb_flags = irsb_open;
|
||||
|
||||
impure->state = STATE_GROUPING;
|
||||
impure->pending = 0;
|
||||
impure->lastGroup = false;
|
||||
impure->partitionBlock.startPosition = impure->partitionBlock.endPosition =
|
||||
impure->partitionBlock.pending = 0;
|
||||
impure->orderBlock = impure->partitionBlock;
|
||||
|
||||
VIO_record(tdbb, &request->req_rpb[m_stream], m_format, tdbb->getDefaultPool());
|
||||
|
||||
unsigned impureCount = m_group ? m_group->getCount() : 0;
|
||||
@ -126,30 +130,61 @@ bool AggregatedStream::getRecord(thread_db* tdbb) const
|
||||
{
|
||||
const FB_UINT64 position = m_bufferedStream->getPosition(request);
|
||||
|
||||
if (impure->pending == 0)
|
||||
if (impure->orderBlock.pending == 0)
|
||||
{
|
||||
if (impure->state == STATE_PENDING)
|
||||
if (impure->partitionBlock.pending == 0)
|
||||
{
|
||||
if (!m_bufferedStream->getRecord(tdbb))
|
||||
if (impure->lastGroup || !evaluateGroup(tdbb, AGG_TYPE_GROUP, MAX_UINT64))
|
||||
{
|
||||
rpb->rpb_number.setValid(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
impure->partitionBlock.startPosition = position;
|
||||
impure->partitionBlock.endPosition = m_bufferedStream->getPosition(request) - 1 -
|
||||
(impure->state == STATE_FETCHED ? 1 : 0);
|
||||
impure->partitionBlock.pending =
|
||||
impure->partitionBlock.endPosition - impure->partitionBlock.startPosition + 1;
|
||||
|
||||
fb_assert(impure->partitionBlock.pending > 0);
|
||||
|
||||
m_bufferedStream->locate(tdbb, position);
|
||||
impure->state = STATE_GROUPING;
|
||||
|
||||
impure->lastGroup = impure->state == STATE_EOF;
|
||||
}
|
||||
|
||||
// Check if we need to re-aggregate by the ORDER BY clause.
|
||||
if (!m_order && m_winPassSources.isEmpty())
|
||||
impure->orderBlock = impure->partitionBlock;
|
||||
else
|
||||
{
|
||||
if (!evaluateGroup(tdbb, AGG_TYPE_ORDER, impure->partitionBlock.pending))
|
||||
fb_assert(false);
|
||||
|
||||
impure->orderBlock.startPosition = position;
|
||||
impure->orderBlock.endPosition = m_bufferedStream->getPosition(request) - 1 -
|
||||
(impure->state == STATE_FETCHED ? 1 : 0);
|
||||
impure->orderBlock.pending =
|
||||
impure->orderBlock.endPosition - impure->orderBlock.startPosition + 1;
|
||||
|
||||
fb_assert(impure->orderBlock.pending > 0);
|
||||
}
|
||||
|
||||
impure->state = evaluateGroup(tdbb, impure->state);
|
||||
|
||||
if (impure->state == STATE_PROCESS_EOF)
|
||||
{
|
||||
rpb->rpb_number.setValid(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
impure->pending = m_bufferedStream->getPosition(request) - position -
|
||||
(impure->state == STATE_EOF_FOUND ? 0 : 1);
|
||||
m_bufferedStream->locate(tdbb, position);
|
||||
impure->state = STATE_GROUPING;
|
||||
}
|
||||
|
||||
fb_assert(impure->orderBlock.pending > 0 && impure->partitionBlock.pending > 0);
|
||||
|
||||
--impure->orderBlock.pending;
|
||||
--impure->partitionBlock.pending;
|
||||
|
||||
if (m_winPassSources.hasData())
|
||||
{
|
||||
SlidingWindow window(tdbb, m_bufferedStream, m_group, request);
|
||||
SlidingWindow window(tdbb, m_bufferedStream, m_group, request,
|
||||
impure->partitionBlock.startPosition, impure->partitionBlock.endPosition,
|
||||
impure->orderBlock.startPosition, impure->orderBlock.endPosition);
|
||||
dsc* desc;
|
||||
|
||||
const NestConst<ValueExprNode>* const sourceEnd = m_winPassSources.end();
|
||||
@ -177,14 +212,8 @@ bool AggregatedStream::getRecord(thread_db* tdbb) const
|
||||
}
|
||||
}
|
||||
|
||||
if (impure->pending > 0)
|
||||
--impure->pending;
|
||||
|
||||
if (!m_bufferedStream->getRecord(tdbb))
|
||||
{
|
||||
rpb->rpb_number.setValid(false);
|
||||
return false;
|
||||
}
|
||||
fb_assert(false);
|
||||
|
||||
// If there is no group, we should reassign the map items.
|
||||
if (!m_group)
|
||||
@ -205,9 +234,7 @@ bool AggregatedStream::getRecord(thread_db* tdbb) const
|
||||
}
|
||||
else
|
||||
{
|
||||
impure->state = evaluateGroup(tdbb, impure->state);
|
||||
|
||||
if (impure->state == STATE_PROCESS_EOF)
|
||||
if (!evaluateGroup(tdbb, AGG_TYPE_GROUP, MAX_UINT64))
|
||||
{
|
||||
rpb->rpb_number.setValid(false);
|
||||
return false;
|
||||
@ -233,9 +260,7 @@ void AggregatedStream::print(thread_db* tdbb, string& plan,
|
||||
bool detailed, unsigned level) const
|
||||
{
|
||||
if (detailed)
|
||||
{
|
||||
plan += printIndent(++level) + (m_bufferedStream ? "Window" : "Aggregate");
|
||||
}
|
||||
|
||||
m_next->print(tdbb, plan, detailed, level);
|
||||
}
|
||||
@ -276,6 +301,100 @@ void AggregatedStream::init(thread_db* tdbb, CompilerScratch* csb)
|
||||
|
||||
// Separate nodes that requires the winPass call.
|
||||
|
||||
NestConst<ValueExprNode>* const sourceEnd = m_map->sourceList.end();
|
||||
|
||||
for (NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
{
|
||||
aggNode->ordered = m_order != NULL;
|
||||
|
||||
bool wantWinPass = false;
|
||||
aggNode->aggSetup(wantWinPass);
|
||||
|
||||
if (wantWinPass)
|
||||
{
|
||||
m_winPassSources.add(*source);
|
||||
m_winPassTargets.add(*target);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the next aggregated record of a value group.
|
||||
bool AggregatedStream::evaluateGroup(thread_db* tdbb, AggType aggType, FB_UINT64 limit) const
|
||||
{
|
||||
const NestValueArray* const group = aggType == AGG_TYPE_GROUP ? m_group : m_order;
|
||||
unsigned groupOffset = aggType == AGG_TYPE_GROUP || !m_group ? 0 : m_group->getCount();
|
||||
jrd_req* const request = tdbb->getRequest();
|
||||
|
||||
if (--tdbb->tdbb_quantum < 0)
|
||||
JRD_reschedule(tdbb, 0, true);
|
||||
|
||||
Impure* const impure = request->getImpure<Impure>(m_impure);
|
||||
|
||||
// if we found the last record last time, we're all done
|
||||
if (impure->state == STATE_EOF)
|
||||
return false;
|
||||
|
||||
try
|
||||
{
|
||||
if (aggType == AGG_TYPE_GROUP || group != NULL)
|
||||
aggInit(tdbb, request, aggType);
|
||||
|
||||
// If there isn't a record pending, open the stream and get one
|
||||
|
||||
if (!getNextRecord(tdbb, request, limit))
|
||||
{
|
||||
impure->state = STATE_EOF;
|
||||
|
||||
if (group || m_bufferedStream)
|
||||
{
|
||||
finiDistinct(tdbb, request);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
cacheValues(tdbb, request, group, groupOffset);
|
||||
|
||||
// Loop thru records until either a value change or EOF
|
||||
|
||||
while (impure->state == STATE_GROUPING)
|
||||
{
|
||||
if ((aggType == AGG_TYPE_GROUP || group != NULL) && !aggPass(tdbb, request))
|
||||
impure->state = STATE_EOF;
|
||||
else if (getNextRecord(tdbb, request, limit))
|
||||
{
|
||||
// In the case of a group by, look for a change in value of any of
|
||||
// the columns; if we find one, stop aggregating and return what we have.
|
||||
|
||||
if (lookForChange(tdbb, request, group, groupOffset))
|
||||
impure->state = STATE_FETCHED;
|
||||
}
|
||||
else
|
||||
impure->state = STATE_EOF;
|
||||
}
|
||||
|
||||
if (aggType == AGG_TYPE_GROUP || group != NULL)
|
||||
aggExecute(tdbb, request);
|
||||
}
|
||||
catch (const Exception&)
|
||||
{
|
||||
finiDistinct(tdbb, request);
|
||||
throw;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Initialize the aggregate record
|
||||
void AggregatedStream::aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const
|
||||
{
|
||||
const NestConst<ValueExprNode>* const sourceEnd = m_map->sourceList.end();
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
@ -285,153 +404,89 @@ void AggregatedStream::init(thread_db* tdbb, CompilerScratch* csb)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode && aggNode->shouldCallWinPass())
|
||||
if (aggNode)
|
||||
aggNode->aggInit(tdbb, request, aggType);
|
||||
else if ((*source)->is<LiteralNode>())
|
||||
EXE_assignment(tdbb, *source, *target);
|
||||
}
|
||||
}
|
||||
|
||||
// Go through and compute all the aggregates on this record
|
||||
bool AggregatedStream::aggPass(thread_db* tdbb, jrd_req* request) const
|
||||
{
|
||||
bool ret = true;
|
||||
const NestConst<ValueExprNode>* const sourceEnd = m_map->sourceList.end();
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
{
|
||||
m_winPassSources.add(*source);
|
||||
m_winPassTargets.add(*target);
|
||||
if (aggNode->aggPass(tdbb, request))
|
||||
{
|
||||
// If a max or min has been mapped to an index, then the first record is the EOF.
|
||||
if (aggNode->indexed)
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
EXE_assignment(tdbb, *source, *target);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void AggregatedStream::aggExecute(thread_db* tdbb, jrd_req* request) const
|
||||
{
|
||||
const NestConst<ValueExprNode>* const sourceEnd = m_map->sourceList.end();
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
{
|
||||
const FieldNode* field = (*target)->as<FieldNode>();
|
||||
const USHORT id = field->fieldId;
|
||||
Record* record = request->req_rpb[field->fieldStream].rpb_record;
|
||||
|
||||
dsc* desc = aggNode->execute(tdbb, request);
|
||||
if (!desc || !desc->dsc_dtype)
|
||||
record->setNull(id);
|
||||
else
|
||||
{
|
||||
MOV_move(tdbb, desc, EVL_assign_to(tdbb, *target));
|
||||
record->clearNull(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the next aggregated record of a value group. evlGroup is driven by, and returns, a state
|
||||
// variable.
|
||||
AggregatedStream::State AggregatedStream::evaluateGroup(thread_db* tdbb, State state) const
|
||||
bool AggregatedStream::getNextRecord(thread_db* tdbb, jrd_req* request, FB_UINT64& limit) const
|
||||
{
|
||||
jrd_req* const request = tdbb->getRequest();
|
||||
|
||||
if (--tdbb->tdbb_quantum < 0)
|
||||
JRD_reschedule(tdbb, 0, true);
|
||||
|
||||
Impure* const impure = request->getImpure<Impure>(m_impure);
|
||||
|
||||
// if we found the last record last time, we're all done
|
||||
|
||||
if (state == STATE_EOF_FOUND)
|
||||
return STATE_PROCESS_EOF;
|
||||
|
||||
try
|
||||
if (limit == 0)
|
||||
return false;
|
||||
else if (impure->state == STATE_FETCHED)
|
||||
{
|
||||
const NestConst<ValueExprNode>* const sourceEnd = m_map->sourceList.end();
|
||||
|
||||
// If there isn't a record pending, open the stream and get one
|
||||
|
||||
if (!m_order || state == STATE_PROCESS_EOF || state == STATE_GROUPING)
|
||||
{
|
||||
// Initialize the aggregate record
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
aggNode->aggInit(tdbb, request);
|
||||
else if ((*source)->is<LiteralNode>())
|
||||
EXE_assignment(tdbb, *source, *target);
|
||||
}
|
||||
}
|
||||
|
||||
if (state == STATE_PROCESS_EOF || state == STATE_GROUPING)
|
||||
{
|
||||
if (!m_next->getRecord(tdbb))
|
||||
{
|
||||
if (m_group)
|
||||
{
|
||||
finiDistinct(tdbb, request);
|
||||
return STATE_PROCESS_EOF;
|
||||
}
|
||||
|
||||
state = STATE_EOF_FOUND;
|
||||
}
|
||||
}
|
||||
|
||||
cacheValues(tdbb, request, m_group, 0);
|
||||
|
||||
if (state != STATE_EOF_FOUND)
|
||||
cacheValues(tdbb, request, m_order, (m_group ? m_group->getCount() : 0));
|
||||
|
||||
// Loop thru records until either a value change or EOF
|
||||
|
||||
while (state != STATE_EOF_FOUND)
|
||||
{
|
||||
state = STATE_PENDING;
|
||||
|
||||
// go through and compute all the aggregates on this record
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
{
|
||||
if (aggNode->aggPass(tdbb, request))
|
||||
{
|
||||
// If a max or min has been mapped to an index, then the first record is the EOF.
|
||||
if (aggNode->indexed)
|
||||
state = STATE_EOF_FOUND;
|
||||
}
|
||||
}
|
||||
else
|
||||
EXE_assignment(tdbb, *source, *target);
|
||||
}
|
||||
|
||||
if (state != STATE_EOF_FOUND && m_next->getRecord(tdbb))
|
||||
{
|
||||
// In the case of a group by, look for a change in value of any of
|
||||
// the columns; if we find one, stop aggregating and return what we have.
|
||||
|
||||
if (lookForChange(tdbb, request, m_group, 0))
|
||||
{
|
||||
if (m_order)
|
||||
state = STATE_GROUPING;
|
||||
break;
|
||||
}
|
||||
|
||||
if (lookForChange(tdbb, request, m_order, (m_group ? m_group->getCount() : 0)))
|
||||
break;
|
||||
}
|
||||
else
|
||||
state = STATE_EOF_FOUND;
|
||||
}
|
||||
|
||||
// Finish up any residual computations and get out
|
||||
|
||||
for (const NestConst<ValueExprNode>* source = m_map->sourceList.begin(),
|
||||
*target = m_map->targetList.begin();
|
||||
source != sourceEnd;
|
||||
++source, ++target)
|
||||
{
|
||||
const AggNode* aggNode = (*source)->as<AggNode>();
|
||||
|
||||
if (aggNode)
|
||||
{
|
||||
const FieldNode* field = (*target)->as<FieldNode>();
|
||||
const USHORT id = field->fieldId;
|
||||
Record* record = request->req_rpb[field->fieldStream].rpb_record;
|
||||
|
||||
dsc* desc = aggNode->execute(tdbb, request);
|
||||
if (!desc || !desc->dsc_dtype)
|
||||
record->setNull(id);
|
||||
else
|
||||
{
|
||||
MOV_move(tdbb, desc, EVL_assign_to(tdbb, *target));
|
||||
record->clearNull(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
impure->state = STATE_GROUPING;
|
||||
return true;
|
||||
}
|
||||
catch (const Exception&)
|
||||
else if (m_next->getRecord(tdbb))
|
||||
{
|
||||
finiDistinct(tdbb, request);
|
||||
throw;
|
||||
--limit;
|
||||
return true;
|
||||
}
|
||||
|
||||
return state;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
// Cache the values of a group/order in the impure.
|
||||
@ -507,11 +562,17 @@ void AggregatedStream::finiDistinct(thread_db* tdbb, jrd_req* request) const
|
||||
|
||||
|
||||
SlidingWindow::SlidingWindow(thread_db* aTdbb, const BaseBufferedStream* aStream,
|
||||
const NestValueArray* aGroup, jrd_req* aRequest)
|
||||
const NestValueArray* aGroup, jrd_req* aRequest,
|
||||
FB_UINT64 aPartitionStart, FB_UINT64 aPartitionEnd,
|
||||
FB_UINT64 aOrderStart, FB_UINT64 aOrderEnd)
|
||||
: tdbb(aTdbb), // Note: instanciate the class only as local variable
|
||||
stream(aStream),
|
||||
group(aGroup),
|
||||
request(aRequest),
|
||||
partitionStart(aPartitionStart),
|
||||
partitionEnd(aPartitionEnd),
|
||||
orderStart(aOrderStart),
|
||||
orderEnd(aOrderEnd),
|
||||
moved(false)
|
||||
{
|
||||
savedPosition = stream->getPosition(request);
|
||||
|
@ -602,9 +602,41 @@ namespace Jrd
|
||||
{
|
||||
public:
|
||||
SlidingWindow(thread_db* aTdbb, const BaseBufferedStream* aStream,
|
||||
const NestValueArray* aGroup, jrd_req* aRequest);
|
||||
const NestValueArray* aGroup, jrd_req* aRequest,
|
||||
FB_UINT64 aPartitionStart, FB_UINT64 aPartitionEnd,
|
||||
FB_UINT64 aOrderStart, FB_UINT64 aOrderEnd);
|
||||
~SlidingWindow();
|
||||
|
||||
FB_UINT64 getPartitionStart() const
|
||||
{
|
||||
return partitionStart;
|
||||
}
|
||||
|
||||
FB_UINT64 getPartitionEnd() const
|
||||
{
|
||||
return partitionEnd;
|
||||
}
|
||||
|
||||
FB_UINT64 getPartitionSize() const
|
||||
{
|
||||
return partitionEnd - partitionStart + 1;
|
||||
}
|
||||
|
||||
FB_UINT64 getOrderStart() const
|
||||
{
|
||||
return orderStart;
|
||||
}
|
||||
|
||||
FB_UINT64 getOrderEnd() const
|
||||
{
|
||||
return orderEnd;
|
||||
}
|
||||
|
||||
FB_UINT64 getOrderSize() const
|
||||
{
|
||||
return orderEnd - orderStart + 1;
|
||||
}
|
||||
|
||||
bool move(SINT64 delta);
|
||||
|
||||
private:
|
||||
@ -613,25 +645,37 @@ namespace Jrd
|
||||
const NestValueArray* group;
|
||||
jrd_req* request;
|
||||
Firebird::Array<impure_value> partitionKeys;
|
||||
bool moved;
|
||||
FB_UINT64 partitionStart;
|
||||
FB_UINT64 partitionEnd;
|
||||
FB_UINT64 orderStart;
|
||||
FB_UINT64 orderEnd;
|
||||
FB_UINT64 savedPosition;
|
||||
bool moved;
|
||||
};
|
||||
|
||||
class AggregatedStream : public RecordStream
|
||||
{
|
||||
enum State
|
||||
{
|
||||
STATE_PROCESS_EOF = 0, // We processed everything now process (EOF)
|
||||
STATE_PENDING, // Values are pending from a prior fetch
|
||||
STATE_EOF_FOUND, // We encountered EOF from the last attempted fetch
|
||||
STATE_GROUPING // Entering EVL group before fetching the first record
|
||||
STATE_EOF, // We processed everything now process EOF
|
||||
STATE_FETCHED, // Values are pending from a prior fetch
|
||||
STATE_GROUPING // Entering EVL group before fetching the first record
|
||||
};
|
||||
|
||||
struct Block
|
||||
{
|
||||
FB_UINT64 startPosition;
|
||||
FB_UINT64 endPosition;
|
||||
FB_UINT64 pending;
|
||||
};
|
||||
|
||||
struct Impure : public RecordSource::Impure
|
||||
{
|
||||
State state;
|
||||
FB_UINT64 pending;
|
||||
impure_value* impureValues;
|
||||
Block partitionBlock;
|
||||
Block orderBlock;
|
||||
State state;
|
||||
bool lastGroup;
|
||||
};
|
||||
|
||||
public:
|
||||
@ -661,7 +705,11 @@ namespace Jrd
|
||||
private:
|
||||
void init(thread_db* tdbb, CompilerScratch* csb);
|
||||
|
||||
State evaluateGroup(thread_db* tdbb, State state) const;
|
||||
bool evaluateGroup(thread_db* tdbb, AggType aggType, FB_UINT64 limit) const;
|
||||
void aggInit(thread_db* tdbb, jrd_req* request, AggType aggType) const;
|
||||
bool aggPass(thread_db* tdbb, jrd_req* request) const;
|
||||
void aggExecute(thread_db* tdbb, jrd_req* request) const;
|
||||
bool getNextRecord(thread_db* tdbb, jrd_req* request, FB_UINT64& limit) const;
|
||||
void cacheValues(thread_db* tdbb, jrd_req* request,
|
||||
const NestValueArray* group, unsigned impureOffset) const;
|
||||
bool lookForChange(thread_db* tdbb, jrd_req* request,
|
||||
|
@ -127,10 +127,17 @@ inline void impure_value::make_double(const double val)
|
||||
this->vlu_desc.dsc_address = reinterpret_cast<UCHAR*>(&this->vlu_misc.vlu_double);
|
||||
}
|
||||
|
||||
enum AggType
|
||||
{
|
||||
AGG_TYPE_GROUP,
|
||||
AGG_TYPE_ORDER
|
||||
};
|
||||
|
||||
struct impure_value_ex : public impure_value
|
||||
{
|
||||
SINT64 vlux_count;
|
||||
blb* vlu_blob;
|
||||
AggType aggType;
|
||||
};
|
||||
|
||||
const int VLU_computed = 1; // An invariant sub-query has been computed
|
||||
|
Loading…
Reference in New Issue
Block a user