8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 22:43:03 +01:00

Fixed bug #7487 : Parallel restore could miss few records

This commit is contained in:
Vlad Khorsun 2023-02-27 16:17:23 +02:00
parent 4411f42229
commit 4e3a2a238d
2 changed files with 133 additions and 114 deletions

View File

@ -173,6 +173,7 @@ public:
m_batch = nullptr;
m_request = nullptr;
m_recs = 0;
m_batchRecs = 0;
m_resync = true;
}
@ -186,6 +187,7 @@ public:
void compile(BurpGlobals* tdgbl, Firebird::IAttachment* att);
void send(BurpGlobals* tdgbl, Firebird::ITransaction* tran, bool lastRec);
void execBatch(BurpGlobals * tdgbl);
void release();
ULONG getDataLength() const
@ -225,7 +227,8 @@ private:
Firebird::Array<UCHAR> m_batchMsg;
Firebird::IBatch* m_batch;
Firebird::IRequest* m_request;
int m_recs;
int m_recs; // total records sent
int m_batchRecs; // records in current batch
bool m_resync;
};

View File

@ -3229,120 +3229,124 @@ void get_data(BurpGlobals* tdgbl, burp_rel* relation, WriteRelationReq* req)
try
{
while (!task->isStopped())
try
{
if (get(tdgbl) != att_data_length)
BURP_error_redirect(NULL, 39); // msg 39 expected record length
RCRD_LENGTH len = get_int32(tdgbl);
if (!tdgbl->gbl_sw_transportable && len != length)
while (!task->isStopped())
{
if (get(tdgbl) != att_data_length)
BURP_error_redirect(NULL, 39); // msg 39 expected record length
RCRD_LENGTH len = get_int32(tdgbl);
if (!tdgbl->gbl_sw_transportable && len != length)
{
#ifdef sparc
if (!old_length)
old_length = recompute_length(tdgbl, relation);
if (!old_length)
old_length = recompute_length(tdgbl, relation);
#endif
if (len != old_length)
{
BURP_error(40, true, SafeArg() << length << len);
// msg 40 wrong length record, expected %ld encountered %ld
}
}
UCHAR* p;
if (tdgbl->gbl_sw_transportable)
{
if (get(tdgbl) != att_xdr_length)
{
BURP_error_redirect(NULL, 55);
// msg 55 Expected XDR record length
}
else
{
data.lstr_length = len = get_int32(tdgbl);
if (len > data.lstr_allocated)
if (len != old_length)
{
data.lstr_allocated = len;
if (data.lstr_address)
BURP_free (data.lstr_address);
data.lstr_address = BURP_alloc(data.lstr_allocated);
}
p = data.lstr_address;
}
}
else
p = buffer;
if (get(tdgbl) != att_data_data)
BURP_error_redirect(NULL, 41); // msg 41 expected data attribute
if (tdgbl->gbl_sw_compress)
decompress(tdgbl, p, len);
else
get_block(tdgbl, p, len);
if (old_length)
realign(tdgbl, buffer, relation);
if (tdgbl->gbl_sw_transportable)
{
if (batch)
{
buffer = sql;
CAN_encode_decode(relation, &data, buffer, false, true);
}
else
CAN_encode_decode(relation, &data, buffer, false);
}
burp_fld* field;
for (field = relation->rel_fields; field; field = field->fld_next)
{
if (!(field->fld_flags & FLD_computed))
{
if (field->fld_type == blr_blob || (field->fld_flags & FLD_array))
{
ISC_QUAD* blob_id = (ISC_QUAD*)(buffer + field->fld_offset);
blob_id->gds_quad_high = 0;
blob_id->gds_quad_low = 0;
BURP_error(40, true, SafeArg() << length << len);
// msg 40 wrong length record, expected %ld encountered %ld
}
}
}
get_record(&record, tdgbl);
while (record == rec_blob || record == rec_array)
{
if (record == rec_blob)
get_blob(tdgbl, batch, relation->rel_fields, buffer);
else if (record == rec_array)
get_array(tdgbl, relation, buffer);
get_record(&record, tdgbl);
}
UCHAR* p;
if (tdgbl->gbl_sw_transportable)
{
if (get(tdgbl) != att_xdr_length)
{
BURP_error_redirect(NULL, 55);
// msg 55 Expected XDR record length
}
else
{
data.lstr_length = len = get_int32(tdgbl);
if (len > data.lstr_allocated)
{
data.lstr_allocated = len;
if (data.lstr_address)
BURP_free(data.lstr_address);
data.lstr_address = BURP_alloc(data.lstr_allocated);
}
p = data.lstr_address;
}
}
else
p = buffer;
if (batch && !tdgbl->gbl_sw_transportable)
{
if (get(tdgbl) != att_data_data)
BURP_error_redirect(NULL, 41); // msg 41 expected data attribute
if (tdgbl->gbl_sw_compress)
decompress(tdgbl, p, len);
else
get_block(tdgbl, p, len);
if (old_length)
realign(tdgbl, buffer, relation);
if (tdgbl->gbl_sw_transportable)
{
if (batch)
{
buffer = sql;
CAN_encode_decode(relation, &data, buffer, false, true);
}
else
CAN_encode_decode(relation, &data, buffer, false);
}
burp_fld* field;
for (field = relation->rel_fields; field; field = field->fld_next)
{
if (field->fld_flags & FLD_computed)
continue;
// convert record to SQL format
memcpy(&sql[field->fld_sql], &buffer[field->fld_offset], field->fld_total_len);
memcpy(&sql[field->fld_null], &buffer[field->fld_missing_offset], sizeof(SSHORT));
if (!(field->fld_flags & FLD_computed))
{
if (field->fld_type == blr_blob || (field->fld_flags & FLD_array))
{
ISC_QUAD* blob_id = (ISC_QUAD*) (buffer + field->fld_offset);
blob_id->gds_quad_high = 0;
blob_id->gds_quad_low = 0;
}
}
}
}
req->send(tdgbl, gds_trans, record != rec_data);
get_record(&record, tdgbl);
while (record == rec_blob || record == rec_array)
{
if (record == rec_blob)
get_blob(tdgbl, batch, relation->rel_fields, buffer);
else if (record == rec_array)
get_array(tdgbl, relation, buffer);
get_record(&record, tdgbl);
}
task->verbRecs(records, false);
if (record != rec_data)
break;
} // while (!task->isStopped())
}
catch (RestoreRelationTask::ExcReadDone&)
{
// its OK to ignore ExcReadDone exception
if (batch && !tdgbl->gbl_sw_transportable)
{
for (field = relation->rel_fields; field; field = field->fld_next)
{
if (field->fld_flags & FLD_computed)
continue;
// convert record to SQL format
memcpy(&sql[field->fld_sql], &buffer[field->fld_offset], field->fld_total_len);
memcpy(&sql[field->fld_null], &buffer[field->fld_missing_offset], sizeof(SSHORT));
}
}
req->send(tdgbl, gds_trans, record != rec_data);
task->verbRecs(records, false);
if (record != rec_data)
break;
} // while (!task->isStopped())
}
catch (RestoreRelationTask::ExcReadDone&)
{
if (batch)
req->execBatch(tdgbl);
}
}
catch (const FbException& ex)
{
@ -12082,6 +12086,7 @@ void WriteRelationReq::reset(WriteRelationMeta* meta)
m_relation = NULL;
m_request = 0;
m_recs = 0;
m_batchRecs = 0;
m_resync = true;
}
}
@ -12091,6 +12096,7 @@ void WriteRelationReq::clear()
m_relation = NULL;
m_meta = NULL;
m_recs = 0;
m_batchRecs = 0;
m_resync = true;
if (m_batch)
@ -12131,22 +12137,11 @@ void WriteRelationReq::send(BurpGlobals* tdgbl, ITransaction* tran, bool lastRec
if (m_batch)
{
m_batch->add(&tdgbl->throwStatus, 1, m_batchMsg.begin());
m_batchRecs++;
if ((m_recs % m_meta->m_batchStep != 0) && !lastRec)
return;
AutoDispose<IBatchCompletionState> cs(m_batch->execute(&tdgbl->throwStatus, gds_trans));
if (tdgbl->throwStatus->getState() & IStatus::STATE_WARNINGS)
BURP_print_warning(&tdgbl->throwStatus);
for (unsigned pos = 0;
pos = cs->findError(&tdgbl->throwStatus, pos),
pos != IBatchCompletionState::NO_MORE_ERRORS;
++pos)
{
LocalStatus status_vector;
cs->getStatus(&tdgbl->throwStatus, &status_vector, pos);
check_data_error(tdgbl, &status_vector, m_relation);
}
execBatch(tdgbl);
}
else
{
@ -12166,6 +12161,27 @@ void WriteRelationReq::send(BurpGlobals* tdgbl, ITransaction* tran, bool lastRec
}
}
void WriteRelationReq::execBatch(BurpGlobals * tdgbl)
{
if (!m_batch || m_batchRecs == 0)
return;
AutoDispose<IBatchCompletionState> cs(m_batch->execute(&tdgbl->throwStatus, gds_trans));
m_batchRecs = 0;
if (tdgbl->throwStatus->getState() & IStatus::STATE_WARNINGS)
BURP_print_warning(&tdgbl->throwStatus);
for (unsigned pos = 0;
pos = cs->findError(&tdgbl->throwStatus, pos),
pos != IBatchCompletionState::NO_MORE_ERRORS;
++pos)
{
LocalStatus status_vector;
cs->getStatus(&tdgbl->throwStatus, &status_vector, pos);
check_data_error(tdgbl, &status_vector, m_relation);
}
}
void WriteRelationReq::release()
{
if (m_batch)