8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 22:43:03 +01:00

Enhanced control on the size of available batch buffers when restoring database ensures never getting isc_batch_too_big error, related to #6959

This commit is contained in:
AlexPeshkoff 2021-09-15 17:20:01 +03:00
parent ee088c22e3
commit b448ed8f6d
7 changed files with 106 additions and 55 deletions

View File

@ -1149,6 +1149,7 @@ public:
bool hdr_forced_writes;
TEXT database_security_class[GDS_NAME_LEN]; // To save database security class for deferred update
unsigned batchInlineBlobLimit;
static inline BurpGlobals* getSpecific()
{

View File

@ -2098,11 +2098,6 @@ void get_array(BurpGlobals* tdgbl, burp_rel* relation, UCHAR* record_buffer)
}
static const unsigned GBAK_BATCH_STEP = 1000;
static const unsigned GBAK_BATCH_BLOBSTEP = 1024; // keep slightly bigger than GBAK_BATCH_STEP
static unsigned batchBufferSize = 0;
void get_blob(BurpGlobals* tdgbl, Firebird::IBatch* batch, const burp_fld* fields, UCHAR* record_buffer)
{
/**************************************
@ -2168,48 +2163,30 @@ void get_blob(BurpGlobals* tdgbl, Firebird::IBatch* batch, const burp_fld* field
// msg 36 Can't find field for blob
}
// Choose blob creation method
FbLocalStatus status_vector;
// Choose blob creation method based on maximum possible blob size
const UCHAR blob_desc[] = {isc_bpb_version1, isc_bpb_type, 1, blob_type};
bool blobInline = false;
if (batch)
{
if (!batchBufferSize)
ULONG segHeaderOverhead = 0;
if (!(blob_type & isc_bpb_type_stream))
{
UCHAR item = Firebird::IBatch::INF_BUFFER_BYTES_SIZE;
UCHAR infoBuf[64];
batch->getInfo(&status_vector, 1, &item, sizeof infoBuf, infoBuf);
if (status_vector->hasData())
{
if (status_vector->getErrors()[1] == isc_interface_version_too_old) // FB4.0.0
batchBufferSize = 256 * 1024 * 1024;
else
BURP_error_redirect(&status_vector, 405);
}
if (!batchBufferSize)
{
Firebird::ClumpletReader rdr(Firebird::ClumpletReader::InfoResponse, infoBuf, sizeof infoBuf);
rdr.rewind();
if (rdr.isEof() || rdr.getClumpTag() != item)
BURP_error(405, true);
else
batchBufferSize = rdr.getInt();
}
fb_assert(batchBufferSize);
// Is it segmented blob? header itself segment alignment
segHeaderOverhead = Firebird::IBatch::BLOB_SEGHDR_ALIGN + (Firebird::IBatch::BLOB_SEGHDR_ALIGN - 1);
}
FB_UINT64 fullSize = (FB_UINT64(max_segment) + segHeaderOverhead) * segments;
if (FB_UINT64(segments) * max_segment < batchBufferSize / GBAK_BATCH_BLOBSTEP)
blobInline = true;
// Take into an account parameters size
fullSize += sizeof(blob_desc);
blobInline = fullSize < tdgbl->batchInlineBlobLimit;
}
// Create new blob
ISC_QUAD* blob_id = (ISC_QUAD*) ((UCHAR*) record_buffer + field->fld_offset);
const UCHAR blob_desc[] = {isc_bpb_version1, isc_bpb_type, 1, blob_type};
BlobBuffer local_buffer;
UCHAR* const buffer = local_buffer.getBuffer(max_segment);
bool first = true;
FbLocalStatus status_vector;
BlobWrapper blob(&status_vector);
if (!blobInline)
@ -3144,12 +3121,13 @@ rec_type get_data(BurpGlobals* tdgbl, burp_rel* relation, bool skip_relation)
RCRD_LENGTH length = offset;
// Create batch
unsigned batchStep = 1000;
Firebird::AutoDispose<Firebird::IXpbBuilder> pb(Firebird::UtilInterfacePtr()->
getXpbBuilder(&tdgbl->throwStatus, Firebird::IXpbBuilder::BATCH, NULL, 0));
pb->insertInt(&tdgbl->throwStatus, Firebird::IBatch::TAG_MULTIERROR, 1);
pb->insertInt(&tdgbl->throwStatus, Firebird::IBatch::TAG_BLOB_POLICY, Firebird::IBatch::BLOB_ID_ENGINE);
pb->insertInt(&tdgbl->throwStatus, Firebird::IBatch::TAG_DETAILED_ERRORS, GBAK_BATCH_STEP);
pb->insertInt(&tdgbl->throwStatus, Firebird::IBatch::TAG_DETAILED_ERRORS, batchStep);
pb->insertInt(&tdgbl->throwStatus, Firebird::IBatch::TAG_BUFFER_BYTES_SIZE, 0);
Firebird::RefPtr<Firebird::IBatch> batch(Firebird::REF_NO_INCR, DB->
@ -3165,6 +3143,60 @@ rec_type get_data(BurpGlobals* tdgbl, burp_rel* relation, bool skip_relation)
return get_data_old(tdgbl, relation);
}
// determine batch parameters
tdgbl->batchInlineBlobLimit = 0;
const UCHAR items[] =
{Firebird::IBatch::INF_BUFFER_BYTES_SIZE,
Firebird::IBatch::INF_BLOB_ALIGNMENT,
Firebird::IBatch::INF_BLOB_HEADER};
UCHAR infoBuf[64];
batch->getInfo(fbStatus, sizeof items, items, sizeof infoBuf, infoBuf);
if (fbStatus->hasData())
{
if (fbStatus->getErrors()[1] == isc_interface_version_too_old) // v4.0.0
tdgbl->batchInlineBlobLimit = 255 * 1024; // 1Kb reserve should be always enough
else
BURP_error_redirect(fbStatus, 405);
}
if (!tdgbl->batchInlineBlobLimit)
{
ULONG blAlign = 0, blHdr = 0, bufSize = 0;
Firebird::ClumpletReader rdr(Firebird::ClumpletReader::InfoResponse, infoBuf, sizeof infoBuf);
for (rdr.rewind(); !rdr.isEof(); rdr.moveNext())
{
if (rdr.getClumpTag() == isc_info_end)
break;
switch (rdr.getClumpTag())
{
case Firebird::IBatch::INF_BUFFER_BYTES_SIZE:
bufSize = rdr.getInt();
break;
case Firebird::IBatch::INF_BLOB_ALIGNMENT:
blAlign = rdr.getInt();
break;
case Firebird::IBatch::INF_BLOB_HEADER:
blHdr = rdr.getInt();
break;
}
}
if (!(bufSize && blAlign && blHdr))
BURP_error(405, true);
// correct batchStep if necessary
unsigned msgSize = meta->getAlignedLength(&tdgbl->throwStatus);
if (msgSize * batchStep > bufSize)
batchStep = bufSize / msgSize;
// determine maximum blob size for inline transfer
tdgbl->batchInlineBlobLimit = bufSize / batchStep;
// take into an account: blob alignment header size
tdgbl->batchInlineBlobLimit -= ((blAlign - 1) + blHdr);
}
fb_assert(tdgbl->batchInlineBlobLimit);
UCHAR* buffer = NULL;
Firebird::Array<UCHAR> requestBuffer;
@ -3281,7 +3313,7 @@ rec_type get_data(BurpGlobals* tdgbl, burp_rel* relation, bool skip_relation)
}
batch->add(&tdgbl->throwStatus, 1, sql);
if ((records % GBAK_BATCH_STEP != 0) && (record == rec_data))
if ((records % batchStep != 0) && (record == rec_data))
continue;
Firebird::AutoDispose<Firebird::IBatchCompletionState> cs(batch->execute(&tdgbl->throwStatus, gds_trans));

View File

@ -1029,6 +1029,9 @@ void DsqlBatch::info(thread_db* tdbb, unsigned int itemsLength, const unsigned c
case IBatch::INF_BLOB_ALIGNMENT:
out.insertInt(item, BLOB_STREAM_ALIGN);
break;
case IBatch::INF_BLOB_HEADER:
out.insertInt(item, SIZEOF_BLOB_HEAD);
break;
case isc_info_length:
flInfoLength = true;
break;

View File

@ -517,23 +517,28 @@ interface Batch : ReferenceCounted
{
const uchar VERSION1 = 1; // Tag for parameters block
// Tags for parameters
const uchar TAG_MULTIERROR = 1; // Can have >1 buffers with errors
const uchar TAG_RECORD_COUNTS = 2; // Per-record modified records accountung
const uchar TAG_BUFFER_BYTES_SIZE = 3; // Maximum possible buffer size
const uchar TAG_BLOB_POLICY = 4; // What policy is used to store blobs
const uchar TAG_DETAILED_ERRORS = 5; // How many vectors with detailed error info are stored
// Info items
const uchar INF_BUFFER_BYTES_SIZE = 10; // Maximum possible buffer size
const uchar INF_DATA_BYTES_SIZE = 11; // Already added messages size
const uchar INF_BLOBS_BYTES_SIZE = 12; // Already added blobs size
const uchar INF_BLOB_ALIGNMENT = 13; // Duplicate getBlobAlignment
const uchar INF_BLOB_HEADER = 14; // Blob header size
// How should batch work with blobs
const uchar BLOB_NONE = 0; // Blobs can't be used
const uchar BLOB_ID_ENGINE = 1; // Blobs are added one by one, IDs are generated by firebird
const uchar BLOB_ID_USER = 2; // Blobs are added one by one, IDs are generated by user
const uchar BLOB_STREAM = 3; // Blobs are added in a stream, IDs are generated by user
const uint BLOB_SEGHDR_ALIGN = 2; // Alignment of segment header in the stream
const uchar INF_BUFFER_BYTES_SIZE = 10; // Maximum possible buffer size
const uchar INF_DATA_BYTES_SIZE = 11; // Already added messages size
const uchar INF_BLOBS_BYTES_SIZE = 12; // Already added blobs size
const uchar INF_BLOB_ALIGNMENT = 13; // Duplicate getBlobAlignment
// Blob stream
const uint BLOB_SEGHDR_ALIGN = 2; // Alignment of segment header in the blob stream
void add(Status status, uint count, const void* inBuffer);
void addBlob(Status status, uint length, const void* inBuffer, ISC_QUAD* blobId, uint parLength, const uchar* par);

View File

@ -2021,15 +2021,16 @@ namespace Firebird
static const unsigned char TAG_BUFFER_BYTES_SIZE = 3;
static const unsigned char TAG_BLOB_POLICY = 4;
static const unsigned char TAG_DETAILED_ERRORS = 5;
static const unsigned char INF_BUFFER_BYTES_SIZE = 10;
static const unsigned char INF_DATA_BYTES_SIZE = 11;
static const unsigned char INF_BLOBS_BYTES_SIZE = 12;
static const unsigned char INF_BLOB_ALIGNMENT = 13;
static const unsigned char INF_BLOB_HEADER = 14;
static const unsigned char BLOB_NONE = 0;
static const unsigned char BLOB_ID_ENGINE = 1;
static const unsigned char BLOB_ID_USER = 2;
static const unsigned char BLOB_STREAM = 3;
static const unsigned BLOB_SEGHDR_ALIGN = 2;
static const unsigned char INF_BUFFER_BYTES_SIZE = 10;
static const unsigned char INF_DATA_BYTES_SIZE = 11;
static const unsigned char INF_BLOBS_BYTES_SIZE = 12;
static const unsigned char INF_BLOB_ALIGNMENT = 13;
template <typename StatusType> void add(StatusType* status, unsigned count, const void* inBuffer)
{

View File

@ -1560,15 +1560,16 @@ type
const TAG_BUFFER_BYTES_SIZE = Byte(3);
const TAG_BLOB_POLICY = Byte(4);
const TAG_DETAILED_ERRORS = Byte(5);
const INF_BUFFER_BYTES_SIZE = Byte(10);
const INF_DATA_BYTES_SIZE = Byte(11);
const INF_BLOBS_BYTES_SIZE = Byte(12);
const INF_BLOB_ALIGNMENT = Byte(13);
const INF_BLOB_HEADER = Byte(14);
const BLOB_NONE = Byte(0);
const BLOB_ID_ENGINE = Byte(1);
const BLOB_ID_USER = Byte(2);
const BLOB_STREAM = Byte(3);
const BLOB_SEGHDR_ALIGN = Cardinal(2);
const INF_BUFFER_BYTES_SIZE = Byte(10);
const INF_DATA_BYTES_SIZE = Byte(11);
const INF_BLOBS_BYTES_SIZE = Byte(12);
const INF_BLOB_ALIGNMENT = Byte(13);
procedure add(status: IStatus; count: Cardinal; inBuffer: Pointer);
procedure addBlob(status: IStatus; length: Cardinal; inBuffer: Pointer; blobId: ISC_QUADPtr; parLength: Cardinal; par: BytePtr);

View File

@ -553,7 +553,7 @@ private:
UCHAR blobPolicy;
bool segmented, defSegmented, batchActive;
ULONG messageCount, blobCount, serverSize;
ULONG messageCount, blobCount, serverSize, blobHeadSize;
public:
bool tmpStatement;
@ -2375,7 +2375,8 @@ Batch::Batch(Statement* s, IMessageMetadata* inFmt, unsigned parLength, const un
messageSize(0), alignedSize(0), blobBufferSize(0), messageBufferSize(0), flags(0),
stmt(s), format(inFmt), blobAlign(0), blobPolicy(BLOB_NONE),
segmented(false), defSegmented(false), batchActive(false),
messageCount(0), blobCount(0), serverSize(0), tmpStatement(false)
messageCount(0), blobCount(0), serverSize(0), blobHeadSize(0),
tmpStatement(false)
{
LocalStatus ls;
CheckStatusWrapper st(&ls);
@ -2762,8 +2763,8 @@ void Batch::setServerInfo()
}
// Perform info call to server
UCHAR items[] = {IBatch::INF_BLOB_ALIGNMENT, IBatch::INF_BUFFER_BYTES_SIZE};
UCHAR buffer[32];
UCHAR items[] = {IBatch::INF_BLOB_ALIGNMENT, IBatch::INF_BUFFER_BYTES_SIZE, IBatch::INF_BLOB_HEADER};
UCHAR buffer[64];
info(&s, rdb, op_info_batch, statement->rsr_id, 0,
sizeof(items), items, 0, 0, sizeof(buffer), buffer);
check(&s);
@ -2784,6 +2785,9 @@ void Batch::setServerInfo()
case IBatch::INF_BUFFER_BYTES_SIZE:
serverSize = out.getInt();
break;
case IBatch::INF_BLOB_HEADER:
blobHeadSize = out.getInt();
break;
case isc_info_error:
(Arg::Gds(isc_batch_align) << Arg::Gds(out.getInt())).raise();
case isc_info_truncated:
@ -2797,7 +2801,7 @@ void Batch::setServerInfo()
}
}
if (! (blobAlign && serverSize))
if (! (blobAlign && serverSize && blobHeadSize))
Arg::Gds(isc_batch_align).raise();
}
@ -3051,6 +3055,10 @@ void Batch::getInfo(CheckStatusWrapper* status, unsigned int itemsLength, const
setServerInfo();
out.insertInt(item, blobAlign);
break;
case IBatch::INF_BLOB_HEADER:
setServerInfo();
out.insertInt(item, blobHeadSize);
break;
default:
out.insertInt(isc_info_error, isc_infunk);
break;