8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 05:23:03 +01:00

Fixed CORE-4978: Improved validation and fix of lost data pages. Thanks to Vlad for help.

This commit is contained in:
roman-simakov 2015-11-26 09:28:31 +00:00
parent 81a5ad0794
commit ce8d458e46
3 changed files with 213 additions and 5 deletions

View File

@ -3757,7 +3757,7 @@ static BufferDesc* get_buffer(thread_db* tdbb, const PageNumber page, SyncType s
if (page != FREE_PAGE)
{
CCH_TRACE(("bdb->bdb_lock->lck_logical = LCK_none; page=%i", bdb->bdb_page));
CCH_TRACE(("bdb->bdb_lock->lck_logical = LCK_none; page=%i", bdb->bdb_page.getPageNum()));
bdb->bdb_lock->lck_logical = LCK_none;
}
else

View File

@ -843,7 +843,11 @@ const Validation::MSG_ENTRY Validation::vdr_msg_table[VAL_MAX_ERROR] =
{false, fb_info_pip_warns, "PIP %"ULONGFORMAT" (seq %d) have wrong pip_min (%"ULONGFORMAT"). Correct is %"ULONGFORMAT},
{false, fb_info_pip_warns, "PIP %"ULONGFORMAT" (seq %d) have wrong pip_extent (%"ULONGFORMAT"). Correct is %"ULONGFORMAT},
{false, fb_info_pip_warns, "PIP %"ULONGFORMAT" (seq %d) have wrong pip_used (%"ULONGFORMAT"). Correct is %"ULONGFORMAT},
{false, fb_info_ppage_warns, "Pointer page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} bits {0x%02X %s} are not consistent with data page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} state {0x%02X %s}"}
{false, fb_info_ppage_warns, "Pointer page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} bits {0x%02X %s} are not consistent with data page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} state {0x%02X %s}"},
{true, fb_info_pip_errors, "Data page %"ULONGFORMAT" marked as free in PIP (%"ULONGFORMAT":%"ULONGFORMAT")"},
{true, isc_info_ppage_errors, "Data page %"ULONGFORMAT" is not in PP (%"ULONGFORMAT"). Slot (%d) is not found"},
{true, isc_info_ppage_errors, "Data page %"ULONGFORMAT" is not in PP (%"ULONGFORMAT"). Slot (%d) has value %"ULONGFORMAT},
{true, isc_info_ppage_errors, "Pointer page is not found for data page %"ULONGFORMAT". dpg_sequence (%"ULONGFORMAT") is invalid"}
};
Validation::Validation(thread_db* tdbb, UtilSvc* uSvc) :
@ -857,7 +861,9 @@ Validation::Validation(thread_db* tdbb, UtilSvc* uSvc) :
vdr_fixed = 0;
vdr_max_transaction = 0;
vdr_rel_backversion_counter = 0;
vdr_backversion_pages = NULL;
vdr_rel_chain_counter = 0;
vdr_chain_pages = NULL;
vdr_rel_records = NULL;
vdr_idx_records = NULL;
vdr_page_bitmap = NULL;
@ -1548,10 +1554,21 @@ Validation::RTN Validation::walk_chain(jrd_rel* relation, const rhd* header,
vdr_rel_chain_counter++;
data_page* page = 0;
fetch_page(true, page_number, pag_data, &window, &page);
if (page->dpg_relation != relation->rel_id)
{
release_page(&window);
return corrupt(VAL_DATA_PAGE_CONFUSED, relation, page_number, page->dpg_sequence);
}
vdr_rel_chain_counter++;
PBM_SET(vdr_tdbb->getDefaultPool(), &vdr_chain_pages, page_number);
const data_page::dpg_repeat* line = &page->dpg_rpt[line_number];
header = (const rhd*) ((UCHAR*) page + line->dpg_offset);
if (page->dpg_count <= line_number || !line->dpg_length ||
(header->rhd_flags & (rhd_blob | rhd_fragment)) ||
!(header->rhd_flags & rhd_chain) ||
walk_record(relation, header, line->dpg_length,
head_number, delta_flag) != rtn_ok)
{
@ -1754,7 +1771,10 @@ Validation::RTN Validation::walk_data_page(jrd_rel* relation, ULONG page_number,
sequence, (ULONG) (line - page->dpg_rpt));
}
if (header->rhd_flags & rhd_chain)
{
vdr_rel_backversion_counter++;
PBM_SET(vdr_tdbb->getDefaultPool(), &vdr_backversion_pages, page_number);
}
// Record the existance of a primary version of a record
@ -2753,6 +2773,148 @@ Validation::RTN Validation::walk_record(jrd_rel* relation, const rhd* header, US
return rtn_ok;
}
#define DECOMPOSE(n, divisor, q, r) {r = n % divisor; q = n / divisor;}
void restoreFlags(UCHAR* byte, UCHAR flags, bool empty)
{
UCHAR bit = PPG_DP_BIT_MASK(slot, ppg_dp_full);
if (flags & dpg_full)
*byte |= bit;
else
*byte &= ~bit;
bit = PPG_DP_BIT_MASK(slot, ppg_dp_large);
if (flags & dpg_large)
*byte |= bit;
else
*byte &= ~bit;
bit = PPG_DP_BIT_MASK(slot, ppg_dp_swept);
if (flags & dpg_swept)
*byte |= bit;
else
*byte &= ~bit;
bit = PPG_DP_BIT_MASK(slot, ppg_dp_secondary);
if (flags & dpg_secondary)
*byte |= bit;
else
*byte &= ~bit;
bit = PPG_DP_BIT_MASK(slot, ppg_dp_empty);
if (empty)
*byte |= bit;
else
*byte &= ~bit;
}
void Validation::checkDPinPP(jrd_rel* relation, SLONG page_number)
{
/**************************************
*
* Functional description
* Check if data page presented in pointer page.
* If not we try to fix it by setting pointer page slot in the page_number.
* Early in walk_chain we observed that this page in related to the relation so we skip such kind of check here.
**************************************/
WIN window(DB_PAGE_SPACE, page_number);
data_page* dpage;
fetch_page(false, page_number, pag_data, &window, &dpage);
const SLONG sequence = dpage->dpg_sequence;
const bool dpEmpty = (dpage->dpg_count == 0);
release_page(&window);
USHORT slot;
ULONG pp_sequence;
Database* dbb = vdr_tdbb->getDatabase();
DECOMPOSE(sequence, dbb->dbb_dp_per_pp, pp_sequence, slot);
const vcl* vector = relation->getBasePages()->rel_pages;
pointer_page* ppage = 0;
if (pp_sequence < vector->count())
{
fetch_page(false, (*vector)[pp_sequence], pag_pointer, &window, &ppage);
if (slot >= ppage->ppg_count)
{
corrupt(VAL_DATA_PAGE_SLOT_NOT_FOUND, relation, page_number, window.win_page.getPageNum(), slot);
if (vdr_flags & VDR_update && slot < dbb->dbb_dp_per_pp)
{
CCH_MARK(vdr_tdbb, &window);
for (USHORT i = ppage->ppg_count; i < slot; i++)
{
ppage->ppg_page[i] = 0;
// Clear control fields
UCHAR* byte = &PPG_DP_BITS_BYTE((UCHAR*) &ppage->ppg_page[dbb->dbb_dp_per_pp], slot);
*byte = 0;
}
ppage->ppg_page[slot] = page_number;
ppage->ppg_count = slot + 1;
// Restore control fields
UCHAR* byte = &PPG_DP_BITS_BYTE((UCHAR*) &ppage->ppg_page[dbb->dbb_dp_per_pp], slot);
restoreFlags(byte, dpage->dpg_header.pag_flags, dpEmpty);
vdr_fixed++;
}
}
else if (page_number != ppage->ppg_page[slot])
{
corrupt(VAL_DATA_PAGE_SLOT_BAD_VAL, relation, page_number, window.win_page.getPageNum(), slot, ppage->ppg_page[slot]);
if (vdr_flags & VDR_update && !ppage->ppg_page[slot])
{
CCH_MARK(vdr_tdbb, &window);
ppage->ppg_page[slot] = page_number;
// Restore control fields
UCHAR* byte = &PPG_DP_BITS_BYTE((UCHAR*) &ppage->ppg_page[dbb->dbb_dp_per_pp], slot);
restoreFlags(byte, dpage->dpg_header.pag_flags, dpEmpty);
vdr_fixed++;
}
}
}
else
corrupt(VAL_DATA_PAGE_HASNO_PP, relation, page_number, dpage->dpg_sequence);
release_page(&window);
}
void Validation::checkDPinPIP(jrd_rel* relation, SLONG page_number)
{
/**************************************
*
* Functional description
* Check if data page presented in page inventory page.
* If not we try to fix it by clearing the necessary bit on the page.
**************************************/
Database* dbb = vdr_tdbb->getDatabase();
PageManager& pageMgr = dbb->dbb_page_manager;
PageSpace* pageSpace = pageMgr.findPageSpace(DB_PAGE_SPACE);
fb_assert(pageSpace);
const SLONG sequence = page_number / pageMgr.pagesPerPIP;
const SLONG relative_bit = page_number % pageMgr.pagesPerPIP;
WIN pip_window(DB_PAGE_SPACE, (sequence == 0) ? pageSpace->pipFirst : sequence * pageMgr.pagesPerPIP - 1);
page_inv_page* pages;
fetch_page(false, pip_window.win_page.getPageNum(), pag_pages, &pip_window, &pages);
if (pages->pip_bits[relative_bit >> 3] & (1 << (relative_bit & 7)))
{
corrupt(VAL_DATA_PAGE_ISNT_IN_PIP, relation, page_number, pip_window.win_page.getPageNum(), relative_bit);
if (vdr_flags & VDR_update)
{
CCH_MARK(vdr_tdbb, &pip_window);
pages->pip_bits[relative_bit >> 3] &= ~(1 << (relative_bit & 7));
vdr_fixed++;
}
}
release_page(&pip_window);
}
Validation::RTN Validation::walk_relation(jrd_rel* relation)
{
@ -2823,7 +2985,9 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
// Walk pointer and selected data pages associated with relation
vdr_rel_backversion_counter = 0;
PageBitmap::reset(vdr_backversion_pages);
vdr_rel_chain_counter = 0;
PageBitmap::reset(vdr_chain_pages);
RecordBitmap::reset(vdr_rel_records);
for (ULONG sequence = 0; true; sequence++)
@ -2845,9 +3009,45 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
lckGC.release();
// Compare backversion pages and chain pages
if ((vdr_flags & VDR_records) &&
vdr_chain_pages && (vdr_rel_chain_counter > vdr_rel_backversion_counter))
{
if (vdr_backversion_pages)
{
PageBitmap::Accessor c(vdr_chain_pages);
PageBitmap::Accessor b(vdr_backversion_pages);
if (c.getFirst() && b.getFirst()) {
for (bool next = true; next; next = c.getNext() )
{
if (c.current() == b.current())
b.getNext();
else if ((c.current() < b.current()) || !b.getNext()) {
//fprintf(stdout, "chain page was visited not via data pages %d\n", c.current());
checkDPinPP(relation, c.current());
checkDPinPIP(relation, c.current());
}
}
}
}
else
{
PageBitmap::Accessor c(vdr_chain_pages);
if (c.getFirst())
{
for (bool next = true; next; next = c.getNext() )
{
//fprintf(stdout, "chain page was visited not via data pages %d\n", c.current());
checkDPinPP(relation, c.current());
checkDPinPIP(relation, c.current());
}
}
}
}
// See if the counts of backversions match
if ((vdr_flags & VDR_records) &&
(vdr_rel_backversion_counter != vdr_rel_chain_counter))
(vdr_rel_backversion_counter > vdr_rel_chain_counter))
{
return corrupt(VAL_REL_CHAIN_ORPHANS, relation,
vdr_rel_backversion_counter - vdr_rel_chain_counter, vdr_rel_chain_counter);

View File

@ -115,8 +115,12 @@ private:
VAL_PIP_WRONG_EXTENT = 32,
VAL_PIP_WRONG_USED = 33,
VAL_P_PAGE_WRONG_BITS = 34,
VAL_DATA_PAGE_ISNT_IN_PIP = 35,
VAL_DATA_PAGE_SLOT_NOT_FOUND= 36,
VAL_DATA_PAGE_SLOT_BAD_VAL = 37,
VAL_DATA_PAGE_HASNO_PP = 38,
VAL_MAX_ERROR = 35
VAL_MAX_ERROR = 39
};
struct MSG_ENTRY
@ -136,7 +140,9 @@ private:
int vdr_fixed;
TraNumber vdr_max_transaction;
FB_UINT64 vdr_rel_backversion_counter; // Counts slots w/rhd_chain
PageBitmap* vdr_backversion_pages; // 1 bit per visited table page
FB_UINT64 vdr_rel_chain_counter; // Counts chains w/rdr_chain
PageBitmap* vdr_chain_pages; // 1 bit per visited record chain page
RecordBitmap* vdr_rel_records; // 1 bit per valid record
RecordBitmap* vdr_idx_records; // 1 bit per index item
PageBitmap* vdr_page_bitmap;
@ -148,6 +154,8 @@ private:
PatternMatcher* vdr_idx_incl;
PatternMatcher* vdr_idx_excl;
int vdr_lock_tout;
void checkDPinPP(jrd_rel *relation, SLONG page_number);
void checkDPinPIP(jrd_rel *relation, SLONG page_number);
public:
explicit Validation(thread_db*, Firebird::UtilSvc* uSvc = NULL);
@ -180,7 +188,7 @@ private:
void cleanup();
RTN corrupt(int, const jrd_rel*, ...);
FETCH_CODE fetch_page(bool validate, ULONG, USHORT, WIN*, void*);
FETCH_CODE fetch_page(bool mark, ULONG, USHORT, WIN*, void*);
void release_page(WIN*);
void garbage_collect();