8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 18:43:02 +01:00

Renamed replication 'log' to 'journal'

This commit is contained in:
Dmitry Yemanov 2021-05-04 20:00:48 +03:00
parent edf327eee0
commit 340bd02363
12 changed files with 191 additions and 190 deletions

View File

@ -31,7 +31,7 @@ database
# Size of the local buffer used to accumulate changes that can be
# deferred until the transaction commit/rollback. The bigger this value
# the less disk access concurrency (related to log IOPS) happens.
# the less disk access concurrency (related to journal IOPS) happens.
#
# For synchronous replication, it also affects number of network round-trips
# between primary and replica hosts.
@ -40,19 +40,19 @@ database
#
# buffer_size = 1048576 # 1MB
# Directory to store replication log files.
# Directory to store replication journal files.
#
# log_directory =
# journal_directory =
# Prefix for replication log file names. It will be automatically suffixed
# Prefix for replication journal file names. It will be automatically suffixed
# with an ordinal sequential number. If not specified, database filename
# (without path) is used as a prefix.
#
# log_file_prefix =
# journal_file_prefix =
# Maximum allowed size for a single replication segment.
#
# log_segment_size = 16777216 # 16MB
# journal_segment_size = 16777216 # 16MB
# Maximum allowed number of full replication segments. Once this limit is reached,
# the replication process is temporarily delayed to allow the archiving to catch up.
@ -61,22 +61,22 @@ database
#
# Zero means an unlimited number of segments pending archiving.
#
# log_segment_count = 8
# journal_segment_count = 8
# Delay, in milliseconds, to wait before the changes are synchronously flushed
# to the log (usually at commit time). This allows multiple concurrently committing
# to the journal (usually at commit time). This allows multiple concurrently committing
# transactions to amortise I/O costs by sharing a single flush operation.
#
# Zero means no delay, i.e. "group flushing" is disabled.
#
# log_group_flush_delay = 0
# journal_group_flush_delay = 0
# Directory for the archived log files.
# Directory for the archived journal files.
#
# Directory to store archived replication segments.
# It also defines the $(archpathname) substitution macro (see below).
#
# log_archive_directory =
# journal_archive_directory =
# Program (complete command line with arguments) that is executed when some
# replication segment gets full and needs archiving.
@ -85,27 +85,27 @@ database
# In particular, it MUST return non-zero if the target archive already exists.
#
# Special predefined macros are available:
# $(logfilename) - file name (without path) of the log segment being archived
# $(logpathname) - full path name of the log segment being archived
# same as log_directory + $(logfilename)
# $(archpathname) - suggested full path name for the archived segment
# same as log_archive_directory + $(logfilename)
# $(filename) - file name (without path) of the journal segment being archived
# $(pathname) - full path name of the journal segment being archived
# same as journal_directory + $(filename)
# $(archivepathname) - suggested full path name for the archived segment
# same as journal_archive_directory + $(filename)
#
# Simplest configuration is to use standard OS commands for archiving, e.g.:
#
# Linux: "test ! -f $(archpathname) && cp $(logpathname) $(archpathname)"
# Linux: "test ! -f $(archivepathname) && cp $(pathname) $(archivepathname)"
# or
# Windows: "copy $(logpathname) $(archpathname)"
# Windows: "copy $(pathname) $(archivepathname)"
#
# log_archive_command =
# journal_archive_command =
# Timeout, in seconds, to wait until incomplete segment is scheduled for archiving.
# It allows to minimize the replication gap if the database is modified rarely.
#
# Zero means no intermediate archiving, i.e. segments are archived only after
# reaching their maximum size (defined by log_segment_size).
# reaching their maximum size (defined by journal_segment_size).
#
# log_archive_timeout = 60
# journal_archive_timeout = 60
# Connection string to the replica database (used for synchronous replication only).
# Expected format:
@ -126,12 +126,13 @@ database
### REPLICA SIDE SETTINGS
# Directory to search for the log files to be replicated.
# Directory to search for the journal files to be replicated.
#
# log_source_directory =
# journal_source_directory =
# Filter to limit replication to the particular source database (based on its GUID).
# Expected format: {XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX}
# Expected format: "{XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX}"
# Note that double quotes are mandatory, as well as curly braces.
#
# source_guid =
@ -169,14 +170,14 @@ database
#
# (for asynchronous replication)
#
# log_directory = /your/db/chlog
# log_archive_directory = /your/db/archlog
# log_archive_timeout = 10
# journal_directory = /your/db/journal
# journal_archive_directory = /your/db/archive
# journal_archive_timeout = 10
# }
#
# (for the replica side)
#
# database = /your/db.fdb
# {
# log_source_directory = /your/db/incominglog
# journal_source_directory = /your/db/source
# }

View File

@ -20,7 +20,7 @@ Asynchronous replication is based on journalling. Replicated changes are written
Segments are regularly rotated, this process is controlled by either maximum segment size or timeout, both thresholds are configurable. Once the active segment reaches the threshold, it's marked as "full" and writing switches to the next available segment. Full segments are archived and then reused for subsequent writes. Archiving basically means copying the segment with a purpose of transferring it to the replica host and applying there. Copying can be done by Firebird server itself or, alternatively, by custom \(user-specified\) command.
On the replica side, journal segments are applied in the replication sequence order. Firebird server periodically scans for the new segments appearing in the configured directory. Once the next segment is found, it gets replicated. Replication state is stored in the local file named {UUID} \(per every replication source\) and contains the following markers: latest segment sequence \(LSS\), oldest segment sequence \(OSS\) and list of active transactions started between OSS and LSS. LSS means the last replicated segment. OSS means the segment that started the earliest transaction that wasn't finished at the time LSS was processed. These markers control two things: \(1\) what segment must be replicated next and \(2\) when segment files can be safely deleted. Segments with numbers between OSS and LSS are preserved for replaying the journal after the replicator disconnects from the replica database \(e.g. due to replication error or idle timeout\). If there are no active transactions pending and LSS was processed without errors, all segments up to \(and including\) LSS are deleted. In the case of any critical error, replication is temporarily suspended and re-attempted after timeout.
On the replica side, journal segments are applied in the replication sequence order. Firebird server periodically scans for the new segments appearing in the configured directory. Once the next segment is found, it gets replicated. Replication state is stored in the local file named {UUID} \(per every replication source\) and contains the following markers: latest segment sequence \(LSS\), oldest segment sequence \(OSS\) and list of active transactions started between OSS and LSS. LSS means the last replicated segment. OSS means the segment that started the earliest transaction that wasn't finished at the time LSS was processed. These markers control two things: \(1\) what segment must be replicated next and \(2\) when segments can be safely deleted. Segments with numbers between OSS and LSS are preserved for replaying the journal after the replicator disconnects from the replica database \(e.g. due to replication error or idle timeout\). If there are no active transactions pending and LSS was processed without errors, all segments up to \(and including\) LSS are deleted. In the case of any critical error, replication is temporarily suspended and re-attempted after timeout.
## Error reporting
@ -51,39 +51,39 @@ Tables enabled for replicated can be additionally filtered using two settings in
Synchronous replication can be turned on using the sync\_replica setting \(multiple entries are allowed\). It must specify a connection string to the replica database, prefixed with username/password. In SuperServer and SuperClassic architectures, replica database is being internally attached when the first user gets connected to the master database and detached when the last user disconnects from the master database. In Classic Server architecture, every server process keeps an active connection to the replica database.
Asynchronous replication requires setting up the journalling mechanism. The primary parameter is log\_directory which defines location of the replication journal. Once this location is specified, asynchronous replication is turned on and Firebird server starts producing the journal segments.
Asynchronous replication requires setting up the journalling mechanism. The primary parameter is journal\_directory which defines location of the replication journal files (_aka_ segments). Once this location is specified, asynchronous replication is turned on and Firebird server starts producing the journal segments.
Minimal configuration looks like this:
database = /data/mydb.fdb
{
log\_directory = /dblogs/mydb/
log\_archive\_directory = /shiplogs/mydb/
journal\_directory = /journal/mydb/
journal\_archive\_directory = /archive/mydb/
}
Archiving is performed by copying the segments from /dblogs/mydb/ to /shiplogs/mydb/, Firebird server copies the segments itself.
Archiving is performed by copying the segments from /journal/mydb/ to /archive/mydb/, Firebird server copies the segments itself.
The same with user-defined archiving:
database = /data/mydb.fdb
{
log\_directory = /dblogs/mydb/
log\_archive\_directory = /shiplogs/mydb/
log\_archive\_command = "test ! -f $\(archpathname\) && cp $\(logpathname\) $\(archpathname\)"
journal\_directory = /journal/mydb/
journal\_archive\_directory = /archive/mydb/
journal\_archive\_command = "test ! -f $\(archivepathname\) && cp $\(pathname\) $\(archivepathname\)"
}
Where $\(logpathname\) and $\(archpathname\) are built-in macros that provide the custom shell command with real file names.
Where $\(pathname\) and $\(archivepathname\) are built-in macros that provide the custom shell command with real file names.
Custom archiving \(log\_archive\_command setting\) allows to use any system shell command \(including scripts / batch files\) to deliver segments to the replica side. It could use compression, FTP, or whatever else available on the server. Actual transport implementation is up to DBA, Firebird just produces segments on the master side and expects them to appear at the replica side. If the replica storage can be remotely attached to the master host, it becomes just a matter of copying the segment files. In other cases, some transport solution is required.
Custom archiving \(journal\_archive\_command setting\) allows to use any system shell command \(including scripts / batch files\) to deliver segments to the replica side. It could use compression, FTP, or whatever else available on the server. Actual transport implementation is up to DBA, Firebird just produces segments on the master side and expects them to appear at the replica side. If the replica storage can be remotely attached to the master host, it becomes just a matter of copying the segments. In other cases, some transport solution is required.
The same with archiving performed every 10 seconds:
database = /data/mydb.fdb
{
log\_directory = /dblogs/mydb/
log\_archive\_directory = /shiplogs/mydb/
log\_archive\_command = "test ! -f $\(archpathname\) && cp $\(logpathname\) $\(archpathname\)"
log\_archive\_timeout = 10
journal\_directory = /journal/mydb/
journal\_archive\_directory = /archive/mydb/
journal\_archive\_command = "test ! -f $\(archivepathname\) && cp $\(pathname\) $\(archivepathname\)"
journal\_archive\_timeout = 10
}
Read replication.conf for other possible settings.
@ -92,14 +92,14 @@ To apply the changed master-side settings, all users must be reconnected.
## Setting up the replica side
The same replication.conf file is used. Setting log\_source\_directory specifies the location that Firebird server scans for the transmitted segments. Additionally, DBA may explicitly specify what source database is accepted for replication. Setting source\_guid is used for that purpose.
The same replication.conf file is used. Setting journal\_source\_directory specifies the location that Firebird server scans for the transmitted segments. Additionally, DBA may explicitly specify what source database is accepted for replication. Setting source\_guid is used for that purpose.
Sample configuration looks like this:
database = /data/mydb.fdb
{
log\_source\_directory = /incominglogs/
source\_guid = {6F9619FF-8B86-D011-B42D-00CF4FC964FF}
journal\_source\_directory = /incoming/
source\_guid = "{6F9619FF-8B86-D011-B42D-00CF4FC964FF}"
}
Read replication.conf for other possible settings.

View File

@ -352,7 +352,7 @@ void BackupManager::beginBackup(thread_db* tdbb)
PAG_replace_entry_first(tdbb, header, Ods::HDR_backup_guid, sizeof(guid),
reinterpret_cast<const UCHAR*>(&guid));
REPL_log_switch(tdbb);
REPL_journal_switch(tdbb);
stateGuard.releaseHeader();
stateGuard.setSuccess();

View File

@ -73,11 +73,11 @@ namespace
const unsigned COPY_BLOCK_SIZE = 64 * 1024; // 64 KB
const char* LOGFILE_PATTERN = "%s.chlog-%09" UQUADFORMAT;
const char* FILENAME_PATTERN = "%s.journal-%09" UQUADFORMAT;
const char* LOGFILENAME_WILDCARD = "$(logfilename)";
const char* LOGPATHNAME_WILDCARD = "$(logpathname)";
const char* ARCHPATHNAME_WILDCARD = "$(archpathname)";
const char* FILENAME_WILDCARD = "$(filename)";
const char* PATHNAME_WILDCARD = "$(pathname)";
const char* ARCHPATHNAME_WILDCARD = "$(archivepathname)";
SegmentHeader g_dummyHeader;
@ -133,9 +133,9 @@ ChangeLog::Segment::~Segment()
void ChangeLog::Segment::init(FB_UINT64 sequence, const Guid& guid)
{
fb_assert(sizeof(LOG_SIGNATURE) == sizeof(m_header->hdr_signature));
strcpy(m_header->hdr_signature, LOG_SIGNATURE);
m_header->hdr_version = LOG_CURRENT_VERSION;
fb_assert(sizeof(CHANGELOG_SIGNATURE) == sizeof(m_header->hdr_signature));
strcpy(m_header->hdr_signature, CHANGELOG_SIGNATURE);
m_header->hdr_version = CHANGELOG_CURRENT_VERSION;
m_header->hdr_state = SEGMENT_STATE_USED;
memcpy(&m_header->hdr_guid, &guid, sizeof(Guid));
m_header->hdr_sequence = sequence;
@ -146,10 +146,10 @@ void ChangeLog::Segment::init(FB_UINT64 sequence, const Guid& guid)
bool ChangeLog::Segment::validate(const Guid& guid) const
{
if (strcmp(m_header->hdr_signature, LOG_SIGNATURE))
if (strcmp(m_header->hdr_signature, CHANGELOG_SIGNATURE))
return false;
if (m_header->hdr_version != LOG_CURRENT_VERSION)
if (m_header->hdr_version != CHANGELOG_CURRENT_VERSION)
return false;
if (m_header->hdr_state != SEGMENT_STATE_FREE &&
@ -212,10 +212,10 @@ void ChangeLog::Segment::append(ULONG length, const UCHAR* data)
const auto currentLength = (SINT64) m_header->hdr_length;
if (os_utils::lseek(m_handle, currentLength, SEEK_SET) != currentLength)
raiseError("Log file %s seek failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s seek failed (error %d)", m_filename.c_str(), ERRNO);
if (::write(m_handle, data, length) != length)
raiseError("Log file %s write failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s write failed (error %d)", m_filename.c_str(), ERRNO);
m_header->hdr_length += length;
}
@ -243,7 +243,7 @@ void ChangeLog::Segment::truncate()
#else
if (os_utils::ftruncate(m_handle, length))
#endif
raiseError("Log file %s truncate failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s truncate failed (error %d)", m_filename.c_str(), ERRNO);
mapHeader();
}
@ -267,18 +267,18 @@ void ChangeLog::Segment::mapHeader()
0, sizeof(SegmentHeader), NULL);
if (m_mapping == INVALID_HANDLE_VALUE)
raiseError("Log file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
auto address = MapViewOfFile(m_mapping, FILE_MAP_READ | FILE_MAP_WRITE,
0, 0, sizeof(SegmentHeader));
if (!address)
raiseError("Log file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
#else
auto address = mmap(NULL, sizeof(SegmentHeader), PROT_READ | PROT_WRITE, MAP_SHARED, m_handle, 0);
if (address == MAP_FAILED)
raiseError("Log file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
raiseError("Journal file %s mapping failed (error %d)", m_filename.c_str(), ERRNO);
#endif
m_header = (SegmentHeader*) address;
@ -567,7 +567,7 @@ FB_UINT64 ChangeLog::write(ULONG length, const UCHAR* data, bool sync)
if (i == 0) // log the warning just once
{
const string warningMsg =
"Out of available space in changelog segments, waiting for archiving...";
"Out of available space in journal segments, waiting for archiving...";
logPrimaryWarning(m_config->dbName, warningMsg);
}
@ -581,7 +581,7 @@ FB_UINT64 ChangeLog::write(ULONG length, const UCHAR* data, bool sync)
}
if (!segment)
raiseError("Out of available space in changelog segments");
raiseError("Out of available space in journal segments");
const auto state = m_sharedMemory->getHeader();
@ -592,13 +592,13 @@ FB_UINT64 ChangeLog::write(ULONG length, const UCHAR* data, bool sync)
if (sync)
{
if (m_config->logGroupFlushDelay)
if (m_config->groupFlushDelay)
{
const auto flushMark = state->flushMark;
segment->addRef();
for (ULONG delay = 0; delay < m_config->logGroupFlushDelay;
for (ULONG delay = 0; delay < m_config->groupFlushDelay;
delay += FLUSH_WAIT_INTERVAL)
{
if (state->flushMark != flushMark)
@ -630,25 +630,25 @@ FB_UINT64 ChangeLog::write(ULONG length, const UCHAR* data, bool sync)
bool ChangeLog::archiveExecute(Segment* segment)
{
if (m_config->logArchiveCommand.hasData())
if (m_config->archiveCommand.hasData())
{
segment->truncate();
auto archiveCommand = m_config->logArchiveCommand;
auto archiveCommand = m_config->archiveCommand;
const auto logfilename = segment->getFileName();
const auto logpathname = m_config->logDirectory + logfilename;
const auto filename = segment->getFileName();
const auto pathname = m_config->journalDirectory + filename;
const auto archpathname = m_config->logArchiveDirectory.hasData() ?
m_config->logArchiveDirectory + logfilename : "";
const auto archpathname = m_config->archiveDirectory.hasData() ?
m_config->archiveDirectory + filename : "";
size_t pos;
while ( (pos = archiveCommand.find(LOGFILENAME_WILDCARD)) != string::npos)
archiveCommand.replace(pos, strlen(LOGFILENAME_WILDCARD), logfilename);
while ( (pos = archiveCommand.find(FILENAME_WILDCARD)) != string::npos)
archiveCommand.replace(pos, strlen(FILENAME_WILDCARD), filename);
while ( (pos = archiveCommand.find(LOGPATHNAME_WILDCARD)) != string::npos)
archiveCommand.replace(pos, strlen(LOGPATHNAME_WILDCARD), logpathname);
while ( (pos = archiveCommand.find(PATHNAME_WILDCARD)) != string::npos)
archiveCommand.replace(pos, strlen(PATHNAME_WILDCARD), pathname);
while ( (pos = archiveCommand.find(ARCHPATHNAME_WILDCARD)) != string::npos)
archiveCommand.replace(pos, strlen(ARCHPATHNAME_WILDCARD), archpathname);
@ -664,12 +664,12 @@ bool ChangeLog::archiveExecute(Segment* segment)
if (res < 0)
{
errorMsg.printf("Cannot execute log archive command (error %d): %s",
errorMsg.printf("Cannot execute journal archive command (error %d): %s",
ERRNO, archiveCommand.c_str());
}
else
{
errorMsg.printf("Unexpected result (%d) while executing log archive command: %s",
errorMsg.printf("Unexpected result (%d) while executing journal archive command: %s",
res, archiveCommand.c_str());
}
@ -677,10 +677,10 @@ bool ChangeLog::archiveExecute(Segment* segment)
return false;
}
}
else if (m_config->logArchiveDirectory.hasData())
else if (m_config->archiveDirectory.hasData())
{
const auto logfilename = segment->getFileName();
const auto archpathname = m_config->logArchiveDirectory + logfilename;
const auto filename = segment->getFileName();
const auto archpathname = m_config->archiveDirectory + filename;
struct stat statistics;
if (os_utils::stat(archpathname.c_str(), &statistics) == 0)
@ -688,7 +688,7 @@ bool ChangeLog::archiveExecute(Segment* segment)
if (statistics.st_size > (int) sizeof(SegmentHeader))
{
string warningMsg;
warningMsg.printf("Destination log file %s exists, it will be overwritten",
warningMsg.printf("Destination journal file %s exists, it will be overwritten",
archpathname.c_str());
logPrimaryWarning(m_config->dbName, warningMsg);
@ -703,7 +703,7 @@ bool ChangeLog::archiveExecute(Segment* segment)
}
catch (const status_exception& ex)
{
string errorMsg = "Cannot copy log segment";
string errorMsg = "Cannot copy journal segment";
const ISC_STATUS* status = ex.value();
TEXT temp[BUFFER_LARGE];
@ -718,7 +718,7 @@ bool ChangeLog::archiveExecute(Segment* segment)
}
catch (...)
{
const string errorMsg = "Cannot copy log segment (reason unknown)";
const string errorMsg = "Cannot copy journal segment (reason unknown)";
logPrimaryError(m_config->dbName, errorMsg);
return false;
}
@ -729,7 +729,7 @@ bool ChangeLog::archiveExecute(Segment* segment)
bool ChangeLog::archiveSegment(Segment* segment)
{
// if (m_config->logArchiveCommand.hasData() || m_config->logArchiveDirectory.hasData())
// if (m_config->archiveCommand.hasData() || m_config->archiveDirectory.hasData())
{
segment->setState(SEGMENT_STATE_ARCH);
segment->addRef();
@ -788,11 +788,11 @@ void ChangeLog::bgArchiver()
{
if (segment->getState() == SEGMENT_STATE_USED)
{
if (segment->hasData() && m_config->logArchiveTimeout)
if (segment->hasData() && m_config->archiveTimeout)
{
const auto delta_timestamp = time(NULL) - state->timestamp;
if (delta_timestamp > m_config->logArchiveTimeout)
if (delta_timestamp > m_config->archiveTimeout)
{
segment->setState(SEGMENT_STATE_FULL);
state->flushMark++;
@ -830,7 +830,7 @@ void ChangeLog::bgArchiver()
}
catch (const Exception& ex)
{
iscLogException("Error in changelog thread", ex);
iscLogException("Error in journal thread", ex);
}
// Signal about our exit
@ -841,7 +841,7 @@ void ChangeLog::bgArchiver()
}
catch (const Exception& ex)
{
iscLogException("Error while exiting changelog thread", ex);
iscLogException("Error while exiting journal thread", ex);
}
}
@ -851,7 +851,7 @@ void ChangeLog::initSegments()
const auto state = m_sharedMemory->getHeader();
for (auto iter = PathUtils::newDirIterator(getPool(), m_config->logDirectory);
for (auto iter = PathUtils::newDirIterator(getPool(), m_config->journalDirectory);
*iter; ++(*iter))
{
const auto filename = **iter;
@ -885,15 +885,15 @@ ChangeLog::Segment* ChangeLog::createSegment()
const auto sequence = ++state->sequence;
PathName filename;
filename.printf(LOGFILE_PATTERN, m_config->logFilePrefix.c_str(), sequence);
filename = m_config->logDirectory + filename;
filename.printf(FILENAME_PATTERN, m_config->filePrefix.c_str(), sequence);
filename = m_config->journalDirectory + filename;
const auto fd = os_utils::openCreateSharedFile(filename.c_str(), O_EXCL | O_BINARY);
if (::write(fd, &g_dummyHeader, sizeof(SegmentHeader)) != sizeof(SegmentHeader))
{
::close(fd);
raiseError("Log file %s write failed (error %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s write failed (error %d)", filename.c_str(), ERRNO);
}
const auto segment = FB_NEW_POOL(getPool()) Segment(getPool(), filename, fd);
@ -931,11 +931,11 @@ ChangeLog::Segment* ChangeLog::reuseSegment(ChangeLog::Segment* segment)
const auto sequence = ++state->sequence;
PathName newname;
newname.printf(LOGFILE_PATTERN, m_config->logFilePrefix.c_str(), sequence);
newname = m_config->logDirectory + newname;
newname.printf(FILENAME_PATTERN, m_config->filePrefix.c_str(), sequence);
newname = m_config->journalDirectory + newname;
if (::rename(orgname.c_str(), newname.c_str()) < 0)
raiseError("Log file %s rename failed (error: %d)", orgname.c_str(), ERRNO);
raiseError("Journal file %s rename failed (error: %d)", orgname.c_str(), ERRNO);
// Re-open the segment using a new name and initialize it
@ -966,7 +966,7 @@ ChangeLog::Segment* ChangeLog::getSegment(ULONG length)
if (segmentState == SEGMENT_STATE_USED)
{
if (activeSegment)
raiseError("Multiple active changelog segments found");
raiseError("Multiple active journal segments found");
activeSegment = segment;
}
@ -984,18 +984,18 @@ ChangeLog::Segment* ChangeLog::getSegment(ULONG length)
if (activeSegment)
{
if (activeSegment->getLength() + length > m_config->logSegmentSize)
if (activeSegment->getLength() + length > m_config->segmentSize)
{
activeSegment->setState(SEGMENT_STATE_FULL);
state->flushMark++;
activeSegment = NULL;
m_workingSemaphore.release();
}
else if (activeSegment->hasData() && m_config->logArchiveTimeout)
else if (activeSegment->hasData() && m_config->archiveTimeout)
{
const size_t deltaTimestamp = time(NULL) - state->timestamp;
if (deltaTimestamp > m_config->logArchiveTimeout)
if (deltaTimestamp > m_config->archiveTimeout)
{
activeSegment->setState(SEGMENT_STATE_FULL);
activeSegment = NULL;
@ -1012,7 +1012,7 @@ ChangeLog::Segment* ChangeLog::getSegment(ULONG length)
// Allocate one more segment if configuration allows that
if (!m_config->logSegmentCount || m_segments.getCount() < m_config->logSegmentCount)
if (!m_config->segmentCount || m_segments.getCount() < m_config->segmentCount)
return createSegment();
return NULL;

View File

@ -51,10 +51,10 @@ namespace Replication
FB_UINT64 hdr_length;
};
const char LOG_SIGNATURE[] = "FBCHANGELOG";
const char CHANGELOG_SIGNATURE[] = "FBCHANGELOG";
const USHORT LOG_VERSION_1 = 1;
const USHORT LOG_CURRENT_VERSION = LOG_VERSION_1;
const USHORT CHANGELOG_VERSION_1 = 1;
const USHORT CHANGELOG_CURRENT_VERSION = CHANGELOG_VERSION_1;
class ChangeLog : protected Firebird::PermanentStorage, public Firebird::IpcObject
{

View File

@ -52,10 +52,10 @@ namespace
const char* REPLICATION_CFGFILE = "replication.conf";
const ULONG DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB
const ULONG DEFAULT_LOG_SEGMENT_SIZE = 16 * 1024 * 1024; // 16 MB
const ULONG DEFAULT_LOG_SEGMENT_COUNT = 8;
const ULONG DEFAULT_LOG_ARCHIVE_TIMEOUT = 60; // seconds
const ULONG DEFAULT_LOG_GROUP_FLUSH_DELAY = 0;
const ULONG DEFAULT_SEGMENT_SIZE = 16 * 1024 * 1024; // 16 MB
const ULONG DEFAULT_SEGMENT_COUNT = 8;
const ULONG DEFAULT_ARCHIVE_TIMEOUT = 60; // seconds
const ULONG DEFAULT_GROUP_FLUSH_DELAY = 0;
const ULONG DEFAULT_APPLY_IDLE_TIMEOUT = 10; // seconds
const ULONG DEFAULT_APPLY_ERROR_TIMEOUT = 60; // seconds
@ -109,16 +109,16 @@ Config::Config()
bufferSize(DEFAULT_BUFFER_SIZE),
includeFilter(getPool()),
excludeFilter(getPool()),
logSegmentSize(DEFAULT_LOG_SEGMENT_SIZE),
logSegmentCount(DEFAULT_LOG_SEGMENT_COUNT),
logDirectory(getPool()),
logFilePrefix(getPool()),
logGroupFlushDelay(DEFAULT_LOG_GROUP_FLUSH_DELAY),
logArchiveDirectory(getPool()),
logArchiveCommand(getPool()),
logArchiveTimeout(DEFAULT_LOG_ARCHIVE_TIMEOUT),
segmentSize(DEFAULT_SEGMENT_SIZE),
segmentCount(DEFAULT_SEGMENT_COUNT),
journalDirectory(getPool()),
filePrefix(getPool()),
groupFlushDelay(DEFAULT_GROUP_FLUSH_DELAY),
archiveDirectory(getPool()),
archiveCommand(getPool()),
archiveTimeout(DEFAULT_ARCHIVE_TIMEOUT),
syncReplicas(getPool()),
logSourceDirectory(getPool()),
sourceDirectory(getPool()),
sourceGuid{},
verboseLogging(false),
applyIdleTimeout(DEFAULT_APPLY_IDLE_TIMEOUT),
@ -135,16 +135,16 @@ Config::Config(const Config& other)
bufferSize(other.bufferSize),
includeFilter(getPool(), other.includeFilter),
excludeFilter(getPool(), other.excludeFilter),
logSegmentSize(other.logSegmentSize),
logSegmentCount(other.logSegmentCount),
logDirectory(getPool(), other.logDirectory),
logFilePrefix(getPool(), other.logFilePrefix),
logGroupFlushDelay(other.logGroupFlushDelay),
logArchiveDirectory(getPool(), other.logArchiveDirectory),
logArchiveCommand(getPool(), other.logArchiveCommand),
logArchiveTimeout(other.logArchiveTimeout),
segmentSize(other.segmentSize),
segmentCount(other.segmentCount),
journalDirectory(getPool(), other.journalDirectory),
filePrefix(getPool(), other.filePrefix),
groupFlushDelay(other.groupFlushDelay),
archiveDirectory(getPool(), other.archiveDirectory),
archiveCommand(getPool(), other.archiveCommand),
archiveTimeout(other.archiveTimeout),
syncReplicas(getPool(), other.syncReplicas),
logSourceDirectory(getPool(), other.logSourceDirectory),
sourceDirectory(getPool(), other.sourceDirectory),
sourceGuid{},
verboseLogging(other.verboseLogging),
applyIdleTimeout(other.applyIdleTimeout),
@ -234,41 +234,41 @@ Config* Config::get(const PathName& lookupName)
ISC_systemToUtf8(value);
config->excludeFilter = value;
}
else if (key == "log_segment_size")
else if (key == "journal_segment_size")
{
parseLong(value, config->logSegmentSize);
parseLong(value, config->segmentSize);
}
else if (key == "log_segment_count")
else if (key == "journal_segment_count")
{
parseLong(value, config->logSegmentCount);
parseLong(value, config->segmentCount);
}
else if (key == "log_directory")
else if (key == "journal_directory")
{
config->logDirectory = value.c_str();
PathUtils::ensureSeparator(config->logDirectory);
checkAccess(config->logDirectory, key);
config->journalDirectory = value.c_str();
PathUtils::ensureSeparator(config->journalDirectory);
checkAccess(config->journalDirectory, key);
}
else if (key == "log_file_prefix")
else if (key == "journal_file_prefix")
{
config->logFilePrefix = value.c_str();
config->filePrefix = value.c_str();
}
else if (key == "log_group_flush_delay")
else if (key == "journal_group_flush_delay")
{
parseLong(value, config->logGroupFlushDelay);
parseLong(value, config->groupFlushDelay);
}
else if (key == "log_archive_directory")
else if (key == "journal_archive_directory")
{
config->logArchiveDirectory = value.c_str();
PathUtils::ensureSeparator(config->logArchiveDirectory);
checkAccess(config->logArchiveDirectory, key);
config->archiveDirectory = value.c_str();
PathUtils::ensureSeparator(config->archiveDirectory);
checkAccess(config->archiveDirectory, key);
}
else if (key == "log_archive_command")
else if (key == "journal_archive_command")
{
config->logArchiveCommand = value.c_str();
config->archiveCommand = value.c_str();
}
else if (key == "log_archive_timeout")
else if (key == "journal_archive_timeout")
{
parseLong(value, config->logArchiveTimeout);
parseLong(value, config->archiveTimeout);
}
else if (key == "plugin")
{
@ -297,15 +297,15 @@ Config* Config::get(const PathName& lookupName)
if (config->pluginName.hasData())
return config.release();
if (config->logDirectory.hasData() || config->syncReplicas.hasData())
if (config->journalDirectory.hasData() || config->syncReplicas.hasData())
{
// If log_directory is specified, then replication is enabled
if (config->logFilePrefix.isEmpty())
if (config->filePrefix.isEmpty())
{
PathName db_directory, db_filename;
PathUtils::splitLastComponent(db_directory, db_filename, config->dbName);
config->logFilePrefix = db_filename;
config->filePrefix = db_filename;
}
return config.release();
@ -378,11 +378,11 @@ void Config::enumerate(Firebird::Array<Config*>& replicas)
if (value.isEmpty())
continue;
if (key == "log_source_directory")
if (key == "journal_source_directory")
{
config->logSourceDirectory = value.c_str();
PathUtils::ensureSeparator(config->logSourceDirectory);
checkAccess(config->logSourceDirectory, key);
config->sourceDirectory = value.c_str();
PathUtils::ensureSeparator(config->sourceDirectory);
checkAccess(config->sourceDirectory, key);
}
else if (key == "source_guid")
{
@ -403,7 +403,7 @@ void Config::enumerate(Firebird::Array<Config*>& replicas)
}
}
if (dbName.hasData() && config->logSourceDirectory.hasData())
if (dbName.hasData() && config->sourceDirectory.hasData())
{
// If source_directory is specified, then replication is enabled

View File

@ -43,16 +43,16 @@ namespace Replication
ULONG bufferSize;
Firebird::string includeFilter;
Firebird::string excludeFilter;
ULONG logSegmentSize;
ULONG logSegmentCount;
Firebird::PathName logDirectory;
Firebird::PathName logFilePrefix;
ULONG logGroupFlushDelay;
Firebird::PathName logArchiveDirectory;
Firebird::string logArchiveCommand;
ULONG logArchiveTimeout;
ULONG segmentSize;
ULONG segmentCount;
Firebird::PathName journalDirectory;
Firebird::PathName filePrefix;
ULONG groupFlushDelay;
Firebird::PathName archiveDirectory;
Firebird::string archiveCommand;
ULONG archiveTimeout;
Firebird::ObjectsArray<Firebird::string> syncReplicas;
Firebird::PathName logSourceDirectory;
Firebird::PathName sourceDirectory;
Firebird::Guid sourceGuid;
bool verboseLogging;
ULONG applyIdleTimeout;

View File

@ -114,7 +114,7 @@ Manager::Manager(const string& dbId,
const Guid& guid = dbb->dbb_guid;
m_sequence = dbb->dbb_repl_sequence;
if (config->logDirectory.hasData())
if (config->journalDirectory.hasData())
{
m_changeLog = FB_NEW_POOL(getPool())
ChangeLog(getPool(), dbId, guid, m_sequence, config);

View File

@ -78,7 +78,7 @@ namespace Replication
void flush(Firebird::UCharBuffer* buffer, bool sync);
void forceLogSwitch()
void forceJournalSwitch()
{
if (m_changeLog)
m_changeLog->forceSwitch();

View File

@ -691,7 +691,7 @@ void REPL_exec_sql(thread_db* tdbb, jrd_tra* transaction, const string& sql)
checkStatus(tdbb, status, transaction);
}
void REPL_log_switch(thread_db* tdbb)
void REPL_journal_switch(thread_db* tdbb)
{
const auto dbb = tdbb->getDatabase();
@ -699,5 +699,5 @@ void REPL_log_switch(thread_db* tdbb)
if (!replMgr)
return;
replMgr->forceLogSwitch();
replMgr->forceJournalSwitch();
}

View File

@ -45,6 +45,6 @@ void REPL_modify(Jrd::thread_db* tdbb, const Jrd::record_param* orgRpb,
void REPL_erase(Jrd::thread_db* tdbb, const Jrd::record_param* rpb, Jrd::jrd_tra* transaction);
void REPL_gen_id(Jrd::thread_db* tdbb, SLONG genId, SINT64 value);
void REPL_exec_sql(Jrd::thread_db* tdbb, Jrd::jrd_tra* transaction, const Firebird::string& sql);
void REPL_log_switch(Jrd::thread_db* tdbb);
void REPL_journal_switch(Jrd::thread_db* tdbb);
#endif // JRD_REPLICATION_PUBLISHER_H

View File

@ -441,7 +441,7 @@ namespace
const PathName& getDirectory() const
{
return m_config->logSourceDirectory;
return m_config->sourceDirectory;
}
void logError(const string& message)
@ -479,9 +479,9 @@ namespace
typedef Array<Target*> TargetList;
struct LogSegment
struct Segment
{
explicit LogSegment(MemoryPool& pool, const PathName& fname, const SegmentHeader& hdr)
explicit Segment(MemoryPool& pool, const PathName& fname, const SegmentHeader& hdr)
: filename(pool, fname)
{
memcpy(&header, &hdr, sizeof(SegmentHeader));
@ -495,14 +495,14 @@ namespace
PathUtils::concatPath(newname, path, "~" + name);
if (rename(filename.c_str(), newname.c_str()) < 0)
raiseError("Log file %s rename failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s rename failed (error: %d)", filename.c_str(), ERRNO);
#else
if (unlink(filename.c_str()) < 0)
raiseError("Log file %s unlink failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s unlink failed (error: %d)", filename.c_str(), ERRNO);
#endif
}
static const FB_UINT64& generate(const LogSegment* item)
static const FB_UINT64& generate(const Segment* item)
{
return item->header.hdr_sequence;
}
@ -511,7 +511,7 @@ namespace
SegmentHeader header;
};
typedef SortedArray<LogSegment*, EmptyStorage<LogSegment*>, FB_UINT64, LogSegment> ProcessQueue;
typedef SortedArray<Segment*, EmptyStorage<Segment*>, FB_UINT64, Segment> ProcessQueue;
string formatInterval(const TimeStamp& start, const TimeStamp& finish)
{
@ -551,10 +551,10 @@ namespace
bool validateHeader(const SegmentHeader* header)
{
if (strcmp(header->hdr_signature, LOG_SIGNATURE))
if (strcmp(header->hdr_signature, CHANGELOG_SIGNATURE))
return false;
if (header->hdr_version != LOG_CURRENT_VERSION)
if (header->hdr_version != CHANGELOG_CURRENT_VERSION)
return false;
if (header->hdr_state != SEGMENT_STATE_FREE &&
@ -623,7 +623,7 @@ namespace
{
// First pass: create the processing queue
for (auto iter = PathUtils::newDirIterator(pool, config->logSourceDirectory);
for (auto iter = PathUtils::newDirIterator(pool, config->sourceDirectory);
*iter; ++(*iter))
{
const auto filename = **iter;
@ -652,14 +652,14 @@ namespace
continue;
}
raiseError("Log file %s open failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s open failed (error: %d)", filename.c_str(), ERRNO);
}
AutoFile file(fd);
struct stat stats;
if (fstat(file, &stats) < 0)
raiseError("Log file %s fstat failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s fstat failed (error: %d)", filename.c_str(), ERRNO);
const size_t fileSize = stats.st_size;
@ -671,12 +671,12 @@ namespace
}
if (lseek(file, 0, SEEK_SET) != 0)
raiseError("Log file %s seek failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s seek failed (error: %d)", filename.c_str(), ERRNO);
SegmentHeader header;
if (read(file, &header, sizeof(SegmentHeader)) != sizeof(SegmentHeader))
raiseError("Log file %s read failed (error: %d)", filename.c_str(), ERRNO);
raiseError("Journal file %s read failed (error: %d)", filename.c_str(), ERRNO);
if (!validateHeader(&header))
{
@ -713,7 +713,7 @@ namespace
if (header.hdr_state != SEGMENT_STATE_ARCH)
continue;
*/
queue.add(FB_NEW_POOL(pool) LogSegment(pool, filename, header));
queue.add(FB_NEW_POOL(pool) Segment(pool, filename, header));
}
if (queue.isEmpty())
@ -734,9 +734,9 @@ namespace
FB_UINT64 next_sequence = 0;
const bool restart = target->isShutdown();
for (LogSegment** iter = queue.begin(); iter != queue.end(); ++iter)
for (Segment** iter = queue.begin(); iter != queue.end(); ++iter)
{
LogSegment* const segment = *iter;
Segment* const segment = *iter;
const FB_UINT64 sequence = segment->header.hdr_sequence;
const Guid& guid = segment->header.hdr_guid;
@ -808,7 +808,7 @@ namespace
break;
}
raiseError("Log file %s open failed (error: %d)", segment->filename.c_str(), ERRNO);
raiseError("Journal file %s open failed (error: %d)", segment->filename.c_str(), ERRNO);
}
const TimeStamp startTime(TimeStamp::getCurrentTimeStamp());
@ -818,17 +818,17 @@ namespace
SegmentHeader header;
if (read(file, &header, sizeof(SegmentHeader)) != sizeof(SegmentHeader))
raiseError("Log file %s read failed (error: %d)", segment->filename.c_str(), ERRNO);
raiseError("Journal file %s read failed (error: %d)", segment->filename.c_str(), ERRNO);
if (memcmp(&header, &segment->header, sizeof(SegmentHeader)))
raiseError("Log file %s was unexpectedly changed", segment->filename.c_str());
raiseError("Journal file %s was unexpectedly changed", segment->filename.c_str());
ULONG totalLength = sizeof(SegmentHeader);
while (totalLength < segment->header.hdr_length)
{
Block header;
if (read(file, &header, sizeof(Block)) != sizeof(Block))
raiseError("Log file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
const auto blockLength = header.length;
const auto length = sizeof(Block) + blockLength;
@ -842,7 +842,7 @@ namespace
memcpy(data, &header, sizeof(Block));
if (read(file, data + sizeof(Block), blockLength) != blockLength)
raiseError("Log file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
const bool success =
replicate(localStatus, sequence,
@ -906,7 +906,7 @@ namespace
{
do
{
LogSegment* const segment = queue[pos++];
Segment* const segment = queue[pos++];
const FB_UINT64 sequence = segment->header.hdr_sequence;
if (sequence >= threshold)