8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 17:23:03 +01:00

Feature CORE-4707 : Implement ability to validate tables and indices online

This commit is contained in:
hvlad 2015-06-19 12:07:41 +00:00
parent f1946281ca
commit 2e78b5a7e0
19 changed files with 1208 additions and 89 deletions

View File

@ -0,0 +1,90 @@
Database validation allows to run low-level checks of consistency of on-disk
structures and even to fix some minor corruptions. It is recommended procedure
for any valuable database, i.e. DBA should validate database from time to time
to make sure it is healthy. But validation process requires exclusive access to
database, i.e. it forbids any kind of concurrent access to database while
validation runs. It could be a big problem to stop user access, especially when
database is large and validation takes notable amount of time.
Online validation is a new feature which allows to perform some consistency
checks without exclusive access to database. Online validation allows to:
- validate some (or all) user tables in database
- validate some (or all) indices
- system tables are not validated
- other ODS checks (such as Header\PIP\TIP\Generators pages) are not run by
online validation
- while table (and\or its index) is validated user attachments are allowed to
read this table. Attempt to INSERT\UPDATE\DELETE will wait until validation
finished or will return lock timeout error (depends on lock timeout of user
transaction)
- while table (and\or its index) is validated any kind of garbage collection at
this table is disabled - background and cooperative garbage collection will
just skip this table, sweep will be terminated with error.
When online validation starts to check table it makes few actions to prevent
concurrent modifications of table's data:
- acquires relation lock in PR (protected read) mode
- acquires (new) garbage collection lock in PW (protected write) mode.
Both locks are acquired using user-specified lock timeout. If any lock request
fails error is reported and table is skipped.
Then table and its indices are validated in the same way as full validation does.
Then locks are released and next table is validated.
Online validation is implemented as Firebird service and accessible via Services
API. Therefore gfix utility can't run online validation. fbsvcmgr utility has
full support for new service, syntax is:
fbsvcmgr [host:]service_mgr [user <...>] [password <...>]
action_validate dbname <filename>
[val_tab_incl <pattern>]
[val_tab_excl <pattern>]
[val_idx_incl <pattern>]
[val_idx_excl <pattern>]
[val_lock_timeout <number>]
where
val_tab_incl pattern for tables names to include in validation run
val_tab_excl pattern for tables names to exclude from validation run
val_idx_incl pattern for indices names to include in validation run,
by default %, i.e. all indices
val_idx_excl pattern for indices names to exclude from validation run
val_lock_timeout lock timeout, used to acquire locks for table to validate,
in seconds, default is 10 sec
0 is no-wait
-1 is infinite wait
Patterns are regular expressions, they are processed by the same rules as
"SIMILAR TO" expressions. All patterns are case-sensitive (despite of database
dialect!).
If pattern for tables is omitted then all user tables will be validated.
If pattern for indices is omitted then all indices of tables to validate will
be validated.
System tables are not validated.
Examples:
1. fbsvcmgr.exe service_mgr user SYSDBA password masterkey
action_validate dbname c:\db.fdb
val_tab_incl A%
val_idx_excl %
val_lock_timeout 0
this command will validate all tables in database "c:\db.fdb" with names
starting with "A". Indices are not validated. Lock wait is not performed.
2. fbsvcmgr.exe service_mgr user SYSDBA password masterkey
action_validate dbname c:\db.fdb
val_tab_incl "TAB1|TAB2"
this command will validate tables TAB1 and TAB2 and all their indices.
Lock wait timeout is 10 sec.
Note, to specify list of tables\indices it is necessary to:
a) separate names by character "|"
b) don't use spaces : TAB1 | TAB2 is wrong
c) whole list should be enclosed in double quotes to not confuse command
interpreter
Vlad Khorsun, <hvlad at users sourceforge net>

View File

@ -251,6 +251,7 @@ IntlParametersBlock::TagType IntlSpbStart::checkTag(UCHAR tag, const char** tagN
case isc_action_svc_nrest:
case isc_action_svc_trace_start:
case isc_action_svc_db_stats:
case isc_action_svc_validate:
mode = tag;
break;
}
@ -316,6 +317,18 @@ IntlParametersBlock::TagType IntlSpbStart::checkTag(UCHAR tag, const char** tagN
return TAG_COMMAND_LINE;
}
break;
case isc_action_svc_validate:
switch (tag)
{
FB_IPB_TAG(isc_spb_dbname);
FB_IPB_TAG(isc_spb_val_tab_incl);
FB_IPB_TAG(isc_spb_val_tab_excl);
FB_IPB_TAG(isc_spb_val_idx_incl);
FB_IPB_TAG(isc_spb_val_idx_excl);
return TAG_STRING;
}
break;
}
return TAG_SKIP;

View File

@ -454,6 +454,19 @@ ClumpletReader::ClumpletType ClumpletReader::getClumpletType(UCHAR tag) const
return IntSpb;
}
break;
case isc_action_svc_validate:
switch(tag)
{
case isc_spb_val_tab_incl:
case isc_spb_val_tab_excl:
case isc_spb_val_idx_incl:
case isc_spb_val_idx_excl:
case isc_spb_dbname:
return StringSpb;
case isc_spb_val_lock_timeout:
return IntSpb;
}
break;
}
invalid_structure("wrong spb state");
break;

View File

@ -48,10 +48,10 @@ bool SyncObject::lock(Sync* sync, SyncType type, const char* from, int timeOut)
if (type == SYNC_SHARED)
{
// In Vulcan SyncObject locking is not fair. Shared locks have priority
// before Exclusive locks. If we'll need to restore this behavior we
// should replace loop condition below by:
// while (true)
while (waiters == 0)
// before Exclusive locks. To change this behavior we should replace
// loop condition below by:
//while (waiters == 0) // activate to make locking fair
while (true)
{
const AtomicCounter::counter_type oldState = lockState;
if (oldState < 0)
@ -75,8 +75,8 @@ bool SyncObject::lock(Sync* sync, SyncType type, const char* from, int timeOut)
mutex.enter(FB_FUNCTION);
++waiters;
//while (true)
while (!waitingThreads)
//while (!waitingThreads) // activate to make locking fair
while (true)
{
const AtomicCounter::counter_type oldState = lockState;
if (oldState < 0)

View File

@ -320,7 +320,8 @@
#define isc_action_svc_set_mapping 27 // Set auto admins mapping in security database
#define isc_action_svc_drop_mapping 28 // Drop auto admins mapping in security database
#define isc_action_svc_display_user_adm 29 // Displays user(s) from security database with admin info
#define isc_action_svc_last 30 // keep it last !
#define isc_action_svc_validate 30 // Starts database online validation
#define isc_action_svc_last 31 // keep it last !
/*****************************
* Service information items *
@ -496,6 +497,16 @@
#define isc_spb_res_create 0x2000
#define isc_spb_res_use_all_space 0x4000
/*****************************************
* Parameters for isc_action_svc_validate *
*****************************************/
#define isc_spb_val_tab_incl 1 // include filter based on regular expression
#define isc_spb_val_tab_excl 2 // exclude filter based on regular expression
#define isc_spb_val_idx_incl 3 // regexp of indices to validate
#define isc_spb_val_idx_excl 4 // regexp of indices to NOT validate
#define isc_spb_val_lock_timeout 5 // how long to wait for table lock
/******************************************
* Parameters for isc_spb_res_access_mode *
******************************************/

View File

@ -484,6 +484,12 @@ void Jrd::Attachment::releaseLocks(thread_db* tdbb)
relation->rel_flags &= ~REL_scanned;
}
if (relation->rel_gc_lock)
{
LCK_release(tdbb, relation->rel_gc_lock);
relation->rel_flags |= REL_gc_lockneed;
}
for (IndexLock* index = relation->rel_index_locks; index; index = index->idl_next)
{
if (index->idl_lock)

View File

@ -27,6 +27,7 @@
#include "../jrd/btr_proto.h"
#include "../jrd/dpm_proto.h"
#include "../jrd/idx_proto.h"
#include "../jrd/lck_proto.h"
#include "../jrd/met_proto.h"
#include "../jrd/pag_proto.h"
#include "../jrd/vio_debug.h"
@ -300,6 +301,222 @@ bool jrd_rel::hasTriggers() const
return false;
}
Lock* jrd_rel::createLock(thread_db* tdbb, MemoryPool* pool, jrd_rel* relation, lck_t lckType, bool noAst)
{
if (!pool)
pool = relation->rel_pool;
const USHORT relLockLen = relation->getRelLockKeyLength();
Lock* lock = FB_NEW_RPT(*pool, relLockLen) Lock(tdbb, relLockLen, lckType, relation);
relation->getRelLockKey(tdbb, &lock->lck_key.lck_string[0]);
lock->lck_type = lckType;
switch (lckType)
{
case LCK_relation:
break;
case LCK_rel_gc:
lock->lck_ast = noAst ? NULL : blocking_ast_gcLock;
break;
default:
fb_assert(false);
}
return lock;
}
bool jrd_rel::acquireGCLock(thread_db* tdbb, int wait)
{
fb_assert(rel_flags & REL_gc_lockneed);
if (!(rel_flags & REL_gc_lockneed))
{
fb_assert(rel_gc_lock->lck_id);
fb_assert(rel_gc_lock->lck_physical == (rel_flags & REL_gc_disabled ? LCK_SR : LCK_SW));
return true;
}
if (!rel_gc_lock)
rel_gc_lock = createLock(tdbb, NULL, this, LCK_rel_gc, false);
fb_assert(!rel_gc_lock->lck_id);
fb_assert(!(rel_flags & REL_gc_blocking));
ThreadStatusGuard temp_status(tdbb);
const USHORT level = (rel_flags & REL_gc_disabled) ? LCK_SR : LCK_SW;
bool ret = LCK_lock(tdbb, rel_gc_lock, level, wait);
if (!ret && (level == LCK_SW))
{
rel_flags |= REL_gc_disabled;
ret = LCK_lock(tdbb, rel_gc_lock, LCK_SR, wait);
if (!ret)
rel_flags &= ~REL_gc_disabled;
}
if (ret)
rel_flags &= ~REL_gc_lockneed;
return ret;
}
void jrd_rel::downgradeGCLock(thread_db* tdbb)
{
if (!rel_sweep_count && (rel_flags & REL_gc_blocking))
{
fb_assert(!(rel_flags & REL_gc_lockneed));
fb_assert(rel_gc_lock->lck_id);
fb_assert(rel_gc_lock->lck_physical == LCK_SW);
rel_flags &= ~REL_gc_blocking;
rel_flags |= REL_gc_disabled;
LCK_downgrade(tdbb, rel_gc_lock);
if (rel_gc_lock->lck_physical != LCK_SR)
{
rel_flags &= ~REL_gc_disabled;
if (rel_gc_lock->lck_physical < LCK_SR)
rel_flags |= REL_gc_lockneed;
}
}
}
int jrd_rel::blocking_ast_gcLock(void* ast_object)
{
/****
SR - gc forbidden, awaiting moment to re-establish SW lock
SW - gc allowed, usual state
PW - gc allowed to the one connection only
****/
jrd_rel* relation = static_cast<jrd_rel*>(ast_object);
try
{
Lock* lock = relation->rel_gc_lock;
Database* dbb = lock->lck_dbb;
AsyncContextHolder tdbb(dbb, FB_FUNCTION);
fb_assert(!(relation->rel_flags & REL_gc_lockneed));
if (relation->rel_flags & REL_gc_lockneed) // work already done syncronously ?
return 0;
relation->rel_flags |= REL_gc_blocking;
if (relation->rel_sweep_count)
return 0;
if (relation->rel_flags & REL_gc_disabled)
{
// someone acquired EX lock
fb_assert(lock->lck_id);
fb_assert(lock->lck_physical == LCK_SR);
LCK_release(tdbb, lock);
relation->rel_flags &= ~(REL_gc_disabled | REL_gc_blocking);
relation->rel_flags |= REL_gc_lockneed;
}
else
{
// someone acquired PW lock
fb_assert(lock->lck_id);
fb_assert(lock->lck_physical == LCK_SW);
relation->rel_flags |= REL_gc_disabled;
relation->downgradeGCLock(tdbb);
}
}
catch (const Firebird::Exception&)
{} // no-op
return 0;
}
/// jrd_rel::GCExclusive
jrd_rel::GCExclusive::GCExclusive(thread_db* tdbb, jrd_rel* relation) :
m_tdbb(tdbb),
m_relation(relation),
m_lock(NULL)
{
}
jrd_rel::GCExclusive::~GCExclusive()
{
release();
delete m_lock;
}
bool jrd_rel::GCExclusive::acquire(int wait)
{
// if validation is already running - go out
if (m_relation->rel_flags & REL_gc_disabled)
return false;
ThreadStatusGuard temp_status(m_tdbb);
m_relation->rel_flags |= REL_gc_disabled;
int sleeps = -wait * 10;
while (m_relation->rel_sweep_count)
{
Attachment::Checkout cout(m_tdbb->getAttachment(), FB_FUNCTION);
Thread::sleep(100);
if (wait < 0 && --sleeps == 0)
break;
}
if (m_relation->rel_sweep_count)
{
m_relation->rel_flags &= ~REL_gc_disabled;
return false;
}
if (!(m_relation->rel_flags & REL_gc_lockneed))
{
m_relation->rel_flags |= REL_gc_lockneed;
LCK_release(m_tdbb, m_relation->rel_gc_lock);
}
// we need no AST here
if (!m_lock)
m_lock = jrd_rel::createLock(m_tdbb, NULL, m_relation, LCK_rel_gc, true);
const bool ret = LCK_lock(m_tdbb, m_lock, LCK_PW, wait);
if (!ret)
m_relation->rel_flags &= ~REL_gc_disabled;
return ret;
}
void jrd_rel::GCExclusive::release()
{
if (!m_lock || !m_lock->lck_id)
return;
fb_assert(m_relation->rel_flags & REL_gc_disabled);
if (!(m_relation->rel_flags & REL_gc_lockneed))
{
m_relation->rel_flags |= REL_gc_lockneed;
LCK_release(m_tdbb, m_relation->rel_gc_lock);
}
LCK_convert(m_tdbb, m_lock, LCK_EX, LCK_WAIT);
m_relation->rel_flags &= ~REL_gc_disabled;
LCK_release(m_tdbb, m_lock);
}
/// RelationPages
void RelationPages::free(RelationPages*& nextFree)
{
rel_next_free = nextFree;

View File

@ -23,6 +23,7 @@
#define JRD_RELATION_H
#include "../jrd/jrd.h"
#include "../jrd/lck.h"
#include "../jrd/pag.h"
#include "../jrd/val.h"
#include "../jrd/Attachment.h"
@ -166,6 +167,7 @@ public:
Lock* rel_existence_lock; // existence lock, if any
Lock* rel_partners_lock; // partners lock
Lock* rel_rescan_lock; // lock forcing relation to be scanned
Lock* rel_gc_lock; // garbage collection lock
IndexLock* rel_index_locks; // index existence locks
IndexBlock* rel_index_blocks; // index blocks for caching index info
trig_vec* rel_pre_erase; // Pre-operation erase trigger
@ -240,12 +242,50 @@ private:
RelationPages* getPagesInternal(thread_db* tdbb, TraNumber tran, bool allocPages);
public:
explicit jrd_rel(MemoryPool& p)
: rel_pool(&p), rel_name(p), rel_owner_name(p),
rel_view_contexts(p), rel_security_name(p), rel_gc_records(p)
{}
explicit jrd_rel(MemoryPool& p);
bool hasTriggers() const;
static Lock* createLock(thread_db* tdbb, MemoryPool* pool, jrd_rel* relation, lck_t, bool);
static int blocking_ast_gcLock(void*);
void downgradeGCLock(thread_db* tdbb);
bool acquireGCLock(thread_db* tdbb, int wait);
// This guard is used by regular code to prevent online validation while
// dead- or back- versions is removed from disk.
class GCShared
{
public:
GCShared(thread_db* tdbb, jrd_rel* relation);
~GCShared();
bool gcEnabled() const
{
return m_gcEnabled;
}
private:
thread_db* m_tdbb;
jrd_rel* m_relation;
bool m_gcEnabled;
};
// This guard is used by online validation to prevent any modifications of
// table data while it is checked.
class GCExclusive
{
public:
GCExclusive(thread_db* tdbb, jrd_rel* relation);
~GCExclusive();
bool acquire(int wait);
void release();
private:
thread_db* m_tdbb;
jrd_rel* m_relation;
Lock* m_lock;
};
};
// rel_flags
@ -267,8 +307,19 @@ const ULONG REL_temp_tran = 0x2000; // relation is a GTT delete rows
const ULONG REL_temp_conn = 0x4000; // relation is a GTT preserve rows
const ULONG REL_virtual = 0x8000; // relation is virtual
const ULONG REL_jrd_view = 0x10000; // relation is VIEW
const ULONG REL_gc_blocking = 0x20000; // request to downgrade\release gc lock
const ULONG REL_gc_disabled = 0x40000; // gc is disabled temporarily
const ULONG REL_gc_lockneed = 0x80000; // gc lock should be acquired
/// class jrd_rel
inline jrd_rel::jrd_rel(MemoryPool& p)
: rel_pool(&p), rel_flags(REL_gc_lockneed), rel_name(p), rel_owner_name(p),
rel_view_contexts(p), rel_security_name(p), rel_gc_records(p)
{
}
inline bool jrd_rel::isSystem() const
{
return rel_flags & REL_system;
@ -297,6 +348,39 @@ inline RelationPages* jrd_rel::getPages(thread_db* tdbb, TraNumber tran, bool al
return getPagesInternal(tdbb, tran, allocPages);
}
/// class jrd_rel::GCShared
inline jrd_rel::GCShared::GCShared(thread_db* tdbb, jrd_rel* relation) :
m_tdbb(tdbb),
m_relation(relation),
m_gcEnabled(false)
{
if (m_relation->rel_flags & (REL_gc_blocking | REL_gc_disabled))
return;
if (m_relation->rel_flags & REL_gc_lockneed)
m_relation->acquireGCLock(tdbb, LCK_NO_WAIT);
if (!(m_relation->rel_flags & (REL_gc_blocking | REL_gc_disabled | REL_gc_lockneed)))
{
++m_relation->rel_sweep_count;
m_gcEnabled = true;
}
if ((m_relation->rel_flags & REL_gc_blocking) && !m_relation->rel_sweep_count)
m_relation->downgradeGCLock(m_tdbb);
}
inline jrd_rel::GCShared::~GCShared()
{
if (m_gcEnabled)
--m_relation->rel_sweep_count;
if ((m_relation->rel_flags & REL_gc_blocking) && !m_relation->rel_sweep_count)
m_relation->downgradeGCLock(m_tdbb);
}
// Field block, one for each field in a scanned relation
class jrd_fld : public pool_alloc<type_fld>

View File

@ -241,23 +241,31 @@ public:
m_save_lock(NULL)
{
Jrd::Attachment* att = m_tdbb->getAttachment();
m_save_lock = att ? att->att_wait_lock : NULL;
if (att)
m_save_lock = att->att_wait_lock;
m_cancel_disabled = (m_tdbb->tdbb_flags & TDBB_wait_cancel_disable);
m_tdbb->tdbb_flags |= TDBB_wait_cancel_disable;
if (wait == LCK_WAIT)
{
switch (lock->lck_type)
{
case LCK_tra:
m_tdbb->tdbb_flags &= ~TDBB_wait_cancel_disable;
if (att)
att->att_wait_lock = lock;
break;
if (!wait)
return;
if (lock->lck_type == LCK_tra)
default:
m_tdbb->tdbb_flags |= TDBB_wait_cancel_disable;
}
}
else
{
fb_assert(att);
m_tdbb->tdbb_flags &= ~TDBB_wait_cancel_disable;
if (att)
{
att->att_wait_lock = lock;
}
}
}
@ -265,9 +273,7 @@ public:
{
Jrd::Attachment* att = m_tdbb->getAttachment();
if (att)
{
att->att_wait_lock = m_save_lock;
}
if (m_cancel_disabled)
m_tdbb->tdbb_flags |= TDBB_wait_cancel_disable;
@ -541,6 +547,7 @@ static SLONG get_owner_handle(thread_db* tdbb, enum lck_t lock_type)
case LCK_cancel:
case LCK_monitor:
case LCK_btr_dont_gc:
case LCK_rel_gc:
handle = *LCK_OWNER_HANDLE_ATT(tdbb);
break;

View File

@ -65,6 +65,7 @@ enum lck_t {
LCK_btr_dont_gc, // Prevent removal of b-tree page from index
LCK_shared_counter, // Database-wide shared counter
LCK_tra_pc, // Precommitted transaction lock
LCK_rel_gc, // Allow garbage collection for relation
LCK_fun_exist, // Function existence lock
LCK_rel_rescan, // Relation forced rescan lock
LCK_crypt, // Crypt lock for single crypt thread

View File

@ -46,4 +46,53 @@ void LCK_release(Jrd::thread_db*, Jrd::Lock*);
void LCK_re_post(Jrd::thread_db*, Jrd::Lock*);
void LCK_write_data(Jrd::thread_db*, Jrd::Lock*, SLONG);
class AutoLock
{
public:
explicit AutoLock(Jrd::thread_db* tdbb, Jrd::Lock* lck = NULL) :
m_tdbb(tdbb),
m_lock(lck)
{
}
~AutoLock()
{
release();
}
void release()
{
if (m_lock)
{
if (m_lock->lck_id)
LCK_release(m_tdbb, m_lock);
delete m_lock;
m_lock = NULL;
}
}
Jrd::Lock* operator-> ()
{
return m_lock;
}
operator Jrd::Lock* ()
{
return m_lock;
}
Jrd::Lock* operator= (Jrd::Lock* lck)
{
release();
m_lock = lck;
return m_lock;
}
private:
Jrd::thread_db* m_tdbb;
Jrd::Lock* m_lock;
};
#endif // JRD_LCK_PROTO_H

View File

@ -100,8 +100,15 @@ Lock* RLCK_reserve_relation(thread_db* tdbb, jrd_tra* transaction, jrd_rel* rela
result = LCK_convert(tdbb, lock, level, transaction->getLockWait());
else
result = LCK_lock(tdbb, lock, level, transaction->getLockWait());
if (!result)
{
string err;
err.printf("Acquire lock for relation (%s) failed", relation->rel_name.c_str());
ERR_append_status(tdbb->tdbb_status_vector, Arg::Gds(isc_random) << Arg::Str(err));
ERR_punt();
}
return lock;
}
@ -133,10 +140,8 @@ Lock* RLCK_transaction_relation_lock(thread_db* tdbb, jrd_tra* transaction, jrd_
vector = transaction->tra_relation_locks =
vec<Lock*>::newVector(*transaction->tra_pool, transaction->tra_relation_locks, relId + 1);
lock = jrd_rel::createLock(tdbb, transaction->tra_pool, relation, LCK_relation, true);
const USHORT relLockLen = relation->getRelLockKeyLength();
lock = FB_NEW_RPT(*transaction->tra_pool, relLockLen) Lock(tdbb, relLockLen, LCK_relation);
relation->getRelLockKey(tdbb, &lock->lck_key.lck_string[0]);
// enter all relation locks into the intra-process lock manager and treat
// them as compatible within the attachment according to IPLM rules
lock->lck_compatible = tdbb->getAttachment();

View File

@ -76,6 +76,7 @@
#include "../utilities/gstat/dbaswi.h"
#include "../utilities/nbackup/nbkswi.h"
#include "../jrd/trace/traceswi.h"
#include "../jrd/val_proto.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@ -1970,7 +1971,8 @@ void Service::start(USHORT spb_length, const UCHAR* spb_data)
svc_id == isc_action_svc_display_user ||
svc_id == isc_action_svc_display_user_adm ||
svc_id == isc_action_svc_set_mapping ||
svc_id == isc_action_svc_drop_mapping;
svc_id == isc_action_svc_drop_mapping ||
svc_id == isc_action_svc_validate;
if (flNeedUser)
{
@ -2496,6 +2498,8 @@ bool Service::process_switches(ClumpletReader& spb, string& switches)
string nbk_database, nbk_file;
int nbk_level = -1;
bool val_database = false;
bool found = false;
do
@ -2868,6 +2872,31 @@ bool Service::process_switches(ClumpletReader& spb, string& switches)
}
break;
case isc_action_svc_validate:
if (!get_action_svc_parameter(spb.getClumpTag(), val_option_in_sw_table, switches)) {
return false;
}
switch (spb.getClumpTag())
{
case isc_spb_dbname:
if (val_database) {
(Arg::Gds(isc_unexp_spb_form) << Arg::Str("only one isc_spb_dbname")).raise();
}
val_database = true;
// fall thru
case isc_spb_val_tab_incl:
case isc_spb_val_tab_excl:
case isc_spb_val_idx_incl:
case isc_spb_val_idx_excl:
get_action_svc_string(spb, switches);
break;
case isc_spb_val_lock_timeout:
get_action_svc_data(spb, switches);
break;
}
break;
default:
return false;
}
@ -2918,6 +2947,13 @@ bool Service::process_switches(ClumpletReader& spb, string& switches)
switches += nbk_database;
switches += nbk_file;
break;
case isc_action_svc_validate:
if (!val_database)
{
(Arg::Gds(isc_missing_required_spb) << Arg::Str("isc_spb_dbname")).raise();
}
break;
}
switches.rtrim();
@ -2962,7 +2998,7 @@ void Service::get_action_svc_string(const ClumpletReader& spb, string& switches)
void Service::get_action_svc_data(const ClumpletReader& spb, string& switches)
{
string s;
s.printf("%"ULONGFORMAT" ", spb.getInt());
s.printf("%"SLONGFORMAT" ", spb.getInt());
switches += s;
}

View File

@ -31,6 +31,7 @@
#include "gen/iberror.h"
#include "../jrd/svc.h"
#include "../jrd/trace/TraceService.h"
#include "../jrd/val_proto.h"
// Service Functions
#include "../burp/burp_proto.h"
@ -119,6 +120,7 @@ const serv_entry services[] =
{ isc_action_svc_drop_mapping, "Drop Domain Admins Mapping to RDB$ADMIN", NULL, GSEC_main },
{ isc_action_svc_display_user_adm, "Display User with Admin Info", NULL, GSEC_main },
#endif
{ isc_action_svc_validate, "Validate Database", NULL, VAL_service},
// actions with no names are undocumented
{ isc_action_svc_set_config, NULL, NULL, TEST_THREAD },
{ isc_action_svc_default_config, NULL, NULL, TEST_THREAD },

View File

@ -25,6 +25,27 @@
#define JRD_VAL_PROTO_H
bool VAL_validate(Jrd::thread_db*, USHORT);
int VAL_service(Firebird::UtilSvc*);
const int IN_SW_VAL_TAB_INCL = 1;
const int IN_SW_VAL_TAB_EXCL = 2;
const int IN_SW_VAL_IDX_INCL = 3;
const int IN_SW_VAL_IDX_EXCL = 4;
const int IN_SW_VAL_LOCK_TIMEOUT = 5;
const int IN_SW_VAL_DATABASE = 6;
static const Switches::in_sw_tab_t val_option_in_sw_table[] =
{
{IN_SW_VAL_TAB_INCL, isc_spb_val_tab_incl, "TAB_INCLUDE", 0, 0, 0, false, 0, 5, NULL},
{IN_SW_VAL_TAB_EXCL, isc_spb_val_tab_excl, "TAB_EXCLUDE", 0, 0, 0, false, 0, 5, NULL},
{IN_SW_VAL_IDX_INCL, isc_spb_val_idx_incl, "IDX_INCLUDE", 0, 0, 0, false, 0, 5, NULL},
{IN_SW_VAL_IDX_EXCL, isc_spb_val_idx_excl, "IDX_EXCLUDE", 0, 0, 0, false, 0, 5, NULL},
{IN_SW_VAL_LOCK_TIMEOUT, isc_spb_val_lock_timeout, "WAIT", 0, 0, 0, false, 0, 1, NULL},
{IN_SW_VAL_DATABASE, isc_spb_dbname, "DATABASE", 0, 0, 0, false, 0, 1, NULL},
{0, 0, NULL, 0, 0, 0, false, 0, 0, NULL} // End of List
};
#endif // JRD_VAL_PROTO_H

View File

@ -552,6 +552,7 @@ VI. ADDITIONAL NOTES
#include "../jrd/cch.h"
#include "../jrd/rse.h"
#include "../jrd/tra.h"
#include "../jrd/svc.h"
#include "../jrd/btr_proto.h"
#include "../jrd/cch_proto.h"
#include "../jrd/dpm_proto.h"
@ -564,6 +565,11 @@ VI. ADDITIONAL NOTES
#include "../jrd/val_proto.h"
#include "../jrd/validation.h"
#include "../common/classes/ClumpletWriter.h"
#include "../jrd/intl_proto.h"
#include "../jrd/lck_proto.h"
#include "../jrd/Collation.h"
#ifdef DEBUG_VAL_VERBOSE
#include "../jrd/dmp_proto.h"
/* Control variable for verbose output during debug of
@ -576,6 +582,7 @@ static USHORT VAL_debug_level = 0;
using namespace Jrd;
using namespace Ods;
using namespace Firebird;
#ifdef DEBUG_VAL_VERBOSE
@ -583,6 +590,30 @@ static void print_rhd(USHORT, const rhd*);
#endif
static PatternMatcher* createPatternMatcher(thread_db* tdbb, const char* pattern)
{
PatternMatcher* matcher = NULL;
try
{
if (pattern)
{
const int len = strlen(pattern);
Collation* obj = INTL_texttype_lookup(tdbb, CS_UTF8);
matcher = obj->createSimilarToMatcher(*tdbb->getDefaultPool(),
(const UCHAR*)pattern, len, (UCHAR*)"\\", 1);
}
}
catch (const Exception& ex)
{
Arg::StatusVector status(ex);
status << Arg::Gds(isc_random) << Arg::Str(pattern);
status.raise();
}
return matcher;
}
static void explain_pp_bits(const UCHAR bits, Firebird::string& names)
{
if (bits & ppg_dp_full)
@ -635,9 +666,131 @@ bool VAL_validate(thread_db* tdbb, USHORT switches)
Attachment* att = tdbb->getAttachment();
if (!att->att_validation)
att->att_validation = FB_NEW (*att->att_pool) Validation();
att->att_validation = FB_NEW (*att->att_pool) Validation(tdbb);
return att->att_validation->run(tdbb, switches);
USHORT flags = 0;
if (switches & isc_dpb_records)
flags |= Validation::VDR_records;
if (switches & isc_dpb_repair)
flags |= Validation::VDR_repair;
if (!(switches & isc_dpb_no_update))
flags |= Validation::VDR_update;
return att->att_validation->run(tdbb, flags);
}
static int validate(Firebird::UtilSvc* svc)
{
PathName dbName;
string userName;
const Switches valSwitches(val_option_in_sw_table, FB_NELEM(val_option_in_sw_table), false, true);
const char** argv = svc->argv.begin();
const char* const* end = svc->argv.end();
for (++argv; argv < end; argv++)
{
if (!*argv)
continue;
const Switches::in_sw_tab_t* sw = valSwitches.findSwitch(*argv);
if (!sw)
continue;
switch (sw->in_sw)
{
case IN_SW_VAL_DATABASE:
*argv = NULL;
argv++;
if (argv < end && *argv)
dbName = *argv;
else
;// error
break;
default:
break;
}
}
ClumpletWriter dpb(ClumpletReader::Tagged, MAX_DPB_SIZE, isc_dpb_version1);
if (!userName.isEmpty())
{
dpb.insertString(isc_dpb_trusted_auth, userName);
}
FbLocalStatus status;
RefPtr<JProvider> jProv(JProvider::getInstance());
RefPtr<JAttachment> jAtt;
jAtt.assignRefNoIncr(jProv->attachDatabase(&status, dbName.c_str(), dpb.getBufferLength(), dpb.getBuffer()));
if (status->getState() & IStatus::STATE_ERRORS)
{
svc->setServiceStatus(status->getErrors());
return FB_FAILURE;
}
Attachment* att = jAtt->getHandle();
Database* dbb = att->att_database;
svc->started();
MemoryPool* val_pool = NULL;
int ret_code = FB_SUCCESS;
try
{
// should be EngineContextHolder but it is declared in jrd.cpp
BackgroundContextHolder tdbb(dbb, att, &status, FB_FUNCTION);
att->att_use_count++;
tdbb->tdbb_flags |= TDBB_sweeper;
val_pool = dbb->createPool();
Jrd::ContextPoolHolder context(tdbb, val_pool);
Validation control(tdbb, svc);
control.run(tdbb, Validation::VDR_records | Validation::VDR_online | Validation::VDR_partial);
att->att_use_count--;
}
catch (const Exception& ex)
{
att->att_use_count--;
ex.stuffException(&status);
svc->setServiceStatus(status->getErrors());
ret_code = FB_FAILURE;
}
dbb->deletePool(val_pool);
jAtt->detach(&status);
return ret_code;
}
int VAL_service(Firebird::UtilSvc* svc)
{
svc->initStatus();
int exit_code = FB_SUCCESS;
try
{
exit_code = validate(svc);
}
catch (const Exception& ex)
{
FbLocalStatus status;
ex.stuffException(&status);
svc->setServiceStatus(status->getErrors());
exit_code = FB_FAILURE;
}
svc->started();
return (THREAD_ENTRY_RETURN)(IPTR)exit_code;
}
@ -683,9 +836,9 @@ const Validation::MSG_ENTRY Validation::vdr_msg_table[VAL_MAX_ERROR] =
{false, fb_info_ppage_warns, "Pointer page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} bits {0x%02X %s} are not consistent with data page %"ULONGFORMAT" {sequence %"ULONGFORMAT"} state {0x%02X %s}"}
};
Validation::Validation()
Validation::Validation(thread_db* tdbb, UtilSvc* uSvc)
{
vdr_tdbb = NULL;
vdr_tdbb = tdbb;
vdr_max_page = 0;
vdr_flags = 0;
vdr_errors = 0;
@ -697,10 +850,142 @@ Validation::Validation()
vdr_rel_records = NULL;
vdr_idx_records = NULL;
vdr_page_bitmap = NULL;
vdr_service = uSvc;
vdr_tab_incl = vdr_tab_excl = NULL;
vdr_idx_incl = vdr_idx_excl = NULL;
vdr_lock_tout = -10;
if (uSvc) {
parse_args(tdbb);
}
output("Validation started\n\n");
}
Validation::~Validation()
{
delete vdr_tab_incl;
delete vdr_tab_excl;
delete vdr_idx_incl;
delete vdr_idx_excl;
output("Validation finished\n");
}
void Validation::parse_args(thread_db* tdbb)
{
Switches local_sw_table(val_option_in_sw_table, FB_NELEM(val_option_in_sw_table), true, true);
const char** argv = vdr_service->argv.begin();
const char* const* end = vdr_service->argv.end();
for (++argv; argv < end; argv++)
{
if (!*argv)
continue;
string arg(*argv);
Switches::in_sw_tab_t* sw = local_sw_table.findSwitchMod(arg);
if (!sw)
continue;
if (sw->in_sw_state)
{
string s;
s.printf("Switch %s specified more than once", sw->in_sw_name);
(Arg::Gds(isc_random) << Arg::Str(s)).raise();
}
sw->in_sw_state = true;
switch (sw->in_sw)
{
case IN_SW_VAL_TAB_INCL:
case IN_SW_VAL_TAB_EXCL:
case IN_SW_VAL_IDX_INCL:
case IN_SW_VAL_IDX_EXCL:
case IN_SW_VAL_LOCK_TIMEOUT:
*argv++ = NULL;
if (argv >= end || !(*argv))
{
string s;
s.printf("Switch %s requires value", sw->in_sw_name);
(Arg::Gds(isc_random) << Arg::Str(s)).raise();
}
break;
default:
break;
}
switch (sw->in_sw)
{
case IN_SW_VAL_TAB_INCL:
vdr_tab_incl = createPatternMatcher(tdbb, *argv);
break;
case IN_SW_VAL_TAB_EXCL:
vdr_tab_excl = createPatternMatcher(tdbb, *argv);
break;
case IN_SW_VAL_IDX_INCL:
vdr_idx_incl = createPatternMatcher(tdbb, *argv);
break;
case IN_SW_VAL_IDX_EXCL:
vdr_idx_excl = createPatternMatcher(tdbb, *argv);
break;
case IN_SW_VAL_LOCK_TIMEOUT:
{
char* end = (char*) *argv;
vdr_lock_tout = -strtol(*argv, &end, 10);
if (end && *end)
{
string s;
s.printf("Value (%s) is not a valid number", *argv);
(Arg::Gds(isc_random) << Arg::Str(s)).raise();
}
}
break;
default:
break;
}
}
}
bool Validation::run(thread_db* tdbb, USHORT switches)
void Validation::output(const char* format, ...)
{
if (!vdr_service)
return;
va_list params;
va_start(params, format);
string s;
tm now;
int ms;
TimeStamp::getCurrentTimeStamp().decode(&now, &ms);
// s.printf("%04d-%02d-%02d %02d:%02d:%02d.%04d ",
s.printf("%02d:%02d:%02d.%02d ",
// now.tm_year + 1900, now.tm_mon + 1, now.tm_mday,
now.tm_hour, now.tm_min, now.tm_sec, ms / 100);
vdr_service->outputVerbose(s.c_str());
s.vprintf(format, params);
va_end(params);
vdr_service->outputVerbose(s.c_str());
}
bool Validation::run(thread_db* tdbb, USHORT flags)
{
/**************************************
*
@ -722,15 +1007,7 @@ bool Validation::run(thread_db* tdbb, USHORT switches)
val_pool = dbb->createPool();
Jrd::ContextPoolHolder context(tdbb, val_pool);
vdr_flags = 0;
if (switches & isc_dpb_records)
vdr_flags |= VDR_records;
if (switches & isc_dpb_repair)
vdr_flags |= VDR_repair;
if (!(switches & isc_dpb_no_update))
vdr_flags |= VDR_update;
vdr_flags = flags;
// initialize validate errors
vdr_errors = vdr_warns = vdr_fixed = 0;
@ -745,7 +1022,9 @@ bool Validation::run(thread_db* tdbb, USHORT switches)
if (vdr_errors || vdr_warns)
vdr_flags &= ~VDR_update;
garbage_collect();
if (!(vdr_flags & VDR_online) && !(vdr_flags & VDR_partial)) {
garbage_collect();
}
CCH_flush(tdbb, FLUSH_FINI, 0);
tdbb->tdbb_flags &= ~TDBB_sweeper;
@ -819,7 +1098,7 @@ Validation::RTN Validation::corrupt(int err_code, const jrd_rel* relation, ...)
const TEXT* err_string = err_code < VAL_MAX_ERROR ? vdr_msg_table[err_code].msg: "Unknown error code";
Firebird::string s;
string s;
va_list ptr;
const char* fn = att->att_filename.c_str();
@ -858,11 +1137,14 @@ Validation::RTN Validation::corrupt(int err_code, const jrd_rel* relation, ...)
else
gds__log("Database: %s\n\t%s", fn, s.c_str());
s.append("\n");
output(s.c_str());
return rtn_corrupt;
}
Validation::FETCH_CODE Validation::fetch_page(bool mark, ULONG page_number,
USHORT type, WIN* window, void* apage_pointer)
USHORT type, WIN* window, void* aPage_pointer)
{
/**************************************
*
@ -878,12 +1160,22 @@ Validation::FETCH_CODE Validation::fetch_page(bool mark, ULONG page_number,
Database* dbb = vdr_tdbb->getDatabase();
if (--vdr_tdbb->tdbb_quantum < 0)
{
JRD_reschedule(vdr_tdbb, 0, true);
if (vdr_service && vdr_service->finished())
{
CCH_unwind(vdr_tdbb, false);
Arg::Gds(isc_att_shutdown).raise();
}
}
window->win_page = page_number;
window->win_flags = 0;
pag** page_pointer = reinterpret_cast<pag**>(apage_pointer);
*page_pointer = CCH_FETCH_NO_SHADOW(vdr_tdbb, window, LCK_write, 0);
pag** page_pointer = reinterpret_cast<pag**>(aPage_pointer);
*page_pointer = CCH_FETCH_NO_SHADOW(vdr_tdbb, window,
(vdr_flags & VDR_online ? LCK_read : LCK_write),
0);
if ((*page_pointer)->pag_type != type && type != pag_undefined)
{
@ -1125,6 +1417,7 @@ Validation::RTN Validation::walk_blob(jrd_rel* relation, const blh* header, USHO
// Level 1 blobs are a little more complicated
WIN window1(DB_PAGE_SPACE, -1), window2(DB_PAGE_SPACE, -1);
window1.win_flags = window2.win_flags = WIN_garbage_collector;
const ULONG* pages1 = header->blh_page;
const ULONG* const end1 = pages1 + ((USHORT) (length - BLH_SIZE) >> SHIFTLONG);
@ -1192,6 +1485,7 @@ Validation::RTN Validation::walk_chain(jrd_rel* relation, const rhd* header,
ULONG page_number = header->rhd_b_page;
USHORT line_number = header->rhd_b_line;
WIN window(DB_PAGE_SPACE, -1);
window.win_flags = WIN_garbage_collector;
while (page_number)
{
@ -1250,25 +1544,72 @@ void Validation::walk_database()
fetch_page(true, HEADER_PAGE, pag_header, &window, &page);
vdr_max_transaction = page->hdr_next_transaction;
walk_header(page->hdr_next_page);
walk_pip();
walk_scns();
walk_tip(page->hdr_next_transaction);
walk_generators();
if (vdr_flags & VDR_online) {
CCH_RELEASE(vdr_tdbb, &window);
}
if (!(vdr_flags & VDR_partial))
{
walk_header(page->hdr_next_page);
walk_pip();
walk_scns();
walk_tip(page->hdr_next_transaction);
walk_generators();
}
vec<jrd_rel*>* vector;
for (USHORT i = 0; (vector = attachment->att_relations) && i < vector->count(); i++)
{
#ifdef DEBUG_VAL_VERBOSE
if (i >= 32 /* rel_MAX */ ) // Why not system flag instead?
if (i > dbb->dbb_max_sys_rel) // Why not system flag instead?
VAL_debug_level = 2;
#endif
jrd_rel* relation = (*vector)[i];
if (relation && relation->rel_flags & REL_check_existence)
relation = MET_lookup_relation_id(vdr_tdbb, i, false);
if (relation)
{
// Can't validate system relations online as they could be modified
// by system transaction which not acquires relation locks
if ((vdr_flags & VDR_online) && relation->isSystem())
continue;
if (vdr_tab_incl)
{
vdr_tab_incl->reset();
if (!vdr_tab_incl->process((UCHAR*)relation->rel_name.c_str(), relation->rel_name.length()) ||
!vdr_tab_incl->result())
continue;
}
if (vdr_tab_excl)
{
vdr_tab_excl->reset();
if (!vdr_tab_excl->process((UCHAR*)relation->rel_name.c_str(), relation->rel_name.length()) ||
vdr_tab_excl->result())
continue;
}
string relName;
relName.printf("Relation %d (%s)", relation->rel_id, relation->rel_name.c_str());
output("%s\n", relName.c_str());
int errs = vdr_errors;
walk_relation(relation);
errs = vdr_errors - errs;
if (!errs)
output("%s is ok\n\n", relName.c_str());
else
output("%s : %d ERRORS found\n\n", relName.c_str(), errs);
}
}
CCH_RELEASE(vdr_tdbb, &window);
if (!(vdr_flags & VDR_online)) {
CCH_RELEASE(vdr_tdbb, &window);
}
}
Validation::RTN Validation::walk_data_page(jrd_rel* relation, ULONG page_number,
@ -1287,6 +1628,8 @@ Validation::RTN Validation::walk_data_page(jrd_rel* relation, ULONG page_number,
Database* dbb = vdr_tdbb->getDatabase();
WIN window(DB_PAGE_SPACE, -1);
window.win_flags = WIN_garbage_collector;
data_page* page = 0;
fetch_page(true, page_number, pag_data, &window, &page);
@ -1544,6 +1887,8 @@ Validation::RTN Validation::walk_index(jrd_rel* relation, index_root_page& root_
while (next)
{
WIN window(DB_PAGE_SPACE, -1);
window.win_flags = WIN_garbage_collector;
btree_page* page = 0;
fetch_page(true, next, pag_index, &window, &page);
@ -1612,6 +1957,14 @@ Validation::RTN Validation::walk_index(jrd_rel* relation, index_root_page& root_
break;
}
if (node.prefix > key.key_length)
{
corrupt(VAL_INDEX_PAGE_CORRUPT, relation,
id + 1, next, page->btr_level, node.nodePointer - (UCHAR*) page, __FILE__, __LINE__);
CCH_RELEASE(vdr_tdbb, &window);
return rtn_corrupt;
}
const UCHAR* p;
const UCHAR* q;
USHORT l; // temporary variable for length
@ -1731,10 +2084,12 @@ Validation::RTN Validation::walk_index(jrd_rel* relation, index_root_page& root_
const ULONG down_number = node.pageNumber;
const RecordNumber down_record_number = node.recordNumber;
// Note: validate == false for the fetch_page() call here
// Note: mark == false for the fetch_page() call here
// as we don't want to mark the page as visited yet - we'll
// mark it when we visit it for real later on
WIN down_window(DB_PAGE_SPACE, -1);
down_window.win_flags = WIN_garbage_collector;
btree_page* down_page = 0;
fetch_page(false, down_number, pag_index, &down_window, &down_page);
const bool downLeafPage = (down_page->btr_level == 0);
@ -2014,6 +2369,8 @@ Validation::RTN Validation::walk_pointer_page(jrd_rel* relation, ULONG sequence)
pointer_page* page = 0;
WIN window(DB_PAGE_SPACE, -1);
window.win_flags = WIN_garbage_collector;
fetch_page(true, (*vector)[sequence], pag_pointer, &window, &page);
#ifdef DEBUG_VAL_VERBOSE
@ -2100,6 +2457,33 @@ Validation::RTN Validation::walk_pointer_page(jrd_rel* relation, ULONG sequence)
(page->ppg_next && page->ppg_next != (*vector)[sequence]))
{
CCH_RELEASE(vdr_tdbb, &window);
if (vdr_flags & VDR_online)
{
// relation could be extended before we acquired its lock in PR mode
// let's re-read pointer pages and check again
DPM_scan_pages(vdr_tdbb);
vector = relation->getBasePages()->rel_pages;
--sequence;
if (!vector || sequence >= static_cast<int>(vector->count())) {
return corrupt(VAL_P_PAGE_LOST, relation, sequence);
}
fetch_page(false, (*vector)[sequence], pag_pointer, &window, &page);
++sequence;
const bool error = sequence >= static_cast<int>(vector->count()) ||
(page->ppg_next && page->ppg_next != (*vector)[sequence]);
CCH_RELEASE(vdr_tdbb, &window);
if (!error)
return rtn_ok;
}
return corrupt(VAL_P_PAGE_INCONSISTENT, relation, page->ppg_next, sequence);
}
@ -2204,6 +2588,8 @@ Validation::RTN Validation::walk_record(jrd_rel* relation, const rhd* header, US
while (flags & rhd_incomplete)
{
WIN window(DB_PAGE_SPACE, -1);
window.win_flags = WIN_garbage_collector;
fetch_page(true, page_number, pag_data, &window, &page);
const data_page::dpg_repeat* line = &page->dpg_rpt[line_number];
if (page->dpg_relation != relation->rel_id ||
@ -2302,6 +2688,31 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
return rtn_ok;
}
AutoLock lckRead(vdr_tdbb);
jrd_rel::GCExclusive lckGC(vdr_tdbb, relation);
if (vdr_flags & VDR_online)
{
lckRead = jrd_rel::createLock(vdr_tdbb, NULL, relation, LCK_relation, false);
if (!LCK_lock(vdr_tdbb, lckRead, LCK_PR, vdr_lock_tout))
{
output("Acquire relation lock failed\n");
vdr_errors++;
return rtn_ok;
}
if (!lckGC.acquire(vdr_lock_tout))
{
output("Acquire garbage collection lock failed\n");
vdr_errors++;
return rtn_ok;
}
WIN window(DB_PAGE_SPACE, -1);
header_page* page = 0;
fetch_page(false, (SLONG)HEADER_PAGE, pag_header, &window, &page);
vdr_max_transaction = page->hdr_next_transaction;
CCH_RELEASE(vdr_tdbb, &window);
}
// Walk pointer and selected data pages associated with relation
@ -2311,6 +2722,11 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
for (ULONG sequence = 0; true; sequence++)
{
const vcl* vector = relation->getBasePages()->rel_pages;
const int ppCnt = vector ? vector->count() : 0;
output(" process pointer page %4d of %4d\n", sequence, ppCnt);
const RTN result = walk_pointer_page(relation, sequence);
if (result == rtn_eof)
break;
@ -2321,6 +2737,8 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
// Walk indices for the relation
walk_root(relation);
lckGC.release();
// See if the counts of backversions match
if ((vdr_flags & VDR_records) &&
(vdr_rel_backversion_counter != vdr_rel_chain_counter))
@ -2332,10 +2750,13 @@ Validation::RTN Validation::walk_relation(jrd_rel* relation)
} // try
catch (const Firebird::Exception&)
{
const char* msg = relation->rel_name.length() > 0 ?
"bugcheck during scan of table %d (%s)" :
"bugcheck during scan of table %d";
gds__log(msg, relation->rel_id, relation->rel_name.c_str());
if (!(vdr_flags & VDR_online))
{
const char* msg = relation->rel_name.length() > 0 ?
"bugcheck during scan of table %d (%s)" :
"bugcheck during scan of table %d";
gds__log(msg, relation->rel_id, relation->rel_name.c_str());
}
#ifdef DEBUG_VAL_VERBOSE
if (VAL_debug_level)
{
@ -2375,7 +2796,33 @@ Validation::RTN Validation::walk_root(jrd_rel* relation)
fetch_page(true, relPages->rel_index_root, pag_root, &window, &page);
for (USHORT i = 0; i < page->irt_count; i++)
{
if (page->irt_rpt[i].irt_root == 0)
continue;
MetaName index;
CCH_RELEASE(vdr_tdbb, &window);
MET_lookup_index(vdr_tdbb, index, relation->rel_name, i + 1);
fetch_page(false, relPages->rel_index_root, pag_root, &window, &page);
if (vdr_idx_incl)
{
vdr_idx_incl->reset();
if (!vdr_idx_incl->process((UCHAR*)index.c_str(), index.length()) || !vdr_idx_incl->result())
continue;
}
if (vdr_idx_excl)
{
vdr_idx_excl->reset();
if (!vdr_idx_excl->process((UCHAR*)index.c_str(), index.length()) || vdr_idx_excl->result())
continue;
}
output("Index %d (%s)\n", i + 1, index.c_str());
walk_index(relation, *page, i);
}
CCH_RELEASE(vdr_tdbb, &window);

View File

@ -32,6 +32,11 @@
#include "../jrd/RecordNumber.h"
namespace Firebird
{
class UtilSvc;
}
namespace Jrd
{
@ -45,6 +50,15 @@ class thread_db;
class Validation
{
public:
// vdr_flags
static const USHORT VDR_online = 0x01; // online validation (no exclusive attachment)
static const USHORT VDR_update = 0x02; // fix simple things
static const USHORT VDR_repair = 0x04; // fix non-simple things (-mend)
static const USHORT VDR_records = 0x08; // Walk all records
static const USHORT VDR_partial = 0x10; // Walk only (some) relations
private:
enum FETCH_CODE
@ -112,12 +126,6 @@ private:
static const MSG_ENTRY vdr_msg_table[VAL_MAX_ERROR];
// vdr_flags
static const USHORT VDR_update = 2; // fix simple things
static const USHORT VDR_repair = 4; // fix non-simple things (-mend)
static const USHORT VDR_records = 8; // Walk all records
thread_db* vdr_tdbb;
ULONG vdr_max_page;
USHORT vdr_flags;
@ -132,11 +140,18 @@ private:
PageBitmap* vdr_page_bitmap;
ULONG vdr_err_counts[VAL_MAX_ERROR];
public:
Validation();
~Validation() {};
Firebird::UtilSvc* vdr_service;
PatternMatcher* vdr_tab_incl;
PatternMatcher* vdr_tab_excl;
PatternMatcher* vdr_idx_incl;
PatternMatcher* vdr_idx_excl;
int vdr_lock_tout;
bool run(thread_db* tdbb, USHORT switches);
public:
Validation(thread_db*, Firebird::UtilSvc* uSvc = NULL);
~Validation();
bool run(thread_db* tdbb, USHORT flags);
ULONG getInfo(UCHAR item);
private:
@ -145,6 +160,9 @@ private:
FETCH_CODE fetch_page(bool validate, ULONG, USHORT, WIN*, void*);
void garbage_collect();
void parse_args(thread_db*);
void output(const char*, ...);
RTN walk_blob(jrd_rel*, const Ods::blh*, USHORT, RecordNumber);
RTN walk_chain(jrd_rel*, const Ods::rhd*, RecordNumber);
RTN walk_data_page(jrd_rel*, ULONG, ULONG, UCHAR&);

View File

@ -171,6 +171,45 @@ static void set_system_flag(thread_db*, Record*, USHORT);
static void update_in_place(thread_db*, jrd_tra*, record_param*, record_param*);
static void verb_post(thread_db*, jrd_tra*, record_param*, Record*, const bool, const bool);
static bool assert_gc_enabled(const jrd_tra* transaction, const jrd_rel* relation)
{
/**************************************
*
* a s s e r t _ g c _ e n a b l e d
*
**************************************
*
* Functional description
* Ensure that calls of purge\expunge\VIO_backout are safe and don't break
* results of online validation run.
*
* Notes
* System and temporary relations are not validated online.
* Non-zero rel_sweep_count is possible only under GCShared control when
* garbage collection is enabled.
*
* VIO_backout is more complex as it could run without GCShared control.
* Therefore we additionally check if we own relation lock in "write" mode -
* in this case online validation is not run against given relation.
*
**************************************/
if (relation->rel_sweep_count || relation->isSystem() || relation->isTemporary())
return true;
if (relation->rel_flags & REL_gc_disabled)
return false;
vec<Lock*>* vector = transaction->tra_relation_locks;
if (!vector || vector->count() < relation->rel_id)
return false;
Lock* lock = (*vector)[relation->rel_id];
if (!lock)
return false;
return (lock->lck_physical == LCK_SW) || (lock->lck_physical == LCK_EX);
}
// Pick up relation ids
#include "../jrd/ini.h"
@ -338,6 +377,8 @@ void VIO_backout(thread_db* tdbb, record_param* rpb, const jrd_tra* transaction)
Database* dbb = tdbb->getDatabase();
CHECK_DBB(dbb);
fb_assert(assert_gc_enabled(transaction, rpb->rpb_relation));
#ifdef VIO_DEBUG
VIO_trace(DEBUG_WRITES,
"VIO_backout (record_param %"QUADFORMAT"d, transaction %"ULONGFORMAT")\n",
@ -818,8 +859,10 @@ bool VIO_chase_record_version(thread_db* tdbb, record_param* rpb,
}
case tra_precommitted:
{// scope
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if ((attachment->att_flags & ATT_NO_CLEANUP) ||
if (attachment->att_flags & ATT_NO_CLEANUP || !gcGuard.gcEnabled() ||
(rpb->rpb_flags & (rpb_chained | rpb_gc_active)))
{
if (rpb->rpb_b_page == 0)
@ -886,7 +929,7 @@ bool VIO_chase_record_version(thread_db* tdbb, record_param* rpb,
if (!DPM_get(tdbb, rpb, LCK_read))
return false;
} // scope
break;
// If it's active, prepare to fetch the old version.
@ -1072,6 +1115,12 @@ bool VIO_chase_record_version(thread_db* tdbb, record_param* rpb,
else
{
CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if (!gcGuard.gcEnabled())
return false;
expunge(tdbb, rpb, transaction, 0);
}
@ -1114,7 +1163,14 @@ bool VIO_chase_record_version(thread_db* tdbb, record_param* rpb,
return true;
}
purge(tdbb, rpb);
{ // scope
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if (!gcGuard.gcEnabled())
return true;
purge(tdbb, rpb);
}
// Go back to be primary record version and chase versions all over again.
if (!DPM_get(tdbb, rpb, LCK_read))
@ -1881,7 +1937,9 @@ bool VIO_garbage_collect(thread_db* tdbb, record_param* rpb, const jrd_tra* tran
rpb->rpb_f_page, rpb->rpb_f_line);
#endif
if (attachment->att_flags & ATT_no_cleanup)
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if (attachment->att_flags & ATT_no_cleanup || !gcGuard.gcEnabled())
return true;
const TraNumber oldest_snapshot = rpb->rpb_relation->isTemporary() ?
@ -2165,7 +2223,14 @@ bool VIO_get_current(thread_db* tdbb,
if (transaction->tra_attachment->att_flags & ATT_no_cleanup)
return !foreign_key;
VIO_backout(tdbb, rpb, transaction);
{
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if (!gcGuard.gcEnabled())
return !foreign_key;
VIO_backout(tdbb, rpb, transaction);
}
continue;
case tra_precommitted:
Attachment::Checkout cout(attachment, FB_FUNCTION);
@ -2267,7 +2332,14 @@ bool VIO_get_current(thread_db* tdbb,
if (transaction->tra_attachment->att_flags & ATT_no_cleanup)
return !foreign_key;
VIO_backout(tdbb, rpb, transaction);
{
jrd_rel::GCShared gcGuard(tdbb, rpb->rpb_relation);
if (!gcGuard.gcEnabled())
return !foreign_key;
VIO_backout(tdbb, rpb, transaction);
}
break;
case tra_limbo:
@ -3505,6 +3577,7 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
vec<jrd_rel*>* vector = 0;
GarbageCollector* gc = dbb->dbb_garbage_collector;
bool ret = true;
try {
@ -3519,10 +3592,16 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
!relation->isTemporary() &&
relation->getPages(tdbb)->rel_pages)
{
jrd_rel::GCShared gcGuard(tdbb, relation);
if (!gcGuard.gcEnabled())
{
ret = false;
break;
}
rpb.rpb_relation = relation;
rpb.rpb_number.setValue(BOF_NUMBER);
rpb.rpb_org_scans = relation->rel_scan_count++;
++relation->rel_sweep_count;
traceSweep->beginSweepRelation(relation);
@ -3545,7 +3624,6 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
traceSweep->endSweepRelation(relation);
--relation->rel_sweep_count;
--relation->rel_scan_count;
}
}
@ -3559,17 +3637,13 @@ bool VIO_sweep(thread_db* tdbb, jrd_tra* transaction, TraceSweepEvent* traceSwee
if (relation)
{
if (relation->rel_sweep_count)
--relation->rel_sweep_count;
if (relation->rel_scan_count)
--relation->rel_scan_count;
}
ERR_punt();
}
return true;
return ret;
}
@ -4424,6 +4498,8 @@ static void expunge(thread_db* tdbb, record_param* rpb, const jrd_tra* transacti
SET_TDBB(tdbb);
Jrd::Attachment* attachment = transaction->tra_attachment;
fb_assert(assert_gc_enabled(transaction, rpb->rpb_relation));
#ifdef VIO_DEBUG
VIO_trace(DEBUG_WRITES,
"expunge (record_param %"QUADFORMAT"d, transaction %"ULONGFORMAT
@ -4706,7 +4782,10 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
if (gc_bitmap)
{
++relation->rel_sweep_count;
jrd_rel::GCShared gcGuard(tdbb, relation);
if (!gcGuard.gcEnabled())
continue;
rpb.rpb_relation = relation;
while (gc_bitmap->getFirst())
@ -4715,7 +4794,6 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
if (!(dbb->dbb_flags & DBB_garbage_collector))
{
--relation->rel_sweep_count;
gc_exit = true;
break;
}
@ -4758,7 +4836,6 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
if (!(dbb->dbb_flags & DBB_garbage_collector))
{
--relation->rel_sweep_count;
gc_exit = true;
break;
}
@ -4769,6 +4846,12 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
break;
}
if (relation->rel_flags & REL_gc_disabled)
{
rel_exit = true;
break;
}
if (--tdbb->tdbb_quantum < 0)
JRD_reschedule(tdbb, SWEEP_QUANTUM, true);
@ -4785,7 +4868,6 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
delete gc_bitmap;
gc_bitmap = NULL;
--relation->rel_sweep_count;
}
}
@ -4823,10 +4905,6 @@ static THREAD_ENTRY_DECLARE garbage_collector(THREAD_ENTRY_PARAM arg)
{
ex.stuffException(&status_vector);
iscDbLogStatus(dbb->dbb_filename.c_str(), &status_vector);
if (relation && relation->rel_sweep_count)
--relation->rel_sweep_count;
// continue execution to clean up
}
@ -5671,6 +5749,8 @@ static void purge(thread_db* tdbb, record_param* rpb)
Database* dbb = tdbb->getDatabase();
CHECK_DBB(dbb);
fb_assert(assert_gc_enabled(tdbb->getTransaction(), rpb->rpb_relation));
#ifdef VIO_DEBUG
VIO_trace(DEBUG_TRACE_ALL,
"purge (record_param %"QUADFORMAT"d)\n", rpb->rpb_number.getValue());

View File

@ -250,7 +250,14 @@ bool putNumericArgument(char**& av, ClumpletWriter& spb, unsigned int tag)
if (! *av)
return false;
int n = atoi(*av++);
char* err = NULL;
SLONG n = strtol(*av++, &err, 10);
if (err && *err)
{
(Arg::Gds(isc_fbsvcmgr_bad_arg) << av[-2]).raise();
}
spb.insertInt(tag, n);
return true;
@ -499,6 +506,17 @@ const SvcSwitches traceChgStateOptions[] =
{0, 0, 0, 0, 0}
};
const SvcSwitches validateOptions[] =
{
{"dbname", putStringArgument, 0, isc_spb_dbname, 0},
{"val_tab_incl", putStringArgument, 0, isc_spb_val_tab_incl, 0},
{"val_tab_excl", putStringArgument, 0, isc_spb_val_tab_excl, 0},
{"val_idx_incl", putStringArgument, 0, isc_spb_val_idx_incl, 0},
{"val_idx_excl", putStringArgument, 0, isc_spb_val_idx_excl, 0},
{"val_lock_timeout", putNumericArgument, 0, isc_spb_val_lock_timeout, 0},
{0, 0, 0, 0, 0}
};
const SvcSwitches actionSwitch[] =
{
{"action_backup", putSingleTag, backupOptions, isc_action_svc_backup, isc_info_svc_to_eof},
@ -522,6 +540,7 @@ const SvcSwitches actionSwitch[] =
{"action_trace_list", putSingleTag, 0, isc_action_svc_trace_list, isc_info_svc_line},
{"action_set_mapping", putSingleTag, mappingOptions, isc_action_svc_set_mapping, 0},
{"action_drop_mapping", putSingleTag, mappingOptions, isc_action_svc_drop_mapping, 0},
{"action_validate", putSingleTag, validateOptions, isc_action_svc_validate, isc_info_svc_line},
{0, 0, 0, 0, 0}
};