Mirror of https://github.com/FirebirdSQL/firebird.git

Frontported sorting improvement from RedDb/HQbird, this also resolves CORE-2650

This commit is contained in:
Dmitry Yemanov 2020-12-17 11:09:50 +03:00
parent d3eb8b1f6f
commit 8e692d2374
10 changed files with 333 additions and 97 deletions

View File

@ -433,6 +433,18 @@
#
#MaxIdentifierCharLength = 63
# ----------------------------
# Threshold that controls whether to store non-key fields inside the sort block
# or refetch them from data pages after the sorting.
#
# Defines the maximum sort record size (in bytes) that can be stored inline,
# i.e. inside the sort block. Zero means that records are always refetched.
#
# Per-database configurable.
#
# Type: integer
#
#InlineSortThreshold = 1000
# ----------------------------
#
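The comment above notes that the new parameter is per-database configurable. A minimal sketch of such an override, assuming the standard databases.conf syntax and a hypothetical alias name:

# databases.conf (sketch; "sales" is a hypothetical alias)
sales = /data/sales.fdb
{
	# Always refetch non-key fields from data pages after sorting
	InlineSortThreshold = 0
}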

View File

@ -408,6 +408,8 @@ void Config::checkValues()
checkIntForLoBound(KEY_TIP_CACHE_BLOCK_SIZE, 1, true);
checkIntForHiBound(KEY_TIP_CACHE_BLOCK_SIZE, MAX_ULONG, true);
checkIntForLoBound(KEY_INLINE_SORT_THRESHOLD, 0, true);
}

View File

@ -185,6 +185,7 @@ enum ConfigKey
KEY_CLEAR_GTT_RETAINING,
KEY_DATA_TYPE_COMPATIBILITY,
KEY_USE_FILESYSTEM_CACHE,
KEY_INLINE_SORT_THRESHOLD,
MAX_CONFIG_KEY // keep it last
};
@ -293,12 +294,13 @@ constexpr ConfigEntry entries[MAX_CONFIG_KEY] =
#endif
{TYPE_INTEGER, "ExtConnPoolSize", true, 0},
{TYPE_INTEGER, "ExtConnPoolLifeTime", true, 7200},
{TYPE_INTEGER, "SnapshotsMemSize", false, 65536}, // bytes,
{TYPE_INTEGER, "TipCacheBlockSize", false, 4194304}, // bytes,
{TYPE_INTEGER, "SnapshotsMemSize", false, 65536}, // bytes
{TYPE_INTEGER, "TipCacheBlockSize", false, 4194304}, // bytes
{TYPE_BOOLEAN, "ReadConsistency", false, true},
{TYPE_BOOLEAN, "ClearGTTAtRetaining", false, false},
{TYPE_STRING, "DataTypeCompatibility", false, nullptr},
{TYPE_BOOLEAN, "UseFileSystemCache", false, true}
{TYPE_BOOLEAN, "UseFileSystemCache", false, true},
{TYPE_INTEGER, "InlineSortThreshold", false, 1000}, // bytes
};
@ -619,6 +621,8 @@ public:
CONFIG_GET_PER_DB_STR(getDataTypeCompatibility, KEY_DATA_TYPE_COMPATIBILITY);
bool getUseFileSystemCache(bool* pPresent = nullptr) const;
CONFIG_GET_PER_DB_KEY(ULONG, getInlineSortThreshold, KEY_INLINE_SORT_THRESHOLD, getInt);
};
// Implementation of interface to access master configuration file
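The CONFIG_GET_PER_DB_KEY line above generates a per-database getter, which the optimizer consults in the opt.cpp hunk further down. A minimal usage sketch (not part of this diff), assuming an attached Database* dbb and a hypothetical totalRecordLength already computed:

// Sketch only: read the effective per-database threshold (bytes)
const ULONG threshold = dbb->dbb_config->getInlineSortThreshold();
const bool storeInline = (totalRecordLength <= threshold); // otherwise refetch after sorting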

View File

@ -851,7 +851,7 @@ void OptimizerRetrieval::analyzeNavigation(const InversionCandidateList& inversi
// then don't consider any (possibly better) alternatives.
// Another exception is when the FIRST ROWS optimization strategy is applied.
if (candidate && !optimizer->optimizeFirstRows &&
if (candidate && !optimizer->favorFirstRows &&
!(indexScratch->idx->idx_runtime_flags & idx_plan_navigate))
{
for (const InversionCandidate* const* iter = inversions.begin();
@ -2886,7 +2886,7 @@ StreamType OptimizerInnerJoin::findJoinOrder()
const int currentFilter = innerStreams[i]->isFiltered() ? 1 : 0;
if (!optimizer->optimizeFirstRows || !navigations ||
if (!optimizer->favorFirstRows || !navigations ||
(innerStreams[i]->baseNavigated && currentFilter == filters))
{
indexedRelationships.clear();

View File

@ -113,6 +113,20 @@ namespace
*node1 = (*node1) ? FB_NEW_POOL(pool) BinaryBoolNode(pool, blr_and, *node1, node2) : node2;
}
struct SortField
{
SortField() : stream(INVALID_STREAM), id(0), desc(NULL)
{}
SortField(StreamType _stream, ULONG _id, const dsc* _desc)
: stream(_stream), id(_id), desc(_desc)
{}
StreamType stream;
ULONG id;
const dsc* desc;
};
class River
{
public:
@ -483,7 +497,7 @@ RecordSource* OPT_compile(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
AutoPtr<OptimizerBlk> opt(FB_NEW_POOL(*pool) OptimizerBlk(pool, rse));
opt->opt_streams.grow(csb->csb_n_stream);
opt->optimizeFirstRows = (rse->flags & RseNode::FLAG_OPT_FIRST_ROWS) != 0;
opt->favorFirstRows = (rse->flags & RseNode::FLAG_OPT_FIRST_ROWS) != 0;
RecordSource* rsb = NULL;
@ -834,11 +848,11 @@ RecordSource* OPT_compile(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
// Handle project clause, if present
if (project)
rsb = OPT_gen_sort(tdbb, opt->opt_csb, opt->beds, &opt->keyStreams, rsb, project, true);
rsb = OPT_gen_sort(tdbb, opt->opt_csb, opt->beds, &opt->keyStreams, rsb, project, opt->favorFirstRows, true);
// Handle sort clause if present
if (sort)
rsb = OPT_gen_sort(tdbb, opt->opt_csb, opt->beds, &opt->keyStreams, rsb, sort, false);
rsb = OPT_gen_sort(tdbb, opt->opt_csb, opt->beds, &opt->keyStreams, rsb, sort, opt->favorFirstRows, false);
}
// Handle first and/or skip. The skip MUST (if present)
@ -2435,7 +2449,8 @@ static RecordSource* gen_retrieval(thread_db* tdbb,
SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamList& streams,
const StreamList* dbkey_streams, RecordSource* prior_rsb, SortNode* sort, bool project_flag)
const StreamList* dbkey_streams, RecordSource* prior_rsb, SortNode* sort,
bool refetch_flag, bool project_flag)
{
/**************************************
*
@ -2468,52 +2483,97 @@ SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamLi
ULONG items = sort->expressions.getCount() +
3 * streams.getCount() + 2 * (dbkey_streams ? dbkey_streams->getCount() : 0);
const StreamType* const end_ptr = streams.end();
const NestConst<ValueExprNode>* const end_node = sort->expressions.end();
HalfStaticArray<ULONG, OPT_STATIC_ITEMS> id_list;
StreamList stream_list;
for (const StreamType* ptr = streams.begin(); ptr < end_ptr; ptr++)
// Collect all fields involved into the sort
HalfStaticArray<SortField, OPT_STATIC_ITEMS> fields;
ULONG totalLength = 0;
for (const auto stream : streams)
{
UInt32Bitmap::Accessor accessor(csb->csb_rpt[*ptr].csb_fields);
UInt32Bitmap::Accessor accessor(csb->csb_rpt[stream].csb_fields);
if (accessor.getFirst())
{
do
{
const ULONG id = accessor.current();
items++;
id_list.push(id);
stream_list.push(*ptr);
const auto id = accessor.current();
for (NestConst<ValueExprNode>* node_ptr = sort->expressions.begin();
node_ptr != end_node;
++node_ptr)
const auto format = CMP_format(tdbb, csb, stream);
const auto desc = &format->fmt_desc[id];
if (id >= format->fmt_count || desc->dsc_dtype == dtype_unknown)
IBERROR(157); // msg 157 cannot sort on a field that does not exist
fields.push(SortField(stream, id, desc));
totalLength += desc->dsc_length;
// If the field has already been mentioned as a sort key, don't bother to repeat it
for (auto expr : sort->expressions)
{
FieldNode* fieldNode = nodeAs<FieldNode>(*node_ptr);
const auto fieldNode = nodeAs<FieldNode>(expr);
if (fieldNode && fieldNode->fieldStream == *ptr && fieldNode->fieldId == id)
if (fieldNode && fieldNode->fieldStream == stream && fieldNode->fieldId == id)
{
dsc* desc = &descriptor;
auto desc = &descriptor;
fieldNode->getDesc(tdbb, csb, desc);
// International type text has a computed key
// Different decimal float values sometimes have same keys
// International type text has a computed key.
// Different decimal float values sometimes have same keys.
// ASF: Date/time with time zones too.
if (IS_INTL_DATA(desc) || desc->isDecFloat() || desc->isDateTimeTz())
break;
--items;
id_list.pop();
stream_list.pop();
if (!IS_INTL_DATA(desc) && !desc->isDecFloat() && !desc->isDateTimeTz())
{
totalLength -= desc->dsc_length;
fields.pop();
}
break;
}
}
} while (accessor.getNext());
}
}
auto fieldCount = fields.getCount();
// Unless refetching is requested explicitly (e.g. FIRST ROWS optimization mode),
// validate the sort record length against the configured threshold for inline storage
if (!refetch_flag)
{
const auto dbb = tdbb->getDatabase();
const auto threshold = dbb->dbb_config->getInlineSortThreshold();
refetch_flag = (totalLength > threshold);
}
// Check for persistent fields to be excluded from the sort.
// If nothing is excluded, there's no point in the refetch mode.
if (refetch_flag)
{
for (auto& item : fields)
{
const auto relation = csb->csb_rpt[item.stream].csb_relation;
if (relation &&
!relation->rel_file &&
!relation->rel_view_rse &&
!relation->isVirtual())
{
item.desc = NULL;
--fieldCount;
}
}
refetch_flag = (fieldCount != fields.getCount());
}
items += fieldCount;
// Now that we know the number of items, allocate a sort map block.
SortedStream::SortMap* map =
FB_NEW_POOL(*tdbb->getDefaultPool()) SortedStream::SortMap(*tdbb->getDefaultPool());
@ -2521,6 +2581,9 @@ SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamLi
if (project_flag)
map->flags |= SortedStream::FLAG_PROJECT;
if (refetch_flag)
map->flags |= SortedStream::FLAG_REFETCH;
if (sort->unique)
map->flags |= SortedStream::FLAG_UNIQUE;
@ -2607,7 +2670,7 @@ SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamLi
FieldNode* fieldNode;
if ((fieldNode = nodeAs<FieldNode>(node)))
if ( (fieldNode = nodeAs<FieldNode>(node)) )
{
map_item->stream = fieldNode->fieldStream;
map_item->fieldId = fieldNode->fieldId;
@ -2618,100 +2681,98 @@ SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamLi
ULONG map_length = prev_key ? ROUNDUP(prev_key->getSkdOffset() + prev_key->getSkdLength(), sizeof(SLONG)) : 0;
map->keyLength = map_length;
ULONG flag_offset = map_length;
map_length += stream_list.getCount();
map_length += fieldCount;
// Now go back and process all to fields involved with the sort. If the
// field has already been mentioned as a sort key, don't bother to repeat it.
// Now go back and process all to fields involved with the sort
while (stream_list.hasData())
for (const auto& item : fields)
{
const ULONG id = id_list.pop();
const StreamType stream = stream_list.pop();
const Format* format = CMP_format(tdbb, csb, stream);
const dsc* desc = &format->fmt_desc[id];
if (id >= format->fmt_count || desc->dsc_dtype == dtype_unknown)
IBERROR(157); // msg 157 cannot sort on a field that does not exist
if (desc->dsc_dtype >= dtype_aligned)
map_length = FB_ALIGN(map_length, type_alignments[desc->dsc_dtype]);
if (!item.desc)
continue;
if (item.desc->dsc_dtype >= dtype_aligned)
map_length = FB_ALIGN(map_length, type_alignments[item.desc->dsc_dtype]);
map_item->clear();
map_item->fieldId = (SSHORT) id;
map_item->stream = stream;
map_item->fieldId = (SSHORT) item.id;
map_item->stream = item.stream;
map_item->flagOffset = flag_offset++;
map_item->desc = *desc;
map_item->desc = *item.desc;
map_item->desc.dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_length += item.desc->dsc_length;
map_item++;
}
// Make fields for record numbers and transaction ids for all streams
map_length = ROUNDUP(map_length, sizeof(SINT64));
for (const StreamType* ptr = streams.begin(); ptr < end_ptr; ptr++, map_item++)
for (const auto stream : streams)
{
map_item->clear();
map_item->fieldId = SortedStream::ID_DBKEY;
map_item->stream = *ptr;
map_item->stream = stream;
dsc* desc = &map_item->desc;
desc->dsc_dtype = dtype_int64;
desc->dsc_length = sizeof(SINT64);
desc->dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_item++;
map_item->clear();
map_item->fieldId = SortedStream::ID_TRANS;
map_item->stream = *ptr;
map_item->stream = stream;
desc = &map_item->desc;
desc->dsc_dtype = dtype_int64;
desc->dsc_length = sizeof(SINT64);
desc->dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_item++;
}
if (dbkey_streams && dbkey_streams->hasData())
{
const StreamType* const end_ptrL = dbkey_streams->end();
map_length = ROUNDUP(map_length, sizeof(SINT64));
for (const StreamType* ptr = dbkey_streams->begin(); ptr < end_ptrL; ptr++, map_item++)
for (const auto stream : *dbkey_streams)
{
map_item->clear();
map_item->fieldId = SortedStream::ID_DBKEY;
map_item->stream = *ptr;
map_item->stream = stream;
dsc* desc = &map_item->desc;
desc->dsc_dtype = dtype_int64;
desc->dsc_length = sizeof(SINT64);
desc->dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_item++;
}
for (const StreamType* ptr = dbkey_streams->begin(); ptr < end_ptrL; ptr++, map_item++)
for (const auto stream : *dbkey_streams)
{
map_item->clear();
map_item->fieldId = SortedStream::ID_DBKEY_VALID;
map_item->stream = *ptr;
map_item->stream = stream;
dsc* desc = &map_item->desc;
desc->dsc_dtype = dtype_text;
desc->dsc_ttype() = CS_BINARY;
desc->dsc_length = 1;
desc->dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_item++;
}
}
for (const StreamType* ptr = streams.begin(); ptr < end_ptr; ptr++, map_item++)
for (const auto stream : streams)
{
map_item->clear();
map_item->fieldId = SortedStream::ID_DBKEY_VALID;
map_item->stream = *ptr;
map_item->stream = stream;
dsc* desc = &map_item->desc;
desc->dsc_dtype = dtype_text;
desc->dsc_ttype() = CS_BINARY;
desc->dsc_length = 1;
desc->dsc_address = (UCHAR*)(IPTR) map_length;
map_length += desc->dsc_length;
map_item++;
}
fb_assert(map_item == map->items.end());
@ -2719,15 +2780,15 @@ SortedStream* OPT_gen_sort(thread_db* tdbb, CompilerScratch* csb, const StreamLi
map_length = ROUNDUP(map_length, sizeof(SLONG));
// Make fields to store varying and cstring length.
// Make fields to store varying and cstring length
const sort_key_def* const end_key = sort_key;
for (sort_key = map->keyItems.begin(); sort_key < end_key; sort_key++)
for (auto& sortKey : map->keyItems)
{
fb_assert(sort_key->skd_dtype != 0);
if (sort_key->skd_dtype == SKD_varying || sort_key->skd_dtype == SKD_cstring)
fb_assert(sortKey.skd_dtype != 0);
if (sortKey.skd_dtype == SKD_varying || sortKey.skd_dtype == SKD_cstring)
{
sort_key->skd_vary_offset = map_length;
sortKey.skd_vary_offset = map_length;
map_length += sizeof(USHORT);
map->flags |= SortedStream::FLAG_KEY_VARY;
}
@ -2972,7 +3033,7 @@ static bool gen_equi_join(thread_db* tdbb, OptimizerBlk* opt, RiverList& org_riv
StreamList streams;
streams.assign(river->getStreams());
rsb = OPT_gen_sort(tdbb, opt->opt_csb, streams, NULL, rsb, key, false);
rsb = OPT_gen_sort(tdbb, opt->opt_csb, streams, NULL, rsb, key, opt->favorFirstRows, false);
}
else
{
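To summarize the OPT_gen_sort change above: the declared lengths of all non-key fields are summed and compared against InlineSortThreshold; if the total exceeds the threshold (or refetching was already requested by the caller, e.g. under the FIRST ROWS strategy), fields of regular persistent tables are dropped from the sort record and FLAG_REFETCH is set on the sort map. A standalone illustration of that decision, using plain C++ rather than the engine types:

#include <cstdint>
#include <numeric>
#include <vector>

// Not Firebird code: mirrors the "totalLength > InlineSortThreshold" test above.
// A threshold of 0 means non-key fields are always refetched (when any exist).
bool mustRefetch(const std::vector<uint32_t>& nonKeyFieldLengths,
                 uint32_t inlineSortThreshold,
                 bool forcedRefetch /* e.g. FIRST ROWS optimization */)
{
    if (forcedRefetch)
        return true;

    const uint64_t total = std::accumulate(nonKeyFieldLengths.begin(),
                                           nonKeyFieldLengths.end(), uint64_t{0});
    return total > inlineSortThreshold;
}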

View File

@ -48,6 +48,7 @@ void OPT_compile_relation(Jrd::thread_db* tdbb, Jrd::jrd_rel* relation, Jrd::Com
StreamType stream, bool needIndices);
void OPT_gen_aggregate_distincts(Jrd::thread_db* tdbb, Jrd::CompilerScratch* csb, Jrd::MapNode* map);
Jrd::SortedStream* OPT_gen_sort(Jrd::thread_db* tdbb, Jrd::CompilerScratch* csb, const Jrd::StreamList& streams,
const Jrd::StreamList* dbkey_streams, Jrd::RecordSource* prior_rsb, Jrd::SortNode* sort, bool project_flag);
const Jrd::StreamList* dbkey_streams, Jrd::RecordSource* prior_rsb, Jrd::SortNode* sort,
bool refetch_flag, bool project_flag);
#endif // JRD_OPT_PROTO_H

View File

@ -517,6 +517,7 @@ namespace Jrd
static const USHORT FLAG_PROJECT = 0x1; // sort is really a project
static const USHORT FLAG_UNIQUE = 0x2; // sorts using unique key - for distinct and group by
static const USHORT FLAG_KEY_VARY = 0x4; // sort key contains varying length string(s)
static const USHORT FLAG_REFETCH = 0x8; // refetch data after sorting
// Special values for SortMap::Item::fieldId.
static const SSHORT ID_DBKEY = -1; // dbkey value

View File

@ -22,8 +22,11 @@
#include "../jrd/btr.h"
#include "../jrd/intl.h"
#include "../jrd/req.h"
#include "../jrd/tra.h"
#include "../dsql/ExprNodes.h"
#include "../jrd/cch_proto.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/dpm_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/intl_proto.h"
#include "../jrd/met_proto.h"
@ -57,7 +60,7 @@ void SortedStream::open(thread_db* tdbb) const
// Get rid of the old sort areas if this request has been used already.
// Null the pointer before calling init() because it may throw.
delete impure->irsb_sort;
impure->irsb_sort = NULL;
impure->irsb_sort = nullptr;
impure->irsb_sort = init(tdbb);
}
@ -75,7 +78,7 @@ void SortedStream::close(thread_db* tdbb) const
impure->irsb_flags &= ~irsb_open;
delete impure->irsb_sort;
impure->irsb_sort = NULL;
impure->irsb_sort = nullptr;
m_next->close(tdbb);
}
@ -120,6 +123,9 @@ void SortedStream::print(thread_db* tdbb, string& plan,
extras.printf(" (record length: %" ULONGFORMAT", key length: %" ULONGFORMAT")",
m_map->length, m_map->keyLength);
if (m_map->flags & FLAG_REFETCH)
plan += printIndent(++level) + "Refetch";
plan += printIndent(++level) +
((m_map->flags & FLAG_PROJECT) ? "Unique Sort" : "Sort") + extras;
@ -168,7 +174,7 @@ Sort* SortedStream::init(thread_db* tdbb) const
Sort(tdbb->getDatabase(), &request->req_sorts,
m_map->length, m_map->keyItems.getCount(), m_map->keyItems.getCount(),
m_map->keyItems.begin(),
((m_map->flags & FLAG_PROJECT) ? rejectDuplicate : NULL), 0));
((m_map->flags & FLAG_PROJECT) ? rejectDuplicate : nullptr), 0));
// Pump the input stream dry while pushing records into sort. For
// each record, map all fields into the sort record. The reverse
@ -183,7 +189,7 @@ Sort* SortedStream::init(thread_db* tdbb) const
// "Put" a record to sort. Actually, get the address of a place
// to build a record.
UCHAR* data = NULL;
UCHAR* data = nullptr;
scb->put(tdbb, reinterpret_cast<ULONG**>(&data));
// Zero out the sort key. This solves a multitude of problems.
@ -199,7 +205,7 @@ Sort* SortedStream::init(thread_db* tdbb) const
to = item->desc;
to.dsc_address = data + (IPTR) to.dsc_address;
bool flag = false;
dsc* from = NULL;
dsc* from = nullptr;
if (item->node)
{
@ -308,7 +314,7 @@ UCHAR* SortedStream::getData(thread_db* tdbb) const
jrd_req* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure);
ULONG* data = NULL;
ULONG* data = nullptr;
impure->irsb_sort->get(tdbb, &data);
return reinterpret_cast<UCHAR*>(data);
@ -317,18 +323,16 @@ UCHAR* SortedStream::getData(thread_db* tdbb) const
void SortedStream::mapData(thread_db* tdbb, jrd_req* request, UCHAR* data) const
{
StreamType stream = INVALID_STREAM;
dsc from, to;
StreamList refetchStreams;
const SortMap::Item* const end_item = m_map->items.begin() + m_map->items.getCount();
for (const SortMap::Item* item = m_map->items.begin(); item < end_item; item++)
for (const auto& item : m_map->items)
{
const bool flag = (*(data + item->flagOffset) == TRUE);
from = item->desc;
const auto flag = (*(data + item.flagOffset) == TRUE);
from = item.desc;
from.dsc_address = data + (IPTR) from.dsc_address;
if (item->node && !nodeIs<FieldNode>(item->node))
if (item.node && !nodeIs<FieldNode>(item.node))
continue;
// if moving a TEXT item into the key portion of the sort record,
@ -338,16 +342,15 @@ void SortedStream::mapData(thread_db* tdbb, jrd_req* request, UCHAR* data) const
// a sort key, there is a later nod_field in the item
// list that contains the data to send back
if ((IS_INTL_DATA(&item->desc) || item->desc.isDecFloat()) &&
(ULONG)(IPTR) item->desc.dsc_address < m_map->keyLength)
if ((IS_INTL_DATA(&item.desc) || item.desc.isDecFloat()) &&
(ULONG)(IPTR) item.desc.dsc_address < m_map->keyLength)
{
continue;
}
record_param* const rpb = &request->req_rpb[item->stream];
jrd_rel* const relation = rpb->rpb_relation;
const SSHORT id = item->fieldId;
const auto rpb = &request->req_rpb[item.stream];
const auto relation = rpb->rpb_relation;
const auto id = item.fieldId;
if (id < 0)
{
@ -366,20 +369,33 @@ void SortedStream::mapData(thread_db* tdbb, jrd_req* request, UCHAR* data) const
fb_assert(false);
}
if (relation &&
// If transaction ID is present, then fields from this stream are accessed.
// So we need to refetch the stream, either immediately or on demand.
const auto refetch = (id == ID_TRANS);
if (refetch && relation &&
!relation->rel_file &&
!relation->rel_view_rse &&
!relation->isVirtual())
{
rpb->rpb_runtime_flags |= RPB_refetch;
if (m_map->flags & FLAG_REFETCH)
{
// Prepare this stream for an immediate refetch
if (!refetchStreams.exist(item.stream))
refetchStreams.add(item.stream);
}
else // delay refetch until really necessary
rpb->rpb_runtime_flags |= RPB_refetch;
}
continue;
}
if (item->stream != stream)
fb_assert(!(rpb->rpb_stream_flags & RPB_s_no_data));
if (item.stream != stream)
{
stream = item->stream;
stream = item.stream;
// For the sake of prudence, set all record parameter blocks to contain
// the most recent format. This will guarantee that all fields mapped
@ -388,20 +404,160 @@ void SortedStream::mapData(thread_db* tdbb, jrd_req* request, UCHAR* data) const
// dimitr: I've added the check for !isValid to ensure that we don't overwrite
// the format for an active rpb (i.e. the one having some record fetched).
// See CORE-3806 for example.
// BEWARE: This check depends on the fact that ID_DBKEY_VALID flags are stored
// *after* real fields and ID_TRANS / ID_DBKEY values.
if (relation && !rpb->rpb_number.isValid())
VIO_record(tdbb, rpb, MET_current(tdbb, relation), tdbb->getDefaultPool());
}
Record* const record = rpb->rpb_record;
const auto record = rpb->rpb_record;
record->reset();
if (flag)
record->setNull(id);
else
{
EVL_field(rpb->rpb_relation, record, id, &to);
EVL_field(relation, record, id, &to);
MOV_move(tdbb, &from, &to);
record->clearNull(id);
}
}
// If necessary, refetch records from the underlying streams
for (const auto stream : refetchStreams)
{
fb_assert(m_map->flags & FLAG_REFETCH);
const auto rpb = &request->req_rpb[stream];
const auto relation = rpb->rpb_relation;
// Ensure the record is still in the most recent format
VIO_record(tdbb, rpb, MET_current(tdbb, relation), tdbb->getDefaultPool());
// Set all fields to NULL if the stream was originally marked as invalid
if (!rpb->rpb_number.isValid())
{
rpb->rpb_record->nullify();
continue;
}
// Refetch the record to make sure all fields are present.
// It should always succeed for SNAPSHOT and READ CONSISTENCY transactions.
const auto transaction = tdbb->getTransaction();
fb_assert(transaction);
const auto selfTraNum = transaction->tra_number;
// Code underneath is a slightly customized version of VIO_refetch_record,
// because in the case of deleted record followed by a commit we should
// find the original version (or die trying).
const auto orgTraNum = rpb->rpb_transaction_nr;
// If the primary record version disappeared, we cannot proceed
if (!DPM_get(tdbb, rpb, LCK_read))
Arg::Gds(isc_no_cur_rec).raise();
tdbb->bumpRelStats(RuntimeStatistics::RECORD_RPT_READS, rpb->rpb_relation->rel_id);
if (VIO_chase_record_version(tdbb, rpb, transaction, tdbb->getDefaultPool(), false, false))
{
if (!(rpb->rpb_runtime_flags & RPB_undo_data))
VIO_data(tdbb, rpb, tdbb->getDefaultPool());
if (rpb->rpb_transaction_nr == orgTraNum && orgTraNum != selfTraNum)
continue; // we surely see the original record version
}
else if (!(rpb->rpb_flags & rpb_deleted))
Arg::Gds(isc_no_cur_rec).raise();
if (rpb->rpb_transaction_nr != selfTraNum)
{
// Ensure that somebody really touched this record
fb_assert(rpb->rpb_transaction_nr != orgTraNum);
// and we discovered it in READ COMMITTED transaction
fb_assert(transaction->tra_flags & TRA_read_committed);
// and our transaction isn't READ CONSISTENCY one
fb_assert(!(transaction->tra_flags & TRA_read_consistency));
}
// We have found a more recent record version. Unless it's a delete stub,
// validate whether it's still suitable for the result set.
if (!(rpb->rpb_flags & rpb_deleted))
{
fb_assert(rpb->rpb_length != 0);
fb_assert(rpb->rpb_address != nullptr);
// Record can be safely returned only if it has no fields
// acting as sort keys or they haven't been changed
bool keysChanged = false;
for (const auto& item : m_map->items)
{
if (item.node && !nodeIs<FieldNode>(item.node))
continue;
if (item.stream != stream || item.fieldId < 0)
continue;
const auto null1 = (*(data + item.flagOffset) == TRUE);
from = item.desc;
from.dsc_address = data + (IPTR) from.dsc_address;
const auto null2 = !EVL_field(relation, rpb->rpb_record, item.fieldId, &to);
if (null1 != null2 || (!null1 && MOV_compare(tdbb, &from, &to)))
{
keysChanged = true;
break;
}
}
if (!keysChanged)
continue;
}
// If it's our transaction who updated/deleted this record, punt.
// We don't know how to find the original record version.
if (rpb->rpb_transaction_nr == selfTraNum)
Arg::Gds(isc_no_cur_rec).raise();
// We have to find the original record version, sigh.
// Scan version chain backwards until it's done.
auto temp = *rpb;
temp.rpb_record = nullptr;
RuntimeStatistics::Accumulator backversions(tdbb, relation,
RuntimeStatistics::RECORD_BACKVERSION_READS);
while (temp.rpb_transaction_nr != orgTraNum)
{
// If the backversion has been garbage collected, punt.
// We can do nothing in this situation.
if (!temp.rpb_b_page)
Arg::Gds(isc_no_cur_rec).raise();
// Fetch the backversion. Punt if it's unexpectedly disappeared.
temp.rpb_page = temp.rpb_b_page;
temp.rpb_line = temp.rpb_b_line;
if (!DPM_fetch(tdbb, &temp, LCK_read))
Arg::Gds(isc_no_cur_rec).raise();
VIO_data(tdbb, &temp, tdbb->getDefaultPool());
++backversions;
}
VIO_copy_record(tdbb, &temp, rpb);
delete temp.rpb_record;
}
}
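The refetch path added to mapData() above first re-reads each stream collected in refetchStreams and compares the refetched sort-key fields against the values carried through the sort; only if a key changed (which, per the asserts above, is expected only for plain READ COMMITTED transactions) does it walk the back-version chain via DPM_fetch/VIO_data until it reaches the version written by the originally sorted transaction. A standalone illustration of the key-comparison step, simplified from the loop above (not the engine code, which compares dsc descriptors via MOV_compare):

#include <cstddef>
#include <vector>

// One sort-key value as carried through the sort / as refetched.
struct KeyValue
{
    bool isNull;
    long long value; // simplified; the engine compares dsc descriptors
};

// True if any key differs between the sorted snapshot and the refetched record,
// i.e. the older record version must be located via the back-version chain.
bool keysChanged(const std::vector<KeyValue>& sortedKeys,
                 const std::vector<KeyValue>& refetchedKeys)
{
    for (std::size_t i = 0; i < sortedKeys.size(); ++i)
    {
        const auto& oldKey = sortedKeys[i];
        const auto& newKey = refetchedKeys[i];

        if (oldKey.isNull != newKey.isNull)
            return true;
        if (!oldKey.isNull && oldKey.value != newKey.value)
            return true;
    }
    return false;
}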

View File

@ -321,7 +321,7 @@ WindowedStream::WindowedStream(thread_db* tdbb, CompilerScratch* csb,
if (windowOrder)
{
SortedStream* sortedStream = OPT_gen_sort(tdbb, csb, streams, NULL,
m_joinedStream, windowOrder, false);
m_joinedStream, windowOrder, false, false);
m_joinedStream = FB_NEW_POOL(csb->csb_pool) WindowStream(tdbb, csb, window->stream,
(window->group ? &window->group->expressions : NULL),

View File

@ -103,8 +103,7 @@ public:
RseNode* const rse;
StreamList outerStreams, subStreams;
StreamList compileStreams, beds, localStreams, keyStreams;
bool optimizeFirstRows;
bool isLateral;
bool favorFirstRows;
};
// values for opt_conjunct_flags