8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:03:02 +01:00

Merge branch 'semi-joins-v5' into v5.0-release

This commit is contained in:
Dmitry Yemanov 2024-07-31 09:18:40 +03:00
commit dd1e11474d
11 changed files with 684 additions and 184 deletions

View File

@ -503,6 +503,26 @@
#OuterJoinConversion = true
# ----------------------------
# Defines whether the optimizer attempts to merge subqueries inside IN/EXISTS
# predicates with the outer query and convert them into semi-joins, if possible.
#
# This is an experimental feature, so it is disabled by default.
# Enable it to try for possibly better performance, as the subquery is
# evaluated just once and its result is then cached.
#
# CAUTION!
# There is no guarantee that this setting will be available in future Firebird
# versions. Once this feature is proved to be superior in most use cases,
# this setting will be removed and this conversion will become unconditional.
#
# Per-database configurable.
#
# Type: boolean
#
#SubQueryConversion = false
# ============================
# Plugin settings
# ============================

View File

@ -193,6 +193,7 @@ enum ConfigKey
KEY_MAX_PARALLEL_WORKERS,
KEY_OPTIMIZE_FOR_FIRST_ROWS,
KEY_OUTER_JOIN_CONVERSION,
KEY_SUBQUERY_CONVERSION,
MAX_CONFIG_KEY // keep it last
};
@ -312,7 +313,8 @@ constexpr ConfigEntry entries[MAX_CONFIG_KEY] =
{TYPE_INTEGER, "ParallelWorkers", true, 1},
{TYPE_INTEGER, "MaxParallelWorkers", true, 1},
{TYPE_BOOLEAN, "OptimizeForFirstRows", false, false},
{TYPE_BOOLEAN, "OuterJoinConversion", false, true}
{TYPE_BOOLEAN, "OuterJoinConversion", false, true},
{TYPE_BOOLEAN, "SubQueryConversion", false, false}
};
@ -648,6 +650,8 @@ public:
CONFIG_GET_PER_DB_BOOL(getOptimizeForFirstRows, KEY_OPTIMIZE_FOR_FIRST_ROWS);
CONFIG_GET_PER_DB_BOOL(getOuterJoinConversion, KEY_OUTER_JOIN_CONVERSION);
CONFIG_GET_PER_DB_BOOL(getSubQueryConversion, KEY_SUBQUERY_CONVERSION);
};
// Implementation of interface to access master configuration file

View File

@ -52,6 +52,128 @@ static void genDeliverUnmapped(CompilerScratch* csb, const BoolExprNodeStack& pa
static ValueExprNode* resolveUsingField(DsqlCompilerScratch* dsqlScratch, const MetaName& name,
ValueListNode* list, const FieldNode* flawedNode, const TEXT* side, dsql_ctx*& ctx);
namespace
{
// Walk a tree of ANDed booleans looking for equality comparisons
// (blr_eql / blr_equiv) that reference at least one stream which is not
// listed in rseStreams, i.e. a stream of some other select expression.
//
// Every such comparison is pushed onto booleanStack and unlinked from the tree:
// the slot addressed by parentBoolean is reset to nullptr, and an AND node that
// is left with a single argument is replaced in its parent slot by that argument.
//
// Returns true if at least one boolean has been extracted.
bool findDependentBooleans(CompilerScratch* csb,
	const StreamList& rseStreams,
	BoolExprNode** parentBoolean,
	BoolExprNodeStack& booleanStack)
{
	const auto current = *parentBoolean;

	const auto conjunction = nodeAs<BinaryBoolNode>(current);

	if (conjunction && conjunction->blrOp == blr_and)
	{
		// Both sides are always visited (left one first), no short-circuiting here
		bool extracted = findDependentBooleans(csb, rseStreams,
			conjunction->arg1.getAddress(), booleanStack);

		if (findDependentBooleans(csb, rseStreams,
				conjunction->arg2.getAddress(), booleanStack))
		{
			extracted = true;
		}

		// Collapse the AND node if one or both of its arguments were taken away
		const bool leftGone = !conjunction->arg1;
		const bool rightGone = !conjunction->arg2;

		if (leftGone && rightGone)
			*parentBoolean = nullptr;
		else if (leftGone)
			*parentBoolean = conjunction->arg2;
		else if (rightGone)
			*parentBoolean = conjunction->arg1;

		return extracted;
	}

	const auto comparison = nodeAs<ComparativeBoolNode>(current);

	// Only equality-like comparisons are considered
	if (!comparison || (comparison->blrOp != blr_eql && comparison->blrOp != blr_equiv))
		return false;

	SortedStreamList usedStreams;
	comparison->collectStreams(usedStreams);

	for (const auto usedStream : usedStreams)
	{
		if (rseStreams.exist(usedStream))
			continue;

		// The comparison refers to a foreign stream, so move it to the caller's stack
		booleanStack.push(current);
		*parentBoolean = nullptr;
		return true;
	}

	return false;
}
// Walk a tree of ANDed booleans looking for EXISTS (blr_any) and IN (blr_ansi_any)
// sub-queries that are correlated with other select expressions via equalities.
// They are candidates to be converted into semi- or anti-joins.
//
// For every sub-query found:
//   - its correlating equalities are removed from the sub-query boolean
//     and combined (ANDed) into a single boolean pushed onto booleanStack,
//   - its RSE is flagged with FLAG_SEMI_JOINED and pushed onto rseStack,
//   - the sub-query predicate itself is unlinked from the tree
//     (the slot addressed by parentBoolean is reset to nullptr).
//
// Returns true if at least one sub-query has been extracted.
bool findPossibleJoins(CompilerScratch* csb,
	BoolExprNode** parentBoolean,
	RecordSourceNodeStack& rseStack,
	BoolExprNodeStack& booleanStack)
{
	auto candidate = *parentBoolean;

	const auto conjunction = nodeAs<BinaryBoolNode>(candidate);

	if (conjunction && conjunction->blrOp == blr_and)
	{
		// Both sides are always visited (left one first), no short-circuiting here
		bool extracted = findPossibleJoins(csb, conjunction->arg1.getAddress(),
			rseStack, booleanStack);

		if (findPossibleJoins(csb, conjunction->arg2.getAddress(),
				rseStack, booleanStack))
		{
			extracted = true;
		}

		// Collapse the AND node if one or both of its arguments were taken away
		const bool leftGone = !conjunction->arg1;
		const bool rightGone = !conjunction->arg2;

		if (leftGone && rightGone)
			*parentBoolean = nullptr;
		else if (leftGone)
			*parentBoolean = conjunction->arg2;
		else if (rightGone)
			*parentBoolean = conjunction->arg1;

		return extracted;
	}

	const auto subQuery = nodeAs<RseBoolNode>(candidate);

	// Both EXISTS (blr_any) and IN (blr_ansi_any) sub-queries are handled
	if (!subQuery || (subQuery->blrOp != blr_any && subQuery->blrOp != blr_ansi_any))
		return false;

	auto rse = subQuery->rse;
	fb_assert(rse);

	// Only plain inner sub-queries having a boolean and
	// neither FIRST/SKIP nor an explicit plan are eligible
	const bool eligible = rse->rse_boolean && rse->rse_jointype == blr_inner &&
		!rse->rse_first && !rse->rse_skip && !rse->rse_plan;

	if (!eligible)
		return false;

	StreamList rseStreams;
	rse->computeRseStreams(rseStreams);

	BoolExprNodeStack dependents;

	if (!findDependentBooleans(csb, rseStreams, rse->rse_boolean.getAddress(), dependents))
		return false;

	fb_assert(dependents.hasData());

	// Fold all the extracted booleans into a single AND chain
	auto joinBoolean = dependents.pop();

	while (dependents.hasData())
	{
		const auto conjunct = FB_NEW_POOL(csb->csb_pool)
			BinaryBoolNode(csb->csb_pool, blr_and);

		conjunct->arg1 = joinBoolean;
		conjunct->arg2 = dependents.pop();
		joinBoolean = conjunct;
	}

	rse->flags |= RseNode::FLAG_SEMI_JOINED;

	rseStack.push(rse);
	booleanStack.push(joinBoolean);

	*parentBoolean = nullptr;
	return true;
}
}
//--------------------
@ -2783,6 +2905,9 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
{
SET_TDBB(tdbb);
if (const auto newRse = processPossibleJoins(tdbb, csb))
return newRse->pass1(tdbb, csb);
// for scoping purposes, maintain a stack of RseNode's which are
// currently being parsed; if there are none on the stack as
// yet, mark the RseNode as variant to make sure that statement-
@ -2890,6 +3015,12 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
{
const auto dbb = tdbb->getDatabase();
if (const auto newRse = processPossibleJoins(tdbb, csb))
{
newRse->pass1Source(tdbb, csb, rse, boolean, stack);
return;
}
if (rse_jointype != blr_inner && dbb->dbb_config->getOuterJoinConversion())
{
// Check whether any of the upper level booleans (those belonging to the WHERE clause)
@ -2943,7 +3074,7 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
}
}
// in the case of an RseNode, it is possible that a new RseNode will be generated,
// In the case of an RseNode, it is possible that a new RseNode will be generated,
// so wait to process the source before we push it on the stack (bug 8039)
// The addition of the JOIN syntax for specifying inner joins causes an
@ -2951,7 +3082,7 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
// where we are just trying to inner join more than 2 streams. If possible,
// try to flatten the tree out before we go any further.
if (!isLateral() &&
if (!isLateral() && !isSemiJoined() &&
rse->rse_jointype == blr_inner &&
rse_jointype == blr_inner &&
!rse_sorted && !rse_projection &&
@ -3056,11 +3187,11 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr
StreamStateHolder stateHolder(csb, opt->getOuterStreams());
if (opt->isLeftJoin() || isLateral())
if (opt->isLeftJoin() || isLateral() || isSemiJoined())
{
stateHolder.activate();
if (opt->isLeftJoin())
if (opt->isLeftJoin() || isSemiJoined())
{
// Push all conjuncts except "missing" ones (e.g. IS NULL)
for (auto iter = opt->getConjuncts(false, true); iter.hasData(); ++iter)
@ -3083,6 +3214,46 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr
return opt->compile(this, &conjunctStack);
}
// Try to convert correlated IN/EXISTS sub-queries found inside this node's boolean
// into semi-joins.
//
// Returns nullptr if nothing was converted: this node is not an inner join,
// it has no boolean, it has FIRST/SKIP specified, the SubQueryConversion setting
// is off, or no suitable sub-query was found. Otherwise returns the topmost of
// the newly created inner-join RSE nodes. Each of them joins the previous node
// (starting with this one) with one extracted sub-query RSE, using the booleans
// extracted from that sub-query as the join condition.
//
// Note that on success the converted predicates are removed from rse_boolean.
RseNode* RseNode::processPossibleJoins(thread_db* tdbb, CompilerScratch* csb)
{
	if (rse_jointype != blr_inner || !rse_boolean)
		return nullptr;

	// This node is nested unchanged under the join node created below and the
	// converted predicates are moved out of its boolean. FIRST/SKIP left on this
	// node would therefore be applied before the sub-query filters the rows,
	// while they must be applied to the rows that satisfy the whole boolean.
	// Sub-queries are checked the same way in findPossibleJoins().
	if (rse_first || rse_skip)
		return nullptr;

	const auto dbb = tdbb->getDatabase();

	if (!dbb->dbb_config->getSubQueryConversion())
		return nullptr;

	RecordSourceNodeStack rseStack;
	BoolExprNodeStack booleanStack;

	// Find possibly joinable sub-queries

	if (!findPossibleJoins(csb, rse_boolean.getAddress(), rseStack, booleanStack))
		return nullptr;

	// Both stacks are filled in pairs: one RSE and one join boolean per sub-query
	fb_assert(rseStack.hasData() && booleanStack.hasData());
	fb_assert(rseStack.getCount() == booleanStack.getCount());

	// Create joins between the original node and detected joinable nodes

	auto rse = this;

	while (rseStack.hasData())
	{
		const auto newRse = FB_NEW_POOL(*tdbb->getDefaultPool())
			RseNode(*tdbb->getDefaultPool());

		// The node built so far goes first, the semi-joined sub-query follows it
		newRse->rse_relations.add(rse);
		newRse->rse_relations.add(rseStack.pop());

		newRse->rse_jointype = blr_inner;
		newRse->rse_boolean = booleanStack.pop();

		rse = newRse;
	}

	return rse;
}
// Check that all streams in the RseNode have a plan specified for them.
// If they are not, there are streams in the RseNode which were not mentioned in the plan.
void RseNode::planCheck(const CompilerScratch* csb) const

View File

@ -731,7 +731,8 @@ public:
FLAG_DSQL_COMPARATIVE = 0x10, // transformed from DSQL ComparativeBoolNode
FLAG_LATERAL = 0x20, // lateral derived table
FLAG_SKIP_LOCKED = 0x40, // skip locked
FLAG_SUB_QUERY = 0x80 // sub-query
FLAG_SUB_QUERY = 0x80, // sub-query
FLAG_SEMI_JOINED = 0x100 // participates in semi-join
};
bool isInvariant() const
@ -759,6 +760,11 @@ public:
return (flags & FLAG_SUB_QUERY) != 0;
}
// Tells whether this node is flagged as participating in a semi-join
bool isSemiJoined() const
{
	const auto semiJoinBit = flags & FLAG_SEMI_JOINED;
	return semiJoinBit != 0;
}
bool hasWriteLock() const
{
return (flags & FLAG_WRITELOCK) != 0;
@ -863,6 +869,7 @@ public:
private:
void planCheck(const CompilerScratch* csb) const;
static void planSet(CompilerScratch* csb, PlanNode* plan);
RseNode* processPossibleJoins(thread_db* tdbb, CompilerScratch* csb);
public:
NestConst<ValueExprNode> dsqlFirst;

View File

@ -108,6 +108,8 @@ void InnerJoin::calculateStreamInfo()
innerStream->baseIndexes = candidate->indexes;
innerStream->baseUnique = candidate->unique;
innerStream->baseNavigated = candidate->navigated;
innerStream->baseMatches = candidate->matches;
innerStream->baseDependentFromStreams = candidate->dependentFromStreams;
csb->csb_rpt[innerStream->number].deactivate();
}
@ -579,13 +581,15 @@ River* InnerJoin::formRiver()
// Create a hash join
rsb = FB_NEW_POOL(getPool())
HashJoin(tdbb, csb, 2, hashJoinRsbs, keys.begin(), stream.selectivity);
HashJoin(tdbb, csb, INNER_JOIN, 2, hashJoinRsbs, keys.begin(), stream.selectivity);
// Clear priorly processed rsb's, as they're already incorporated into a hash join
rsbs.clear();
}
else
{
rsb = optimizer->generateRetrieval(stream.number, sortPtr, false, false);
}
rsbs.add(rsb);
streams.add(stream.number);

View File

@ -168,9 +168,14 @@ namespace
class CrossJoin : public River
{
public:
CrossJoin(CompilerScratch* csb, RiverList& rivers)
: River(csb, nullptr, rivers)
CrossJoin(Optimizer* opt, RiverList& rivers, JoinType joinType)
: River(opt->getCompilerScratch(), nullptr, rivers)
{
fb_assert(joinType != OUTER_JOIN);
const auto csb = opt->getCompilerScratch();
Optimizer::ConjunctIterator iter(opt->getBaseConjuncts());
// Save states of the underlying streams and restore them afterwards
StreamStateHolder stateHolder(csb, m_streams);
@ -182,57 +187,76 @@ namespace
if (riverCount == 1)
{
River* const sub_river = rivers.pop();
m_rsb = sub_river->getRecordSource();
const auto subRiver = rivers.pop();
const auto subRsb = subRiver->getRecordSource();
subRiver->activate(csb);
m_rsb = opt->applyBoolean(subRsb, iter);
}
else
{
HalfStaticArray<RecordSource*, OPT_STATIC_ITEMS> rsbs(riverCount);
// Reorder input rivers according to their possible inter-dependencies
while (rivers.hasData())
if (joinType == INNER_JOIN)
{
const auto orgCount = rsbs.getCount();
// Reorder input rivers according to their possible inter-dependencies
for (auto& subRiver : rivers)
while (rivers.hasData())
{
const auto subRsb = subRiver->getRecordSource();
fb_assert(!rsbs.exist(subRsb));
const auto orgCount = rsbs.getCount();
subRiver->activate(csb);
if (subRiver->isComputable(csb))
for (auto& subRiver : rivers)
{
rsbs.add(subRsb);
rivers.remove(&subRiver);
break;
auto subRsb = subRiver->getRecordSource();
subRiver->activate(csb);
subRsb = opt->applyBoolean(subRsb, iter);
if (subRiver->isComputable(csb))
{
rsbs.add(subRsb);
rivers.remove(&subRiver);
break;
}
subRiver->deactivate(csb);
}
subRiver->deactivate(csb);
if (rsbs.getCount() == orgCount)
break;
}
if (rsbs.getCount() == orgCount)
break;
}
if (rivers.hasData())
{
// Ideally, we should never get here. But just in case it happened, handle it.
for (auto& subRiver : rivers)
if (rivers.hasData())
{
const auto subRsb = subRiver->getRecordSource();
fb_assert(!rsbs.exist(subRsb));
// Ideally, we should never get here. But just in case it happened, handle it.
const auto pos = &subRiver - rivers.begin();
rsbs.insert(pos, subRsb);
for (auto& subRiver : rivers)
{
auto subRsb = subRiver->getRecordSource();
subRiver->activate(csb);
subRsb = opt->applyBoolean(subRsb, iter);
const auto pos = &subRiver - rivers.begin();
rsbs.insert(pos, subRsb);
}
rivers.clear();
}
}
else
{
for (const auto subRiver : rivers)
{
auto subRsb = subRiver->getRecordSource();
subRiver->activate(csb);
if (subRiver != rivers.front())
subRsb = opt->applyBoolean(subRsb, iter);
rsbs.add(subRsb);
}
rivers.clear();
}
m_rsb = FB_NEW_POOL(csb->csb_pool) NestedLoopJoin(csb, rsbs.getCount(), rsbs.begin());
m_rsb = FB_NEW_POOL(csb->csb_pool)
NestedLoopJoin(csb, rsbs.getCount(), rsbs.begin(), joinType);
}
}
};
@ -267,7 +291,6 @@ namespace
}
}
unsigned getRiverCount(unsigned count, const ValueExprNode* const* eq_class)
{
// Given an sort/merge join equivalence class (vector of node pointers
@ -672,7 +695,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
// AB: If we have limit our retrieval with FIRST / SKIP syntax then
// we may not deliver above conditions (from higher rse's) to this
// rse, because the results should be consistent.
if (rse->rse_skip || rse->rse_first)
if (rse->rse_skip || rse->rse_first || isSemiJoined())
parentStack = nullptr;
// Set base-point before the parent/distributed nodes begin.
@ -814,14 +837,25 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
// Go through the record selection expression generating
// record source blocks for all streams
RiverList rivers, dependentRivers;
RiverList rivers, dependentRivers, activateRivers;
bool semiJoin = false;
bool innerSubStream = false;
for (auto node : rse->rse_relations)
{
fb_assert(sort == rse->rse_sorted);
fb_assert(aggregate == rse->rse_aggregate);
const auto subRse = nodeAs<RseNode>(node);
if (subRse && subRse->isSemiJoined())
{
fb_assert(rse->rse_jointype == blr_inner);
semiJoin = true;
}
else
fb_assert(!semiJoin);
// Find the stream number and place it at the end of the bedStreams array
// (if this is really a stream and not another RseNode)
@ -861,6 +895,9 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
{
outerStreams.join(localStreams);
rivers.add(river);
if (!semiJoin)
activateRivers.add(river);
}
else
{
@ -869,6 +906,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
}
else
{
fb_assert(!semiJoin);
// We have a relation, just add its stream
fb_assert(bedStreams.hasData());
outerStreams.add(bedStreams.back());
@ -883,11 +921,6 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
if (compileStreams.getCount() > 5)
CCH_expand(tdbb, (ULONG) (compileStreams.getCount() * CACHE_PAGES_PER_STREAM));
// At this point we are ready to start optimizing.
// We will use the opt block to hold information of
// a global nature, meaning that it needs to stick
// around for the rest of the optimization process.
// Attempt to optimize aggregates via an index, if possible
if (aggregate && !sort)
sort = aggregate;
@ -895,7 +928,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
rse->rse_aggregate = aggregate = nullptr;
// Activate the priorly used rivers
for (const auto river : rivers)
for (const auto river : activateRivers)
river->activate(csb);
bool sortCanBeUsed = true;
@ -921,6 +954,19 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
}
else
{
// Compile the main streams before processing the semi-join itself
if (semiJoin && compileStreams.hasData())
{
generateInnerJoin(compileStreams, rivers, &sort, rse->rse_plan);
fb_assert(compileStreams.isEmpty());
// Ensure the main query river is stored before the semi-joined ones
const auto river = rivers.pop();
rivers.insert(0, river);
}
const JoinType joinType = semiJoin ? SEMI_JOIN : INNER_JOIN;
// AB: If previous rsb's are already on the stack we can't use
// a navigational-retrieval for an ORDER BY because the next
// streams are JOINed to the previous ones
@ -931,7 +977,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
// AB: We could already have multiple rivers at this
// point so try to do some hashing or sort/merging now.
while (generateEquiJoin(rivers))
while (generateEquiJoin(rivers, joinType))
;
}
@ -968,7 +1014,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
// Generate one river which holds a cross join rsb between
// all currently available rivers
rivers.add(FB_NEW_POOL(getPool()) CrossJoin(csb, rivers));
rivers.add(FB_NEW_POOL(getPool()) CrossJoin(this, rivers, joinType));
rivers.back()->activate(csb);
}
else
@ -993,11 +1039,11 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
river->activate(csb);
// If there are multiple rivers, try some hashing or sort/merging
while (generateEquiJoin(rivers))
while (generateEquiJoin(rivers, joinType))
;
rivers.join(dependentRivers);
rsb = CrossJoin(csb, rivers).getRecordSource();
rsb = CrossJoin(this, rivers, joinType).getRecordSource();
// Pick up any residual boolean that may have fallen thru the cracks
rsb = generateResidualBoolean(rsb);
@ -1622,6 +1668,51 @@ SortedStream* Optimizer::generateSort(const StreamList& streams,
}
//
// Compose a filter including all computable booleans.
//
// Every conjunct that is not yet used, is not residual and is computable
// with the currently active streams gets ANDed into a single boolean
// and marked as used. If at least one conjunct was picked up, the given
// record source is wrapped into a filtered stream, otherwise it is returned as is.
//

RecordSource* Optimizer::applyBoolean(RecordSource* rsb,
									  ConjunctIterator& iter)
{
	BoolExprNode* boolean = nullptr;
	double selectivity = MAXIMUM_SELECTIVITY;

	for (iter.rewind(); iter.hasData(); ++iter)
	{
		if (!(iter & CONJUNCT_USED) &&
			!(iter->nodFlags & ExprNode::FLAG_RESIDUAL) &&
			iter->computable(csb, INVALID_STREAM, false))
		{
			compose(getPool(), &boolean, iter);
			iter |= CONJUNCT_USED;

			// Conjuncts flagged as matched or joined do not contribute to the
			// filter selectivity. This is what applyLocalBoolean() did before
			// its loop was moved here; presumably their selectivity is already
			// accounted for by the underlying retrieval or join -- TODO confirm.
			if (!(iter & (CONJUNCT_MATCHED | CONJUNCT_JOINED)))
				selectivity *= getSelectivity(*iter);
		}
	}

	return boolean ? FB_NEW_POOL(getPool()) FilteredStream(csb, rsb, boolean, selectivity) : rsb;
}
//
// Compose a filter from the conjuncts that are local to the given river,
// i.e. computable with only the river's own streams being active
//

RecordSource* Optimizer::applyLocalBoolean(RecordSource* rsb,
										   const StreamList& streams,
										   ConjunctIterator& iter)
{
	// Deactivate the streams tracked by the csb-wide holder first...
	StreamStateHolder outerState(csb);
	outerState.deactivate();

	// ...then activate just the streams of the river being filtered
	StreamStateHolder riverState(csb, streams);
	riverState.activate(csb);

	RecordSource* const filtered = applyBoolean(rsb, iter);
	return filtered;
}
//
// Check to make sure that the user-specified indices were actually utilized by the optimizer
//
@ -2253,16 +2344,38 @@ void Optimizer::formRivers(const StreamList& streams,
// If the whole thing is a moby no-op, return false.
//
bool Optimizer::generateEquiJoin(RiverList& orgRivers)
bool Optimizer::generateEquiJoin(RiverList& rivers, JoinType joinType)
{
fb_assert(joinType != OUTER_JOIN);
ULONG selected_rivers[OPT_STREAM_BITS], selected_rivers2[OPT_STREAM_BITS];
ValueExprNode** eq_class;
RiverList orgRivers(rivers);
// Find dependent rivers and exclude them from processing
for (River** iter = orgRivers.begin(); iter < orgRivers.end();)
{
const auto river = *iter;
StreamStateHolder stateHolder2(csb, river->getStreams());
stateHolder2.activate();
if (river->isComputable(csb))
{
iter++;
continue;
}
orgRivers.remove(iter);
}
// Count the number of "rivers" involved in the operation, then allocate
// a scratch block large enough to hold values to compute equality
// classes.
const unsigned orgCount = (unsigned) orgRivers.getCount();
const auto orgCount = (unsigned) orgRivers.getCount();
if (orgCount < 2)
return false;
@ -2369,7 +2482,7 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers)
// Prepare rivers for joining
StreamList streams;
RiverList rivers;
RiverList joinedRivers;
HalfStaticArray<NestValueArray*, OPT_STATIC_ITEMS> keys;
unsigned position = 0, maxCardinalityPosition = 0, lowestPosition = MAX_ULONG;
double maxCardinality1 = 0, maxCardinality2 = 0;
@ -2398,13 +2511,13 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers)
{
maxCardinality2 = maxCardinality1;
maxCardinality1 = cardinality;
maxCardinalityPosition = rivers.getCount();
maxCardinalityPosition = joinedRivers.getCount();
}
else if (cardinality > maxCardinality2)
maxCardinality2 = cardinality;
streams.join(river->getStreams());
rivers.add(river);
joinedRivers.add(river);
orgRivers.remove(iter);
// Collect keys to join on
@ -2427,10 +2540,11 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers)
HalfStaticArray<RecordSource*, OPT_STATIC_ITEMS> rsbs;
RecordSource* finalRsb = nullptr;
if (useMergeJoin)
// MERGE JOIN does not support other join types yet
if (useMergeJoin && joinType == INNER_JOIN)
{
position = 0;
for (const auto river : rivers)
for (const auto river : joinedRivers)
{
const auto sort = FB_NEW_POOL(getPool()) SortNode(getPool());
@ -2454,29 +2568,36 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers)
}
else
{
// Ensure that the largest river is placed at the first position.
// It's important for a hash join to be efficient.
if (joinType == INNER_JOIN)
{
// Ensure that the largest river is placed at the first position.
// It's important for a hash join to be efficient.
const auto maxCardinalityRiver = rivers[maxCardinalityPosition];
rivers[maxCardinalityPosition] = rivers[0];
rivers[0] = maxCardinalityRiver;
const auto maxCardinalityRiver = joinedRivers[maxCardinalityPosition];
joinedRivers[maxCardinalityPosition] = joinedRivers[0];
joinedRivers[0] = maxCardinalityRiver;
const auto maxCardinalityKey = keys[maxCardinalityPosition];
keys[maxCardinalityPosition] = keys[0];
keys[0] = maxCardinalityKey;
const auto maxCardinalityKey = keys[maxCardinalityPosition];
keys[maxCardinalityPosition] = keys[0];
keys[0] = maxCardinalityKey;
}
for (const auto river : rivers)
for (const auto river : joinedRivers)
rsbs.add(river->getRecordSource());
finalRsb = FB_NEW_POOL(getPool())
HashJoin(tdbb, csb, rsbs.getCount(), rsbs.begin(), keys.begin());
HashJoin(tdbb, csb, joinType, rsbs.getCount(), rsbs.begin(), keys.begin());
}
// Pick up any boolean that may apply
finalRsb = applyLocalBoolean(finalRsb, streams, iter);
const auto finalRiver = FB_NEW_POOL(getPool()) River(csb, finalRsb, rivers);
orgRivers.insert(lowestPosition, finalRiver);
const auto finalRiver = FB_NEW_POOL(getPool()) River(csb, finalRsb, joinedRivers);
for (const auto river : joinedRivers)
rivers.findAndRemove(river);
rivers.insert(lowestPosition, finalRiver);
return true;
}
@ -2599,8 +2720,7 @@ RecordSource* Optimizer::generateOuterJoin(RiverList& rivers,
// Allocate and fill in the rsb
return FB_NEW_POOL(getPool())
NestedLoopJoin(csb, stream_o.stream_rsb, stream_i.stream_rsb,
boolean, OUTER_JOIN);
NestedLoopJoin(csb, stream_o.stream_rsb, stream_i.stream_rsb, boolean);
}
bool hasOuterRsb = true, hasInnerRsb = true;
@ -2623,7 +2743,7 @@ RecordSource* Optimizer::generateOuterJoin(RiverList& rivers,
const auto innerRsb = generateResidualBoolean(stream_i.stream_rsb);
const auto rsb1 = FB_NEW_POOL(getPool())
NestedLoopJoin(csb, stream_o.stream_rsb, innerRsb, boolean, OUTER_JOIN);
NestedLoopJoin(csb, stream_o.stream_rsb, innerRsb, boolean);
for (auto iter = getConjuncts(); iter.hasData(); ++iter)
{
@ -2653,10 +2773,13 @@ RecordSource* Optimizer::generateOuterJoin(RiverList& rivers,
const auto outerRsb = generateResidualBoolean(stream_o.stream_rsb);
const auto rsb2 = FB_NEW_POOL(getPool())
NestedLoopJoin(csb, stream_i.stream_rsb, outerRsb, boolean, ANTI_JOIN);
StreamList outerStreams;
outerRsb->findUsedStreams(outerStreams);
return FB_NEW_POOL(getPool()) FullOuterJoin(csb, rsb1, rsb2);
const auto rsb2 = FB_NEW_POOL(getPool())
NestedLoopJoin(csb, stream_i.stream_rsb, outerRsb, boolean);
return FB_NEW_POOL(getPool()) FullOuterJoin(csb, rsb1, rsb2, outerStreams);
}
@ -2882,41 +3005,6 @@ RecordSource* Optimizer::generateRetrieval(StreamType stream,
}
//
// Find conjuncts local to the given river and compose an appropriate filter
//
RecordSource* Optimizer::applyLocalBoolean(RecordSource* rsb,
const StreamList& streams,
ConjunctIterator& iter)
{
StreamStateHolder globalHolder(csb);
globalHolder.deactivate();
StreamStateHolder localHolder(csb, streams);
localHolder.activate(csb);
BoolExprNode* boolean = nullptr;
double selectivity = MAXIMUM_SELECTIVITY;
for (iter.rewind(); iter.hasData(); ++iter)
{
if (!(iter & CONJUNCT_USED) &&
!(iter->nodFlags & ExprNode::FLAG_RESIDUAL) &&
iter->computable(csb, INVALID_STREAM, false))
{
compose(getPool(), &boolean, iter);
iter |= CONJUNCT_USED;
if (!(iter & (CONJUNCT_MATCHED | CONJUNCT_JOINED)))
selectivity *= getSelectivity(*iter);
}
}
return boolean ? FB_NEW_POOL(getPool()) FilteredStream(csb, rsb, boolean, selectivity) : rsb;
}
//
// Check whether the given boolean can be involved in a equi-join relationship
//

View File

@ -40,6 +40,7 @@
#include "../dsql/ExprNodes.h"
#include "../jrd/RecordSourceNodes.h"
#include "../jrd/exe.h"
#include "../jrd/recsrc/RecordSource.h"
namespace Jrd {
@ -494,9 +495,16 @@ public:
return firstRows;
}
// Tells whether the RSE being optimized is flagged as participating in a semi-join
bool isSemiJoined() const
{
	return rse->isSemiJoined();
}
RecordSource* applyBoolean(RecordSource* rsb, ConjunctIterator& iter);
RecordSource* applyLocalBoolean(RecordSource* rsb,
const StreamList& streams,
ConjunctIterator& iter);
bool checkEquiJoin(BoolExprNode* boolean);
bool getEquiJoinKeys(BoolExprNode* boolean,
NestConst<ValueExprNode>* node1,
@ -521,7 +529,7 @@ private:
RiverList& rivers,
SortNode** sortClause,
const PlanNode* planClause);
bool generateEquiJoin(RiverList& org_rivers);
bool generateEquiJoin(RiverList& rivers, JoinType joinType = INNER_JOIN);
void generateInnerJoin(StreamList& streams,
RiverList& rivers,
SortNode** sortClause,
@ -778,7 +786,8 @@ class InnerJoin : private Firebird::PermanentStorage
{
public:
StreamInfo(MemoryPool& p, StreamType num)
: number(num), indexedRelationships(p)
: number(num), baseMatches(p), baseDependentFromStreams(p),
indexedRelationships(p)
{}
bool isIndependent() const
@ -825,6 +834,9 @@ class InnerJoin : private Firebird::PermanentStorage
bool used = false;
unsigned previousExpectedStreams = 0;
MatchedBooleanList baseMatches;
SortedStreamList baseDependentFromStreams;
IndexedRelationships indexedRelationships;
};

View File

@ -37,10 +37,13 @@ using namespace Jrd;
// Data access: full outer join
// ----------------------------
FullOuterJoin::FullOuterJoin(CompilerScratch* csb, RecordSource* arg1, RecordSource* arg2)
FullOuterJoin::FullOuterJoin(CompilerScratch* csb,
RecordSource* arg1, RecordSource* arg2,
const StreamList& checkStreams)
: RecordSource(csb),
m_arg1(arg1),
m_arg2(arg2)
m_arg2(arg2),
m_checkStreams(csb->csb_pool, checkStreams)
{
fb_assert(m_arg1 && m_arg2);
@ -97,7 +100,27 @@ bool FullOuterJoin::internalGetRecord(thread_db* tdbb) const
m_arg2->open(tdbb);
}
return m_arg2->getRecord(tdbb);
// We should exclude matching records from the right-joined (second) record source,
// as they're already returned from the left-joined (first) record source
while (m_arg2->getRecord(tdbb))
{
bool matched = false;
for (const auto i : m_checkStreams)
{
if (request->req_rpb[i].rpb_number.isValid())
{
matched = true;
break;
}
}
if (!matched)
return true;
}
return false;
}
bool FullOuterJoin::refetchRecord(thread_db* /*tdbb*/) const

View File

@ -37,13 +37,15 @@
using namespace Firebird;
using namespace Jrd;
//#define PRINT_HASH_TABLE
// ----------------------
// Data access: hash join
// ----------------------
// NS: FIXME - Why use static hash table here??? Hash table shall support dynamic resizing
static const ULONG HASH_SIZE = 1009;
static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per slot
static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per bucket
unsigned HashJoin::maxCapacity()
{
@ -92,6 +94,11 @@ class HashJoin::HashTable : public PermanentStorage
m_collisions.sort();
}
// Number of entries currently stored in this collision list
ULONG getCount() const
{
	const auto entryCount = m_collisions.getCount();
	return static_cast<ULONG>(entryCount);
}
void add(ULONG hash, ULONG position)
{
m_collisions.add(Entry(hash, position));
@ -202,11 +209,36 @@ public:
{
for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
{
CollisionList* const collisions = m_collisions[i];
if (collisions)
if (const auto collisions = m_collisions[i])
collisions->sort();
}
#ifdef PRINT_HASH_TABLE
FB_UINT64 total = 0;
ULONG min = MAX_ULONG, max = 0, count = 0;
for (ULONG i = 0; i < m_streamCount * m_tableSize; i++)
{
CollisionList* const collisions = m_collisions[i];
if (!collisions)
continue;
const auto cnt = collisions->getCount();
if (cnt < min)
min = cnt;
if (cnt > max)
max = cnt;
total += cnt;
count++;
}
if (count)
{
printf("Hash table size %u, count %u, buckets %u, min %u, max %u, avg %u\n",
m_tableSize, (ULONG) total, count, min, max, (ULONG) (total / count));
}
#endif
}
private:
@ -217,14 +249,35 @@ private:
};
HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
RecordSource* const* args, NestValueArray* const* keys,
HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, JoinType joinType,
FB_SIZE_T count, RecordSource* const* args, NestValueArray* const* keys,
double selectivity)
: RecordSource(csb),
m_joinType(joinType),
m_boolean(nullptr),
m_args(csb->csb_pool, count - 1)
{
fb_assert(count >= 2);
init(tdbb, csb, count, args, keys, selectivity);
}
// Constructs an outer hash join (the join type is fixed to OUTER_JOIN).
//
// Unlike the constructor accepting a join type and a stream count, this one
// always joins exactly two record sources: args and keys are passed to init()
// with a count of 2.
//
// boolean is stored in m_boolean and evaluated by internalGetRecord() for every
// record fetched from the leading source; when it evaluates to false, the record
// is returned joined to a null-valued inner source.
HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb,
BoolExprNode* boolean,
RecordSource* const* args, NestValueArray* const* keys,
double selectivity)
: RecordSource(csb),
m_joinType(OUTER_JOIN),
m_boolean(boolean),
m_args(csb->csb_pool, 1)	// a single inner source besides the leader
{
init(tdbb, csb, 2, args, keys, selectivity);
}
void HashJoin::init(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
RecordSource* const* args, NestValueArray* const* keys,
double selectivity)
{
m_impure = csb->allocImpure<Impure>();
m_leader.source = args[0];
@ -360,6 +413,8 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const
if (!(impure->irsb_flags & irsb_open))
return false;
const auto inner = m_args.front().source;
while (true)
{
if (impure->irsb_flags & irsb_mustread)
@ -369,6 +424,14 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const
if (!m_leader.source->getRecord(tdbb))
return false;
if (m_boolean && !m_boolean->execute(tdbb, request))
{
// The boolean pertaining to the left sub-stream is false
// so just join sub-stream to a null valued right sub-stream
inner->nullRecords(tdbb);
return true;
}
// We have something to join with, so ensure the hash table is initialized
if (!impure->irsb_hash_table && !impure->irsb_leader_buffer)
@ -410,7 +473,15 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const
// Setup the hash table for the iteration through collisions.
if (!impure->irsb_hash_table->setup(impure->irsb_leader_hash))
continue;
{
if (m_joinType == INNER_JOIN || m_joinType == SEMI_JOIN)
continue;
if (m_joinType == OUTER_JOIN)
inner->nullRecords(tdbb);
return true;
}
impure->irsb_flags &= ~irsb_mustread;
impure->irsb_flags |= irsb_first;
@ -434,13 +505,29 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const
if (!found)
{
impure->irsb_flags |= irsb_mustread;
continue;
if (m_joinType == INNER_JOIN || m_joinType == SEMI_JOIN)
continue;
if (m_joinType == OUTER_JOIN)
inner->nullRecords(tdbb);
break;
}
if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN)
{
impure->irsb_flags |= irsb_mustread;
if (m_joinType == ANTI_JOIN)
continue;
}
impure->irsb_flags &= ~irsb_first;
}
else if (!fetchRecord(tdbb, impure, m_args.getCount() - 1))
{
fb_assert(m_joinType == INNER_JOIN);
impure->irsb_flags |= irsb_mustread;
continue;
}
@ -473,7 +560,30 @@ void HashJoin::print(thread_db* tdbb, string& plan, bool detailed, unsigned leve
{
if (detailed)
{
plan += printIndent(++level) + "Hash Join (inner)";
plan += printIndent(++level) + "Hash Join ";
switch (m_joinType)
{
case INNER_JOIN:
plan += "(inner)";
break;
case OUTER_JOIN:
plan += "(outer)";
break;
case SEMI_JOIN:
plan += "(semi)";
break;
case ANTI_JOIN:
plan += "(anti)";
break;
default:
fb_assert(false);
}
printOptInfo(plan);
if (recurse)
@ -627,6 +737,9 @@ bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co
return true;
}
if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN)
return false;
while (true)
{
if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1))

View File

@ -35,30 +35,33 @@ using namespace Jrd;
// Data access: nested loops join
// ------------------------------
// Builds a nested loop join over `count` input streams.
//
// csb      - compiler scratch; supplies the memory pool for m_args and the
//            impure area allocated for this node.
// count    - number of entries in `args`.
// args     - the joined record sources, stored in the given order.
// joinType - kind of join this node performs (the declaration defaults it
//            to INNER_JOIN).
//
// This overload carries no join boolean (m_boolean is null). The estimated
// cardinality is the product of the children's cardinalities, seeded with
// MINIMUM_CARDINALITY.
NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb,
							   FB_SIZE_T count,
							   RecordSource* const* args,
							   JoinType joinType)
	: RecordSource(csb),
	  m_joinType(joinType),
	  m_boolean(nullptr),
	  // NOTE(review): the second argument is presumably an initial capacity
	  // (the array is filled via add() below, not by index) - confirm against
	  // Firebird::Array.
	  m_args(csb->csb_pool, count)
{
	m_impure = csb->allocImpure<Impure>();
	m_cardinality = MINIMUM_CARDINALITY;

	// Populate the argument list exactly once. Keeping the older
	// resize()/indexed-assignment path next to add() would store every
	// child twice.
	for (FB_SIZE_T i = 0; i < count; i++)
	{
		m_args.add(args[i]);
		m_cardinality *= args[i]->getCardinality();
	}
}
NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
BoolExprNode* boolean, JoinType joinType)
NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb,
RecordSource* outer,
RecordSource* inner,
BoolExprNode* boolean)
: RecordSource(csb),
m_joinType(joinType),
m_args(csb->csb_pool),
m_boolean(boolean)
m_joinType(OUTER_JOIN),
m_boolean(boolean),
m_args(csb->csb_pool, 2)
{
fb_assert(outer && inner);
@ -90,8 +93,8 @@ void NestedLoopJoin::close(thread_db* tdbb) const
{
impure->irsb_flags &= ~irsb_open;
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
m_args[i]->close(tdbb);
for (const auto arg : m_args)
arg->close(tdbb);
}
}
@ -127,12 +130,70 @@ bool NestedLoopJoin::internalGetRecord(thread_db* tdbb) const
else if (!fetchRecord(tdbb, m_args.getCount() - 1))
return false;
}
else if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN)
{
const auto outer = m_args[0];
if (impure->irsb_flags & irsb_first)
{
outer->open(tdbb);
impure->irsb_flags &= ~irsb_first;
}
while (true)
{
if (impure->irsb_flags & irsb_joined)
{
for (FB_SIZE_T i = 1; i < m_args.getCount(); i++)
m_args[i]->close(tdbb);
impure->irsb_flags &= ~irsb_joined;
}
if (!outer->getRecord(tdbb))
return false;
FB_SIZE_T stopArg = 0;
for (FB_SIZE_T i = 1; i < m_args.getCount(); i++)
{
m_args[i]->open(tdbb);
if (m_args[i]->getRecord(tdbb))
{
if (m_joinType == ANTI_JOIN)
{
stopArg = i;
break;
}
}
else
{
if (m_joinType == SEMI_JOIN)
{
stopArg = i;
break;
}
}
}
if (!stopArg)
break;
for (FB_SIZE_T i = 1; i <= stopArg; i++)
m_args[i]->close(tdbb);
}
impure->irsb_flags |= irsb_joined;
}
else
{
fb_assert(m_joinType == OUTER_JOIN);
fb_assert(m_args.getCount() == 2);
const RecordSource* const outer = m_args[0];
const RecordSource* const inner = m_args[1];
const auto outer = m_args[0];
const auto inner = m_args[1];
if (impure->irsb_flags & irsb_first)
{
@ -159,27 +220,10 @@ bool NestedLoopJoin::internalGetRecord(thread_db* tdbb) const
inner->open(tdbb);
}
if (m_joinType == SEMI_JOIN)
if (inner->getRecord(tdbb))
{
if (inner->getRecord(tdbb))
impure->irsb_flags &= ~irsb_joined;
else
impure->irsb_flags |= irsb_joined;
}
else if (m_joinType == ANTI_JOIN)
{
if (inner->getRecord(tdbb))
impure->irsb_flags |= irsb_joined;
else
impure->irsb_flags &= ~irsb_joined;
}
else
{
if (inner->getRecord(tdbb))
{
impure->irsb_flags |= irsb_joined;
return true;
}
impure->irsb_flags |= irsb_joined;
return true;
}
inner->close(tdbb);
@ -210,8 +254,8 @@ WriteLockResult NestedLoopJoin::lockRecord(thread_db* /*tdbb*/) const
// Appends every joined sub-stream to `children`, in argument order.
// Each child is added exactly once (a single traversal of m_args).
void NestedLoopJoin::getChildren(Array<const RecordSource*>& children) const
{
	for (const auto arg : m_args)
		children.add(arg);
}
void NestedLoopJoin::print(thread_db* tdbb, string& plan, bool detailed, unsigned level, bool recurse) const
@ -248,20 +292,20 @@ void NestedLoopJoin::print(thread_db* tdbb, string& plan, bool detailed, unsigne
if (recurse)
{
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
m_args[i]->print(tdbb, plan, true, level, recurse);
for (const auto arg : m_args)
arg->print(tdbb, plan, true, level, recurse);
}
}
else
{
level++;
plan += "JOIN (";
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++)
for (const auto arg : m_args)
{
if (i)
if (arg != m_args.front())
plan += ", ";
m_args[i]->print(tdbb, plan, false, level, recurse);
arg->print(tdbb, plan, false, level, recurse);
}
plan += ")";
}
@ -270,26 +314,26 @@ void NestedLoopJoin::print(thread_db* tdbb, string& plan, bool detailed, unsigne
// Forwards the markRecursive() notification to every joined sub-stream,
// visiting each one exactly once.
void NestedLoopJoin::markRecursive()
{
	// Non-const element copy: markRecursive() is called from a non-const
	// method and modifies the child.
	for (auto arg : m_args)
		arg->markRecursive();
}
// Lets every joined sub-stream report its streams into `streams`.
// `expandAll` is passed through to the children unchanged.
// A single traversal, so each child contributes once.
void NestedLoopJoin::findUsedStreams(StreamList& streams, bool expandAll) const
{
	for (const auto arg : m_args)
		arg->findUsedStreams(streams, expandAll);
}
// Forwards invalidateRecords() for the given request to every joined
// sub-stream, visiting each one exactly once.
void NestedLoopJoin::invalidateRecords(Request* request) const
{
	for (const auto arg : m_args)
		arg->invalidateRecords(request);
}
// Forwards nullRecords() to every joined sub-stream, visiting each one
// exactly once.
void NestedLoopJoin::nullRecords(thread_db* tdbb) const
{
	for (const auto arg : m_args)
		arg->nullRecords(tdbb);
}
bool NestedLoopJoin::fetchRecord(thread_db* tdbb, FB_SIZE_T n) const

View File

@ -1115,9 +1115,10 @@ namespace Jrd
class NestedLoopJoin : public RecordSource
{
public:
NestedLoopJoin(CompilerScratch* csb, FB_SIZE_T count, RecordSource* const* args);
NestedLoopJoin(CompilerScratch* csb, FB_SIZE_T count, RecordSource* const* args,
JoinType joinType = INNER_JOIN);
NestedLoopJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner,
BoolExprNode* boolean, JoinType joinType);
BoolExprNode* boolean);
void close(thread_db* tdbb) const override;
@ -1143,14 +1144,16 @@ namespace Jrd
bool fetchRecord(thread_db*, FB_SIZE_T) const;
const JoinType m_joinType;
const NestConst<BoolExprNode> m_boolean;
Firebird::Array<NestConst<RecordSource> > m_args;
NestConst<BoolExprNode> const m_boolean;
};
class FullOuterJoin : public RecordSource
{
public:
FullOuterJoin(CompilerScratch* csb, RecordSource* arg1, RecordSource* arg2);
FullOuterJoin(CompilerScratch* csb, RecordSource* arg1, RecordSource* arg2,
const StreamList& checkStreams);
void close(thread_db* tdbb) const override;
@ -1175,6 +1178,7 @@ namespace Jrd
private:
NestConst<RecordSource> m_arg1;
NestConst<RecordSource> m_arg2;
const StreamList m_checkStreams;
};
class HashJoin : public RecordSource
@ -1202,7 +1206,11 @@ namespace Jrd
};
public:
HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
HashJoin(thread_db* tdbb, CompilerScratch* csb, JoinType joinType,
FB_SIZE_T count, RecordSource* const* args, NestValueArray* const* keys,
double selectivity = 0);
HashJoin(thread_db* tdbb, CompilerScratch* csb,
BoolExprNode* boolean,
RecordSource* const* args, NestValueArray* const* keys,
double selectivity = 0);
@ -1229,10 +1237,16 @@ namespace Jrd
bool internalGetRecord(thread_db* tdbb) const override;
private:
void init(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count,
RecordSource* const* args, NestValueArray* const* keys,
double selectivity);
ULONG computeHash(thread_db* tdbb, Request* request,
const SubStream& sub, UCHAR* buffer) const;
bool fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) const;
const JoinType m_joinType;
const NestConst<BoolExprNode> m_boolean;
SubStream m_leader;
Firebird::Array<SubStream> m_args;
};