8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-23 02:03:04 +01:00

Backport #8069 : Add missing synchronization to cached vectors of known pages

This commit is contained in:
Dmitry Yemanov 2024-04-04 17:53:56 +03:00 committed by Vlad Khorsun
parent 87240e8522
commit ee7f0730b2
6 changed files with 128 additions and 47 deletions

View File

@ -483,6 +483,65 @@ namespace Jrd
dbb_tip_cache = cache;
}
// Methods encapsulating operations with vectors of known pages
ULONG Database::getKnownPagesCount(SCHAR ptype)
{
fb_assert(ptype == pag_transactions || ptype == pag_ids);
SyncLockGuard guard(&dbb_pages_sync, SYNC_SHARED, FB_FUNCTION);
const auto vector =
(ptype == pag_transactions) ? dbb_tip_pages :
(ptype == pag_ids) ? dbb_gen_pages :
nullptr;
return vector ? (ULONG) vector->count() : 0;
}
ULONG Database::getKnownPage(SCHAR ptype, ULONG sequence)
{
fb_assert(ptype == pag_transactions || ptype == pag_ids);
SyncLockGuard guard(&dbb_pages_sync, SYNC_SHARED, FB_FUNCTION);
const auto vector =
(ptype == pag_transactions) ? dbb_tip_pages :
(ptype == pag_ids) ? dbb_gen_pages :
nullptr;
if (!vector || sequence >= vector->count())
return 0;
return (*vector)[sequence];
}
void Database::setKnownPage(SCHAR ptype, ULONG sequence, ULONG value)
{
fb_assert(ptype == pag_transactions || ptype == pag_ids);
SyncLockGuard guard(&dbb_pages_sync, SYNC_EXCLUSIVE, FB_FUNCTION);
auto& rvector = (ptype == pag_transactions) ? dbb_tip_pages : dbb_gen_pages;
rvector = vcl::newVector(*dbb_permanent, rvector, sequence + 1);
(*rvector)[sequence] = value;
}
void Database::copyKnownPages(SCHAR ptype, ULONG count, ULONG* data)
{
fb_assert(ptype == pag_transactions || ptype == pag_ids);
SyncLockGuard guard(&dbb_pages_sync, SYNC_EXCLUSIVE, FB_FUNCTION);
auto& rvector = (ptype == pag_transactions) ? dbb_tip_pages : dbb_gen_pages;
rvector = vcl::newVector(*dbb_permanent, rvector, count);
memcpy(rvector->memPtr(), data, count * sizeof(ULONG));
}
// Database::Linger class implementation
void Database::Linger::handler()

View File

@ -458,8 +458,6 @@ public:
Lock* dbb_retaining_lock; // lock for preserving commit retaining snapshot
PageManager dbb_page_manager;
vcl* dbb_t_pages; // pages number for transactions
vcl* dbb_gen_id_pages; // known pages for gen_id
BlobFilter* dbb_blob_filters; // known blob filters
MonitoringData* dbb_monitoring_data; // monitoring data
@ -470,6 +468,10 @@ private:
Firebird::SyncObject dbb_modules_sync;
DatabaseModules dbb_modules; // external function/filter modules
// Vectors of known pages and their synchronization
Firebird::SyncObject dbb_pages_sync; // guard access to dbb_XXX_pages vectors
vcl* dbb_tip_pages; // known TIP pages
vcl* dbb_gen_pages; // known generator pages
public:
Firebird::AutoPtr<ExtEngineManager> dbb_extManager; // external engine manager
@ -591,6 +593,12 @@ public:
return (dbb_replica_mode == mode);
}
// Methods encapsulating operations with vectors of known pages
ULONG getKnownPagesCount(SCHAR ptype);
ULONG getKnownPage(SCHAR ptype, ULONG sequence);
void setKnownPage(SCHAR ptype, ULONG sequence, ULONG value);
void copyKnownPages(SCHAR ptype, ULONG count, ULONG* data);
private:
Database(MemoryPool* p, Firebird::IPluginConfig* pConf, bool shared)
: dbb_permanent(p),

View File

@ -1347,26 +1347,27 @@ SINT64 DPM_gen_id(thread_db* tdbb, SLONG generator, bool initialize, SINT64 val)
const USHORT offset = generator % dbb->dbb_page_manager.gensPerPage;
WIN window(DB_PAGE_SPACE, -1);
vcl* vector = dbb->dbb_gen_id_pages;
if (!vector || (sequence >= vector->count()) || !((*vector)[sequence]))
ULONG pageNumber = dbb->getKnownPage(pag_ids, sequence);
if (!pageNumber)
{
DPM_scan_pages(tdbb);
if (!(vector = dbb->dbb_gen_id_pages) ||
(sequence >= vector->count()) || !((*vector)[sequence]))
pageNumber = dbb->getKnownPage(pag_ids, sequence);
if (!pageNumber)
{
generator_page* page = (generator_page*) DPM_allocate(tdbb, &window);
page->gpg_header.pag_type = pag_ids;
page->gpg_sequence = sequence;
CCH_must_write(tdbb, &window);
CCH_RELEASE(tdbb, &window);
DPM_pages(tdbb, 0, pag_ids, (ULONG) sequence, window.win_page.getPageNum());
vector = dbb->dbb_gen_id_pages =
vcl::newVector(*dbb->dbb_permanent, dbb->dbb_gen_id_pages, sequence + 1);
(*vector)[sequence] = window.win_page.getPageNum();
pageNumber = window.win_page.getPageNum();
dbb->setKnownPage(pag_ids, sequence, pageNumber);
DPM_pages(tdbb, 0, pag_ids, sequence, pageNumber);
}
}
window.win_page = (*vector)[sequence];
window.win_page = pageNumber;
window.win_flags = 0;
// As a special exception that allows physical backups for read-only replicas,
@ -2038,6 +2039,9 @@ void DPM_scan_pages( thread_db* tdbb)
CCH_RELEASE(tdbb, &window);
HalfStaticArray<ULONG, 256> tipSeqList;
HalfStaticArray<ULONG, 4> genSeqList;
AutoCacheRequest request(tdbb, irq_r_pages, IRQ_REQUESTS);
FOR(REQUEST_HANDLE request) X IN RDB$PAGES
@ -2045,33 +2049,41 @@ void DPM_scan_pages( thread_db* tdbb)
relation = MET_relation(tdbb, X.RDB$RELATION_ID);
relPages = relation->getBasePages();
sequence = X.RDB$PAGE_SEQUENCE;
MemoryPool* pool = dbb->dbb_permanent;
switch (X.RDB$PAGE_TYPE)
{
case pag_root:
relPages->rel_index_root = X.RDB$PAGE_NUMBER;
continue;
break;
case pag_pointer:
address = &relPages->rel_pages;
pool = relation->rel_pool;
relPages->rel_pages = vcl::newVector(*relation->rel_pool, relPages->rel_pages, sequence + 1);
(*relPages->rel_pages)[sequence] = X.RDB$PAGE_NUMBER;
break;
case pag_transactions:
address = &dbb->dbb_t_pages;
if (sequence >= tipSeqList.getCount())
tipSeqList.resize(sequence + 1);
tipSeqList[sequence] = X.RDB$PAGE_NUMBER;
break;
case pag_ids:
address = &dbb->dbb_gen_id_pages;
if (sequence >= genSeqList.getCount())
genSeqList.resize(sequence + 1);
genSeqList[sequence] = X.RDB$PAGE_NUMBER;
break;
default:
CORRUPT(257); // msg 257 bad record in RDB$PAGES
}
vector = *address = vcl::newVector(*pool, *address, sequence + 1);
(*vector)[sequence] = X.RDB$PAGE_NUMBER;
}
END_FOR
if (const auto count = tipSeqList.getCount())
dbb->copyKnownPages(pag_transactions, count, tipSeqList.begin());
if (const auto count = genSeqList.getCount())
dbb->copyKnownPages(pag_ids, count, genSeqList.begin());
}

View File

@ -3082,7 +3082,7 @@ JAttachment* JProvider::createDatabase(CheckStatusWrapper* user_status, const ch
INI_format(attachment->getUserName().c_str(), options.dpb_set_db_charset.c_str());
// If we have not allocated first TIP page, do it now.
if (!dbb->dbb_t_pages || !dbb->dbb_t_pages->count())
if (!dbb->getKnownPagesCount(pag_transactions))
TRA_extend_tip(tdbb, 0);
// There is no point to move database online at database creation since it is online by default.

View File

@ -590,24 +590,24 @@ void TRA_extend_tip(thread_db* tdbb, ULONG sequence) //, WIN* precedence_window)
CCH_must_write(tdbb, &window);
CCH_RELEASE(tdbb, &window);
const ULONG pageNumber = window.win_page.getPageNum();
// Release prior page
if (sequence)
{
CCH_MARK_MUST_WRITE(tdbb, &prior_window);
prior_tip->tip_next = window.win_page.getPageNum();
prior_tip->tip_next = pageNumber;
CCH_RELEASE(tdbb, &prior_window);
}
// Link into internal data structures
vcl* vector = dbb->dbb_t_pages =
vcl::newVector(*dbb->dbb_permanent, dbb->dbb_t_pages, sequence + 1);
(*vector)[sequence] = window.win_page.getPageNum();
dbb->setKnownPage(pag_transactions, sequence, pageNumber);
// Write into pages relation
DPM_pages(tdbb, 0, pag_transactions, sequence, window.win_page.getPageNum());
DPM_pages(tdbb, 0, pag_transactions, sequence, pageNumber);
}
@ -2366,19 +2366,21 @@ static ULONG inventory_page(thread_db* tdbb, ULONG sequence)
Database* dbb = tdbb->getDatabase();
CHECK_DBB(dbb);
WIN window(DB_PAGE_SPACE, -1);
vcl* vector = dbb->dbb_t_pages;
while (!vector || sequence >= vector->count())
if (const ULONG pageno = dbb->getKnownPage(pag_transactions, sequence))
return pageno;
while (sequence >= dbb->getKnownPagesCount(pag_transactions))
{
DPM_scan_pages(tdbb);
if ((vector = dbb->dbb_t_pages) && sequence < vector->count())
const ULONG tipCount = dbb->getKnownPagesCount(pag_transactions);
if (sequence < tipCount)
break;
if (!vector)
if (!tipCount)
BUGCHECK(165); // msg 165 cannot find tip page
window.win_page = (*vector)[vector->count() - 1];
WIN window(DB_PAGE_SPACE, dbb->getKnownPage(pag_transactions, tipCount - 1));
tx_inv_page* tip = (tx_inv_page*) CCH_FETCH(tdbb, &window, LCK_read, pag_transactions);
const ULONG next = tip->tip_next;
CCH_RELEASE(tdbb, &window);
@ -2388,10 +2390,10 @@ static ULONG inventory_page(thread_db* tdbb, ULONG sequence)
// Type check it
tip = (tx_inv_page*) CCH_FETCH(tdbb, &window, LCK_read, pag_transactions);
CCH_RELEASE(tdbb, &window);
DPM_pages(tdbb, 0, pag_transactions, vector->count(), window.win_page.getPageNum());
DPM_pages(tdbb, 0, pag_transactions, tipCount, window.win_page.getPageNum());
}
return (*vector)[sequence];
return dbb->getKnownPage(pag_transactions, sequence);
}

View File

@ -1871,21 +1871,19 @@ void Validation::walk_generators()
WIN window(DB_PAGE_SPACE, -1);
vcl* vector = dbb->dbb_gen_id_pages;
if (vector)
if (const auto idsCount = dbb->getKnownPagesCount(pag_ids))
{
vcl::iterator ptr, end;
for (ptr = vector->begin(), end = vector->end(); ptr < end; ++ptr)
for (ULONG sequence = 0; sequence < idsCount; sequence++)
{
if (*ptr)
if (const auto pageNumber = dbb->getKnownPage(pag_ids, sequence))
{
#ifdef DEBUG_VAL_VERBOSE
if (VAL_debug_level)
fprintf(stdout, "walk_generator: page %d\n", *ptr);
fprintf(stdout, "walk_generator: page %d\n", pageNumber);
#endif
// It doesn't make a difference generator_page or pointer_page because it's not used.
generator_page* page = NULL;
fetch_page(true, *ptr, pag_ids, &window, &page);
fetch_page(true, pageNumber, pag_ids, &window, &page);
release_page(&window);
}
}
@ -3174,8 +3172,7 @@ Validation::RTN Validation::walk_tip(TraNumber transaction)
**************************************/
Database* dbb = vdr_tdbb->getDatabase();
const vcl* vector = dbb->dbb_t_pages;
if (!vector)
if (!dbb->getKnownPagesCount(pag_transactions))
return corrupt(VAL_TIP_LOST, 0);
tx_inv_page* page = 0;
@ -3183,25 +3180,28 @@ Validation::RTN Validation::walk_tip(TraNumber transaction)
for (ULONG sequence = 0; sequence <= pages; sequence++)
{
if (!(*vector)[sequence] || sequence >= vector->count())
auto pageNumber = dbb->getKnownPage(pag_transactions, sequence);
if (!pageNumber)
{
corrupt(VAL_TIP_LOST_SEQUENCE, 0, sequence);
if (!(vdr_flags & VDR_repair))
continue;
TRA_extend_tip(vdr_tdbb, sequence);
vector = dbb->dbb_t_pages;
vdr_fixed++;
pageNumber = dbb->getKnownPage(pag_transactions, sequence);
}
WIN window(DB_PAGE_SPACE, -1);
fetch_page(true, (*vector)[sequence], pag_transactions, &window, &page);
fetch_page(true, pageNumber, pag_transactions, &window, &page);
#ifdef DEBUG_VAL_VERBOSE
if (VAL_debug_level)
fprintf(stdout, "walk_tip: page %d next %d\n", (*vector)[sequence], page->tip_next);
fprintf(stdout, "walk_tip: page %d next %d\n", pageNumber, page->tip_next);
#endif
if (page->tip_next && page->tip_next != (*vector)[sequence + 1])
const auto next = dbb->getKnownPage(pag_transactions, sequence + 1);
if (page->tip_next && page->tip_next != next)
{
corrupt(VAL_TIP_CONFUSED, 0, sequence);
}