// Source: firebird-mirror/src/jrd/recsrc/BufferedStream.cpp
// (mirror of https://github.com/FirebirdSQL/firebird.git, synced 2025-01-30 22:03:03 +01:00)

/*
* The contents of this file are subject to the Initial
* Developer's Public License Version 1.0 (the "License");
* you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
*
* Software distributed under the License is distributed AS IS,
* WITHOUT WARRANTY OF ANY KIND, either express or implied.
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Dmitry Yemanov
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2009 Dmitry Yemanov <dimitr@firebirdsql.org>
* and all contributors signed below.
*
* All Rights Reserved.
* Contributor(s): ______________________________________.
*/
#include "firebird.h"
#include "../jrd/align.h"
#include "../jrd/jrd.h"
#include "../jrd/req.h"
#include "../jrd/cmp_proto.h"
#include "../jrd/evl_proto.h"
#include "../jrd/met_proto.h"
#include "../jrd/mov_proto.h"
#include "../jrd/vio_proto.h"
#include "RecordSource.h"
using namespace Firebird;
using namespace Jrd;
// --------------------------
// Data access: record buffer
// --------------------------
// Constructs a buffering wrapper around the record source `next`.
//
// The constructor:
//   - reserves impure space of sizeof(Impure) for the runtime state;
//   - asks `next` for the streams it uses and flags each of them csb_offline;
//   - for every such stream, maps each field id present in csb_fields, followed
//     by three service slots (transaction id, dbkey number, dbkey validity);
//   - builds the format of the buffered record (m_format) from those slots.
//
// In the resulting format every descriptor's dsc_address holds the byte offset
// of the slot inside the buffered record, not a real pointer.
BufferedStream::BufferedStream(CompilerScratch* csb, RecordSource* next)
	: m_next(next), m_map(csb->csb_pool)
{
	fb_assert(m_next);

	m_impure = CMP_impure(csb, sizeof(Impure));

	StreamList streams;
	m_next->findUsedStreams(streams);

	// Descriptors of the buffered record slots, kept parallel to m_map
	Array<dsc> fields;

	for (StreamList::iterator i = streams.begin(); i != streams.end(); ++i)
	{
		const StreamType stream = *i;
		CompilerScratch::csb_repeat* const tail = &csb->csb_rpt[stream];

		tail->csb_flags |= csb_offline;

		UInt32Bitmap::Accessor accessor(tail->csb_fields);

		if (accessor.getFirst())
		{
			// The stream format does not change while its fields are walked,
			// so look it up once instead of on every iteration
			const Format* const format = tail->csb_format; // CMP_format(tdbb, csb, stream);

			do {
				const USHORT id = (USHORT) accessor.current();
				const dsc* const desc = &format->fmt_desc[id];

				m_map.add(FieldMap(FieldMap::REGULAR_FIELD, stream, id));
				fields.add(*desc);
			} while (accessor.getNext());
		}

		// Service slots appended after the regular fields of every stream
		dsc desc;

		desc.makeLong(0);
		m_map.add(FieldMap(FieldMap::TRANSACTION_ID, stream, 0));
		fields.add(desc);

		desc.makeInt64(0);
		m_map.add(FieldMap(FieldMap::DBKEY_NUMBER, stream, 0));
		fields.add(desc);

		desc.makeText(1, CS_BINARY);
		m_map.add(FieldMap(FieldMap::DBKEY_VALID, stream, 0));
		fields.add(desc);
	}

	const FB_SIZE_T count = fields.getCount();
	Format* const format = Format::newFormat(csb->csb_pool, count);

	// The record starts with FLAG_BYTES(count) bytes
	// (presumably the per-field null flags -- confirm against Format/Record)
	format->fmt_length = FLAG_BYTES(count);

	for (FB_SIZE_T i = 0; i < count; i++)
	{
		dsc& desc = format->fmt_desc[i] = fields[i];

		// Align the running offset for data types that require it
		if (desc.dsc_dtype >= dtype_aligned)
			format->fmt_length = FB_ALIGN(format->fmt_length, type_alignments[desc.dsc_dtype]);

		// Store the offset inside the record in place of an address
		desc.dsc_address = (UCHAR*)(IPTR) format->fmt_length;
		format->fmt_length += desc.dsc_length;
	}

	m_format = format;
}
// Opens the stream: marks it as open and not yet fully read, opens the
// underlying record source, then replaces any previous buffer with a fresh
// RecordBuffer (allocated from the default pool, using m_format) and rewinds
// the read position to the beginning.
void BufferedStream::open(thread_db* tdbb) const
{
	Impure* const state = tdbb->getRequest()->getImpure<Impure>(m_impure);

	state->irsb_flags = irsb_open | irsb_mustread;

	m_next->open(tdbb);

	// Drop whatever buffer is still referenced by the impure area
	delete state->irsb_buffer;

	MemoryPool& defaultPool = *tdbb->getDefaultPool();
	state->irsb_buffer = FB_NEW(defaultPool) RecordBuffer(defaultPool, m_format);

	state->irsb_position = 0;
}
// Closes the stream. Records are always invalidated; the buffer is released
// and the underlying record source is closed only if this stream is open.
void BufferedStream::close(thread_db* tdbb) const
{
	jrd_req* const req = tdbb->getRequest();

	invalidateRecords(req);

	Impure* const state = req->getImpure<Impure>(m_impure);

	// Nothing else to do unless the stream is currently open
	if (!(state->irsb_flags & irsb_open))
		return;

	state->irsb_flags &= ~irsb_open;

	delete state->irsb_buffer;
	state->irsb_buffer = NULL;

	m_next->close(tdbb);
}
// Fetches the next record.
//
// The stream works in one of two modes, selected by the irsb_mustread flag:
//   - flag set: a record is pulled from the underlying source, its mapped
//     fields and the service values (transaction id, dbkey number, dbkey
//     validity) are copied into the temporary buffer record, and that record
//     is stored in the buffer;
//   - flag clear: the record at irsb_position is fetched from the buffer and
//     its values are assigned back to the records / record parameter blocks
//     of the original streams.
//
// Returns false if the stream is not open, if the underlying source is
// exhausted (the irsb_mustread flag is cleared in that case), or if the buffer
// has no record at the current position. On success irsb_position is advanced
// and true is returned.
bool BufferedStream::getRecord(thread_db* tdbb) const
{
	if (--tdbb->tdbb_quantum < 0)
		JRD_reschedule(tdbb, 0, true);

	jrd_req* const request = tdbb->getRequest();
	Impure* const impure = request->getImpure<Impure>(m_impure);

	if (!(impure->irsb_flags & irsb_open))
		return false;

	dsc from, to;

	Record* const buffer_record = impure->irsb_buffer->getTempRecord();

	if (impure->irsb_flags & irsb_mustread)
	{
		if (!m_next->getRecord(tdbb))
		{
			// ASF: There is nothing more to read, so remove irsb_mustread flag.
			// That's important if m_next is reused in another stream and our caller
			// relies on this BufferedStream being non-lazy, like WindowedStream does.
			impure->irsb_flags &= ~irsb_mustread;
			return false;
		}

		// Start with every slot NULL; slots are un-nulled one by one below
		buffer_record->nullify();

		// Assign the fields to the record to be stored
		for (FB_SIZE_T i = 0; i < m_map.getCount(); i++)
		{
			const FieldMap& map = m_map[i];

			record_param* const rpb = &request->req_rpb[map.map_stream];
			Record* const record = rpb->rpb_record;

			if (map.map_type == FieldMap::REGULAR_FIELD)
			{
				// A source field that cannot be evaluated stays NULL in the buffer
				if (!EVL_field(rpb->rpb_relation, record, map.map_id, &from))
					continue;
			}

			buffer_record->clearNull(i);

			if (!EVL_field(rpb->rpb_relation, buffer_record, (USHORT) i, &to))
				fb_assert(false);

			switch (map.map_type)
			{
			case FieldMap::REGULAR_FIELD:
				MOV_move(tdbb, &from, &to);
				break;

			case FieldMap::TRANSACTION_ID:
				// NOTE(review): written through a ULONG pointer -- confirm that
				// rpb_transaction_nr always fits the slot built by the constructor
				*reinterpret_cast<ULONG*>(to.dsc_address) = rpb->rpb_transaction_nr;
				break;

			case FieldMap::DBKEY_NUMBER:
				*reinterpret_cast<SINT64*>(to.dsc_address) = rpb->rpb_number.getValue();
				break;

			case FieldMap::DBKEY_VALID:
				*to.dsc_address = (UCHAR) rpb->rpb_number.isValid();
				break;

			default:
				fb_assert(false);
			}
		}

		// Put the record into the buffer
		impure->irsb_buffer->store(buffer_record);
	}
	else
	{
		// Read the record from the buffer
		if (!impure->irsb_buffer->fetch(impure->irsb_position, buffer_record))
			return false;

		// Last stream seen, used to upgrade the record format once per run
		// of consecutive map entries belonging to the same stream
		StreamType stream = INVALID_STREAM;

		// Assign fields back to their original streams
		for (FB_SIZE_T i = 0; i < m_map.getCount(); i++)
		{
			const FieldMap& map = m_map[i];

			record_param* const rpb = &request->req_rpb[map.map_stream];

			if (map.map_stream != stream)
			{
				stream = map.map_stream;

				// See SortedStream::mapData() for explanations why we need
				// to upgrade the record format
				if (rpb->rpb_relation && !rpb->rpb_number.isValid())
					VIO_record(tdbb, rpb, MET_current(tdbb, rpb->rpb_relation), tdbb->getDefaultPool());
			}

			Record* const record = rpb->rpb_record;
			record->reset();

			if (!EVL_field(rpb->rpb_relation, buffer_record, (USHORT) i, &from))
			{
				// Only regular fields are expected to be NULL in the buffer
				fb_assert(map.map_type == FieldMap::REGULAR_FIELD);
				record->setNull(map.map_id);
				continue;
			}

			switch (map.map_type)
			{
			case FieldMap::REGULAR_FIELD:
				{
					EVL_field(rpb->rpb_relation, record, map.map_id, &to);
					MOV_move(tdbb, &from, &to);
					record->clearNull(map.map_id);
				}
				break;

			case FieldMap::TRANSACTION_ID:
				rpb->rpb_transaction_nr = *reinterpret_cast<ULONG*>(from.dsc_address);
				break;

			case FieldMap::DBKEY_NUMBER:
				rpb->rpb_number.setValue(*reinterpret_cast<SINT64*>(from.dsc_address));
				break;

			case FieldMap::DBKEY_VALID:
				rpb->rpb_number.setValid(*from.dsc_address != 0);
				break;

			default:
				fb_assert(false);
			}
		}
	}

	impure->irsb_position++;
	return true;
}
// Delegates the refetch request to the underlying record source.
bool BufferedStream::refetchRecord(thread_db* tdbb) const
{
	const bool refetched = m_next->refetchRecord(tdbb);
	return refetched;
}
// Delegates the lock request to the underlying record source.
bool BufferedStream::lockRecord(thread_db* tdbb) const
{
	const bool locked = m_next->lockRecord(tdbb);
	return locked;
}
// Appends the description of this node to `plan`, then lets the underlying
// record source print itself.
//
// In detailed mode a "Record Buffer" line carrying the buffered record length
// is added one indentation level deeper, and the underlying source is printed
// at that deeper level. In non-detailed mode this node adds nothing and the
// level is passed through unchanged.
void BufferedStream::print(thread_db* tdbb, string& plan, bool detailed, unsigned level) const
{
	if (detailed)
	{
		string extras;

		// Keep whitespace around ULONGFORMAT: since C++11 an identifier glued
		// to a string literal is parsed as a user-defined-literal suffix
		extras.printf(" (record length: %" ULONGFORMAT ")", m_format->fmt_length);

		plan += printIndent(++level) + "Record Buffer" + extras;
	}

	m_next->print(tdbb, plan, detailed, level);
}
void BufferedStream::markRecursive()
{
m_next->markRecursive();
}
// Collects the streams used by the underlying record source into `streams`;
// this node contributes no streams of its own.
void BufferedStream::findUsedStreams(StreamList& streams, bool expandAll) const
{
	RecordSource* const source = m_next;
	source->findUsedStreams(streams, expandAll);
}
// Forwards record invalidation for `request` to the underlying record source.
void BufferedStream::invalidateRecords(jrd_req* request) const
{
	RecordSource* const source = m_next;
	source->invalidateRecords(request);
}
// Forwards the request to null out the records to the underlying record source.
void BufferedStream::nullRecords(thread_db* tdbb) const
{
	RecordSource* const source = m_next;
	source->nullRecords(tdbb);
}
2010-07-06 13:09:32 +02:00
void BufferedStream::locate(thread_db* tdbb, FB_UINT64 position) const
{
jrd_req* const request = tdbb->getRequest();
Impure* const impure = request->getImpure<Impure>(m_impure);
// If we haven't fetched and cached the underlying stream completely, do it now
if (impure->irsb_flags & irsb_mustread)
{
while (this->getRecord(tdbb))
; // no-op
2010-01-19 17:32:02 +01:00
fb_assert(!(impure->irsb_flags & irsb_mustread));
}
impure->irsb_position = position;
}
// Returns the number of records held by the buffer, or 0 when there is no
// buffer. If the underlying stream has not been cached completely yet, it is
// read to the end first so that the count covers the whole stream.
FB_UINT64 BufferedStream::getCount(thread_db* tdbb) const
{
	Impure* const state = tdbb->getRequest()->getImpure<Impure>(m_impure);

	if (state->irsb_flags & irsb_mustread)
	{
		// Drain the underlying stream; each fetched record is cached by getRecord()
		bool fetched;

		do {
			fetched = this->getRecord(tdbb);
		} while (fetched);

		fb_assert(!(state->irsb_flags & irsb_mustread));
	}

	if (!state->irsb_buffer)
		return 0;

	return state->irsb_buffer->getCount();
}