8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 15:23:02 +01:00

Swept flag implementation.

A little optimisation of mark_full() - don't call it if not necessary, don't mark PP if its flags already matched to DP flags.
This commit is contained in:
hvlad 2009-12-28 09:11:20 +00:00
parent caf39b1d57
commit ce9fd62c3c

View File

@ -81,6 +81,7 @@ using namespace Jrd;
using namespace Ods;
using namespace Firebird;
static void check_swept(thread_db*, record_param*);
static void delete_tail(thread_db*, rhdf*, const USHORT, USHORT);
static void fragment(thread_db*, record_param*, SSHORT, DataComprControl*, SSHORT, const jrd_tra*);
static void extend_relation(thread_db*, jrd_rel*, WIN*, USHORT);
@ -189,6 +190,8 @@ void DPM_backout( thread_db* tdbb, record_param* rpb)
printf(" new dpg_count %d\n", page->dpg_count);
#endif
fb_assert((page->dpg_header.pag_flags & dpg_swept) == 0);
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
}
@ -456,6 +459,12 @@ bool DPM_chain( thread_db* tdbb, record_param* org_rpb, record_param* new_rpb)
memset(header->rhd_data + size, 0, fill);
}
if (page->dpg_header.pag_flags & dpg_swept)
{
page->dpg_header.pag_flags &= ~dpg_swept;
mark_full(tdbb, org_rpb);
}
else
CCH_RELEASE(tdbb, &org_rpb->getWindow(tdbb));
return true;
@ -1541,6 +1550,22 @@ bool DPM_next(thread_db* tdbb, record_param* rpb, USHORT lock_type, bool onepage
}
#endif
// If i'm a sweeper i don't need to look at swept pages. Also i should
// check processed pages if they was swept.
const bool sweeper = (tdbb->tdbb_flags & TDBB_sweeper);
if (sweeper && (pp_sequence || slot) && !line)
{
// The last record at previous data page was returned to caller.
// It is time now to check if previous data page was swept.
const RecordNumber saveRecNo = rpb->rpb_number;
rpb->rpb_number.decrement();
check_swept(tdbb, rpb);
rpb->rpb_number = saveRecNo;
}
// Find the next pointer page, data page, and record
while (true)
@ -1555,7 +1580,8 @@ bool DPM_next(thread_db* tdbb, record_param* rpb, USHORT lock_type, bool onepage
{
const SLONG page_number = ppage->ppg_page[slot];
const UCHAR* bits = (UCHAR*) (ppage->ppg_page + dbb->dbb_dp_per_pp);
if (page_number && !PPG_DP_BIT_TEST(bits, slot, ppg_dp_secondary))
if (page_number && !PPG_DP_BIT_TEST(bits, slot, ppg_dp_secondary) &&
(sweeper && !PPG_DP_BIT_TEST(bits, slot, ppg_dp_swept) || !sweeper) )
{
#ifdef SUPERSERVER_V2
// Perform sequential prefetch of relation's data pages.
@ -1613,6 +1639,20 @@ bool DPM_next(thread_db* tdbb, record_param* rpb, USHORT lock_type, bool onepage
CCH_RELEASE(tdbb, window);
}
if (sweeper)
{
// The last record at data page was not returned to caller.
// It is time now to check if this data page was swept.
const RecordNumber saveRecNo = rpb->rpb_number;
rpb->rpb_number.compose(dbb->dbb_max_records, dbb->dbb_dp_per_pp,
line, slot, pp_sequence);
rpb->rpb_number.decrement();
check_swept(tdbb, rpb);
rpb->rpb_number = saveRecNo;
}
if (onepage) {
return false;
}
@ -1917,6 +1957,13 @@ void DPM_store( thread_db* tdbb, record_param* rpb, PageStack& stack, USHORT typ
memset(header->rhd_data + size, 0, fill);
}
Ods::pag *page = rpb->getWindow(tdbb).win_buffer;
if (page->pag_flags & dpg_swept)
{
page->pag_flags &= ~dpg_swept;
mark_full(tdbb, rpb);
}
else
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
}
@ -2204,10 +2251,80 @@ void DPM_update( thread_db* tdbb, record_param* rpb, PageStack* stack, const jrd
memset(header->rhd_data + size, 0, fill);
}
if (page->dpg_header.pag_flags & dpg_swept)
{
page->dpg_header.pag_flags &= ~dpg_swept;
mark_full(tdbb, rpb);
}
else
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
}
static void check_swept(thread_db* tdbb, record_param* rpb)
{
/**************************************
*
* c h e c k _ s w e p t
*
**************************************
*
* Functional description
* Check if data page have primary record versions only and all of them
* created by committed transactions. Such data page should be skipped
* by sweep as sweep have nothing to do on it.
* Mark swept data page and its pointer page by corresponding flag.
*
**************************************/
Database* dbb = tdbb->getDatabase();
jrd_tra *transaction = tdbb->getTransaction();
WIN* window = &rpb->getWindow(tdbb);
RelationPages* relPages = rpb->rpb_relation->getPages(tdbb);
USHORT pp_sequence;
SSHORT slot, line;
rpb->rpb_number.decompose(dbb->dbb_max_records, dbb->dbb_dp_per_pp,
line, slot, pp_sequence);
pointer_page* ppage =
get_pointer_page(tdbb, rpb->rpb_relation, relPages, window, pp_sequence, LCK_read);
if (!ppage)
return;
const UCHAR* bits = (UCHAR*) (ppage->ppg_page + dbb->dbb_dp_per_pp);
if (slot >= ppage->ppg_count || !ppage->ppg_page[slot] ||
PPG_DP_BIT_TEST(bits, slot, ppg_dp_secondary | ppg_dp_swept))
{
CCH_RELEASE(tdbb, window);
return;
}
data_page* dpage = (data_page*)
CCH_HANDOFF(tdbb, window, ppage->ppg_page[slot], LCK_write, pag_data);
for (int line = 0; line < dpage->dpg_count; ++line)
{
const data_page::dpg_repeat* index = &dpage->dpg_rpt[line];
if (index->dpg_offset)
{
rhd* header = (rhd*) ((SCHAR*) dpage + index->dpg_offset);
if (header->rhd_transaction > transaction->tra_oldest ||
header->rhd_flags & (rpb_blob | rpb_chained | rpb_fragment) ||
header->rhd_b_page)
{
CCH_RELEASE_TAIL(tdbb, window);
return;
}
}
}
CCH_MARK(tdbb, window);
dpage->dpg_header.pag_flags |= dpg_swept;
mark_full(tdbb, rpb);
}
static void delete_tail(thread_db* tdbb, rhdf* header, const USHORT page_space, USHORT length)
{
/**************************************
@ -2499,6 +2616,12 @@ static void fragment(thread_db* tdbb,
BUGCHECK(252); // msg 252 header fragment length changed
}
if (page->dpg_header.pag_flags & dpg_swept)
{
page->dpg_header.pag_flags &= ~dpg_swept;
mark_full(tdbb, rpb);
}
else
CCH_RELEASE(tdbb, window);
}
@ -2740,9 +2863,16 @@ static UCHAR* find_space(thread_db* tdbb,
if (aligned_size > (int) dbb->dbb_page_size - used)
{
if (!(page->dpg_header.pag_flags & dpg_full))
{
CCH_MARK(tdbb, &rpb->getWindow(tdbb));
page->dpg_header.pag_flags |= dpg_full;
mark_full(tdbb, rpb);
}
else
{
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
}
return NULL;
}
@ -3093,12 +3223,25 @@ static void mark_full(thread_db* tdbb, record_param* rpb)
const UCHAR flags = dpage->dpg_header.pag_flags;
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
// Check if PP flags already equal to the DP flags
UCHAR* byte = &PPG_DP_BITS_BYTE((UCHAR*) &ppage->ppg_page[dbb->dbb_dp_per_pp], slot);
const UCHAR bit_full_set = ((*byte & PPG_DP_BIT_MASK(slot, ppg_dp_full)) == 0) ? 0 : dpg_full;
const UCHAR bit_large_set = ((*byte & PPG_DP_BIT_MASK(slot, ppg_dp_large)) == 0) ? 0 : dpg_large;
const UCHAR bit_swept_set = ((*byte & PPG_DP_BIT_MASK(slot, ppg_dp_swept)) == 0) ? 0 : dpg_swept;
if ((flags & (dpg_full | dpg_large | dpg_swept)) == (bit_full_set | bit_large_set | bit_swept_set))
{
CCH_RELEASE(tdbb, &pp_window);
return;
}
CCH_precedence(tdbb, &pp_window, rpb->getWindow(tdbb).win_page);
CCH_MARK(tdbb, &pp_window);
//UCHAR bit = 1 << ((slot & 3) << 1);
//UCHAR* byte = (UCHAR *) (&ppage->ppg_page[dbb->dbb_dp_per_pp]) + (slot >> 2);
UCHAR* byte = &PPG_DP_BITS_BYTE((UCHAR*) &ppage->ppg_page[dbb->dbb_dp_per_pp], slot);
UCHAR bit = PPG_DP_BIT_MASK(slot, ppg_dp_full);
if (flags & dpg_full)
@ -3124,6 +3267,14 @@ static void mark_full(thread_db* tdbb, record_param* rpb)
*byte &= ~bit;
}
bit = PPG_DP_BIT_MASK(slot, ppg_dp_swept);
if (flags & dpg_swept) {
*byte |= bit;
}
else {
*byte &= ~bit;
}
CCH_RELEASE(tdbb, &pp_window);
}