8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 21:23:03 +01:00

Fidex CORE-4978: Improved validation and fix of lost data pages

This commit is contained in:
roman-simakov 2015-10-27 08:38:19 +00:00
parent 01240db806
commit e7171b3b44
4 changed files with 207 additions and 5 deletions

View File

@ -4983,7 +4983,7 @@ static BufferDesc* get_buffer(thread_db* tdbb, const PageNumber page, LATCH latc
#ifndef SUPERSERVER
if (page.getPageNum() >= 0)
{
CCH_TRACE(("bdb->bdb_lock->lck_logical = LCK_none; page=%i", bdb->bdb_page));
CCH_TRACE(("bdb->bdb_lock->lck_logical = LCK_none; page=%i", bdb->bdb_page.getPageNum()));
bdb->bdb_lock->lck_logical = LCK_none;
}
else {

View File

@ -624,6 +624,10 @@ void INF_database_info(const UCHAR* items,
if (err_att->att_val_errors)
{
err_val = (*err_att->att_val_errors)[VAL_DATA_PAGE_CONFUSED] +
(*err_att->att_val_errors)[VAL_DATA_PAGE_ISNT_IN_PIP] +
(*err_att->att_val_errors)[VAL_DATA_PAGE_SLOT_NOT_FOUND] +
(*err_att->att_val_errors)[VAL_DATA_PAGE_SLOT_BAD_VAL] +
(*err_att->att_val_errors)[VAL_DATA_PAGE_HASNO_PP] +
(*err_att->att_val_errors)[VAL_DATA_PAGE_LINE_ERR];
}
else

View File

@ -188,6 +188,7 @@ const int DYN_REQUESTS = 2;
//
// Errors during validation - will be returned on info calls
// CVC: It seems they will be better in a header for val.cpp that's not val.h
// Do not forget to add new constants to inf.cpp:INF_database_info
//
const int VAL_PAG_WRONG_TYPE = 0;
const int VAL_PAG_CHECKSUM_ERR = 1;
@ -217,7 +218,11 @@ const int VAL_INDEX_ORPHAN_CHILD = 24;
const int VAL_INDEX_CYCLE = 25;
const int VAL_INDEX_BAD_LEFT_SIBLING = 26;
const int VAL_INDEX_MISSES_NODE = 27;
const int VAL_MAX_ERROR = 28;
const int VAL_DATA_PAGE_ISNT_IN_PIP = 28;
const int VAL_DATA_PAGE_SLOT_NOT_FOUND = 29;
const int VAL_DATA_PAGE_SLOT_BAD_VAL = 30;
const int VAL_DATA_PAGE_HASNO_PP = 31;
const int VAL_MAX_ERROR = 32;
struct DSqlCacheItem

View File

@ -645,7 +645,9 @@ private:
USHORT vdr_errors;
SLONG vdr_max_transaction;
ULONG vdr_rel_backversion_counter; // Counts slots w/rhd_chain
PageBitmap* vdr_backversion_pages; // 1 bit per visited table page
ULONG vdr_rel_chain_counter; // Counts chains w/rdr_chain
PageBitmap* vdr_chain_pages; // 1 bit per visited record chain page
RecordBitmap* vdr_rel_records; // 1 bit per valid record
RecordBitmap* vdr_idx_records; // 1 bit per index item
@ -655,6 +657,8 @@ private:
PatternMatcher* vdr_idx_incl;
PatternMatcher* vdr_idx_excl;
int vdr_lock_tout;
void checkDPinPP(thread_db *tdbb, jrd_rel *relation, SLONG page_number);
void checkDPinPIP(thread_db *tdbb, jrd_rel *relation, SLONG page_number);
};
// vdr_flags
@ -724,7 +728,11 @@ static const TEXT msg_table[VAL_MAX_ERROR][80] =
"Index %d has orphan child page at page %ld",
"Index %d has a circular reference at page %ld", // 25
"Index %d has inconsistent left sibling pointer, page %ld level %d",
"Index %d misses node on page %ld level %d"
"Index %d misses node on page %ld level %d",
"Data page %ld marked as free in PIP (%ld:%ld)",
"Data page %ld is not in PP (%ld). Slot (%d) is not found",
"Data page %ld is not in PP (%ld). Slot (%d) has value %ld",
"Pointer page is not found for data page %ld. dpg_sequence (%ld) is invalid"
};
@ -817,7 +825,9 @@ Validation::Validation(thread_db* tdbb, UtilSvc* uSvc)
vdr_errors = 0;
vdr_max_transaction = 0;
vdr_rel_backversion_counter = 0;
vdr_backversion_pages = NULL;
vdr_rel_chain_counter = 0;
vdr_chain_pages = NULL;
vdr_rel_records = NULL; // 1 bit per valid record
vdr_idx_records = NULL;
@ -1448,13 +1458,24 @@ RTN Validation::walk_chain(thread_db* tdbb, jrd_rel* relation, rhd* header, Reco
if (VAL_debug_level)
fprintf(stdout, " BV %02d: ", ++counter);
#endif
vdr_rel_chain_counter++;
data_page* page = 0;
fetch_page(tdbb, page_number, pag_data, &window, &page);
if (page->dpg_relation != relation->rel_id)
{
++vdr_errors;
CCH_RELEASE_TAIL(tdbb, &window);
return corrupt(tdbb, VAL_DATA_PAGE_CONFUSED, relation, page_number, page->dpg_sequence);
}
vdr_rel_chain_counter++;
PBM_SET(tdbb->getDefaultPool(), &vdr_chain_pages, page_number);
const data_page::dpg_repeat* line = &page->dpg_rpt[line_number];
header = (rhd*) ((UCHAR *) page + line->dpg_offset);
if (page->dpg_count <= line_number || !line->dpg_length ||
(header->rhd_flags & (rhd_blob | rhd_fragment)) ||
!(header->rhd_flags & rhd_chain) ||
walk_record(tdbb, relation, header, line->dpg_length,
head_number, delta_flag) != rtn_ok)
{
@ -1642,7 +1663,10 @@ RTN Validation::walk_data_page(thread_db* tdbb, jrd_rel* relation, SLONG page_nu
sequence, (SLONG) (line - page->dpg_rpt));
}
if (header->rhd_flags & rhd_chain)
{
vdr_rel_backversion_counter++;
PBM_SET(tdbb->getDefaultPool(), &vdr_backversion_pages, page_number);
}
// Record the existance of a primary version of a record
@ -2541,6 +2565,137 @@ RTN Validation::walk_record(thread_db* tdbb, jrd_rel* relation, rhd* header, USH
return rtn_ok;
}
#define DECOMPOSE(n, divisor, q, r) {r = n % divisor; q = n / divisor;}
void Validation::checkDPinPP(thread_db* tdbb, jrd_rel* relation, SLONG page_number)
{
/**************************************
*
* Functional description
* Check if data page presented in pointer page.
* If not we try to fix it by setting pointer page slot in the page_number.
* Early in walk_chain we observed that this page in related to the relation so we skip such kind of check here.
**************************************/
WIN window(DB_PAGE_SPACE, page_number);
data_page* dpage;
fetch_page(tdbb, page_number, pag_data, &window, &dpage, false);
const SLONG sequence = dpage->dpg_sequence;
CCH_RELEASE_TAIL(tdbb, &window);
USHORT slot;
ULONG pp_sequence;
Database* dbb = tdbb->getDatabase();
DECOMPOSE(sequence, dbb->dbb_dp_per_pp, pp_sequence, slot);
const vcl* vector = relation->getBasePages()->rel_pages;
pointer_page* ppage = 0;
if (pp_sequence < vector->count())
{
fetch_page(tdbb, (*vector)[pp_sequence], pag_pointer, &window, &ppage, false);
if (slot >= ppage->ppg_count)
{
corrupt(tdbb, VAL_DATA_PAGE_SLOT_NOT_FOUND, relation, page_number, window.win_page.getPageNum(), slot);
if (vdr_flags & VDR_update && slot < dbb->dbb_dp_per_pp)
{
CCH_MARK(tdbb, &window);
for (USHORT i = ppage->ppg_count; i < slot; i++)
{
ppage->ppg_page[i] = 0;
// Clear control fields
UCHAR* byte = (UCHAR *) (&ppage->ppg_page[dbb->dbb_dp_per_pp]) + (i >> 2);
UCHAR bits = 3 << ((i & 3) << 1);
*byte &= ~bits;
}
ppage->ppg_page[slot] = page_number;
ppage->ppg_count = slot + 1;
// Restore control fields
UCHAR* byte = (UCHAR *) (&ppage->ppg_page[dbb->dbb_dp_per_pp]) + (slot >> 2);
UCHAR bit = 1 << ((slot & 3) << 1);
const UCHAR flags = dpage->dpg_header.pag_flags;
if (flags & dpg_full)
*byte |= bit;
else
*byte &= ~bit;
bit <<= 1;
if (flags & dpg_large)
*byte |= bit;
else
*byte &= ~bit;
}
}
else if (page_number != ppage->ppg_page[slot])
{
corrupt(tdbb, VAL_DATA_PAGE_SLOT_BAD_VAL, relation, page_number, window.win_page.getPageNum(), slot, ppage->ppg_page[slot]);
if (vdr_flags & VDR_update && !ppage->ppg_page[slot])
{
CCH_MARK(tdbb, &window);
ppage->ppg_page[slot] = page_number;
// Restore control fields
UCHAR* byte = (UCHAR *) (&ppage->ppg_page[dbb->dbb_dp_per_pp]) + (slot >> 2);
UCHAR bit = 1 << ((slot & 3) << 1);
const UCHAR flags = dpage->dpg_header.pag_flags;
if (flags & dpg_full)
*byte |= bit;
else
*byte &= ~bit;
bit <<= 1;
if (flags & dpg_large)
*byte |= bit;
else
*byte &= ~bit;
}
}
}
else
corrupt(tdbb, VAL_DATA_PAGE_HASNO_PP, relation, page_number, dpage->dpg_sequence);
CCH_RELEASE_TAIL(tdbb, &window);
}
void Validation::checkDPinPIP(thread_db* tdbb, jrd_rel* relation, SLONG page_number)
{
/**************************************
*
* Functional description
* Check if data page presented in page inventory page.
* If not we try to fix it by clearing the necessary bit on the page.
**************************************/
Database* dbb = tdbb->getDatabase();
PageManager& pageMgr = dbb->dbb_page_manager;
PageSpace* pageSpace = pageMgr.findPageSpace(DB_PAGE_SPACE);
fb_assert(pageSpace);
const SLONG sequence = page_number / pageMgr.pagesPerPIP;
const SLONG relative_bit = page_number % pageMgr.pagesPerPIP;
WIN pip_window(DB_PAGE_SPACE, (sequence == 0) ? pageSpace->ppFirst : sequence * pageMgr.pagesPerPIP - 1);
page_inv_page* pages;
fetch_page(tdbb, pip_window.win_page.getPageNum(), pag_pages, &pip_window, &pages, false);
if (pages->pip_bits[relative_bit >> 3] & (1 << (relative_bit & 7)))
{
corrupt(tdbb, VAL_DATA_PAGE_ISNT_IN_PIP, relation, page_number, pip_window.win_page.getPageNum(), relative_bit);
if (vdr_flags & VDR_update)
{
CCH_MARK(tdbb, &pip_window);
pages->pip_bits[relative_bit >> 3] &= ~(1 << (relative_bit & 7));
}
}
CCH_RELEASE_TAIL(tdbb, &pip_window);
}
RTN Validation::walk_relation(thread_db* tdbb, jrd_rel* relation)
{
@ -2607,7 +2762,9 @@ RTN Validation::walk_relation(thread_db* tdbb, jrd_rel* relation)
// Walk pointer and selected data pages associated with relation
vdr_rel_backversion_counter = 0;
PageBitmap::reset(vdr_backversion_pages);
vdr_rel_chain_counter = 0;
PageBitmap::reset(vdr_chain_pages);
RecordBitmap::reset(vdr_rel_records);
for (SLONG sequence = 0; true; sequence++)
@ -2631,9 +2788,45 @@ RTN Validation::walk_relation(thread_db* tdbb, jrd_rel* relation)
lckGC.release();
// Compare backversion pages and chain pages
if ((vdr_flags & VDR_records) &&
vdr_chain_pages && (vdr_rel_chain_counter > vdr_rel_backversion_counter))
{
if (vdr_backversion_pages)
{
PageBitmap::Accessor c(vdr_chain_pages);
PageBitmap::Accessor b(vdr_backversion_pages);
if (c.getFirst() && b.getFirst()) {
for (bool next = true; next; next = c.getNext() )
{
if (c.current() == b.current())
b.getNext();
else if ((c.current() < b.current()) || !b.getNext()) {
//fprintf(stdout, "chain page was visited not via data pages %d\n", c.current());
checkDPinPP(tdbb, relation, c.current());
checkDPinPIP(tdbb, relation, c.current());
}
}
}
}
else
{
PageBitmap::Accessor c(vdr_chain_pages);
if (c.getFirst())
{
for (bool next = true; next; next = c.getNext() )
{
//fprintf(stdout, "chain page was visited not via data pages %d\n", c.current());
checkDPinPP(tdbb, relation, c.current());
checkDPinPIP(tdbb, relation, c.current());
}
}
}
}
// See if the counts of backversions match
if ((vdr_flags & VDR_records) &&
(vdr_rel_backversion_counter != vdr_rel_chain_counter))
(vdr_rel_backversion_counter > vdr_rel_chain_counter))
{
return corrupt(tdbb, VAL_REL_CHAIN_ORPHANS, relation,
vdr_rel_backversion_counter - vdr_rel_chain_counter,