8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:03:02 +01:00

Fixed #6900: IBatch add method fails silently when a memory limit is exceeded

This commit is contained in:
AlexPeshkoff 2021-08-12 15:28:37 +03:00
parent 864c9f08ad
commit 3ff54b7d77
9 changed files with 409 additions and 291 deletions

File diff suppressed because it is too large Load Diff

View File

@ -324,6 +324,8 @@ int ResultSet::release()
class Batch FB_FINAL : public RefCntIface<IBatchImpl<Batch, CheckStatusWrapper> >
{
public:
static const ULONG DEFER_BATCH_LIMIT = 64;
Batch(Statement* s, IMessageMetadata* inFmt, unsigned parLength, const unsigned char* par);
// IResultSet implementation
@ -346,6 +348,14 @@ private:
void releaseStatement();
void setBlobAlignment();
void cleanup()
{
if (blobPolicy != BLOB_NONE)
blobStream = blobStreamBuffer;
sizePointer = nullptr;
messageStream = 0;
}
void genBlobId(ISC_QUAD* blobId)
{
if (++genId.gds_quad_low == 0)
@ -372,7 +382,7 @@ private:
if (step == messageBufferSize)
{
// direct packet sent
sendMessagePacket(step, ptr);
sendMessagePacket(step, ptr, false);
}
else
{
@ -381,7 +391,7 @@ private:
messageStream += step;
if (messageStream == messageBufferSize)
{
sendMessagePacket(messageBufferSize, messageStreamBuffer);
sendMessagePacket(messageBufferSize, messageStreamBuffer, false);
messageStream = 0;
}
}
@ -401,7 +411,7 @@ private:
ULONG space = blobBufferSize - (blobStream - blobStreamBuffer);
if (space < Rsr::BatchStream::SIZEOF_BLOB_HEAD)
{
sendBlobPacket(blobStream - blobStreamBuffer, blobStreamBuffer);
sendBlobPacket(blobStream - blobStreamBuffer, blobStreamBuffer, false);
blobStream = blobStreamBuffer;
}
}
@ -431,7 +441,7 @@ private:
if (step == blobBufferSize)
{
// direct packet sent
sendBlobPacket(blobBufferSize, ptr);
sendBlobPacket(blobBufferSize, ptr, false);
}
else
{
@ -440,7 +450,7 @@ private:
blobStream += step;
if (blobStream - blobStreamBuffer == blobBufferSize)
{
sendBlobPacket(blobBufferSize, blobStreamBuffer);
sendBlobPacket(blobBufferSize, blobStreamBuffer, false);
blobStream = blobStreamBuffer;
sizePointer = NULL;
}
@ -497,22 +507,23 @@ private:
ULONG size = blobStream - blobStreamBuffer;
if (size)
{
sendBlobPacket(size, blobStreamBuffer);
sendBlobPacket(size, blobStreamBuffer, messageStream == 0);
blobStream = blobStreamBuffer;
}
}
if (messageStream)
{
sendMessagePacket(messageStream, messageStreamBuffer);
sendMessagePacket(messageStream, messageStreamBuffer, true);
messageStream = 0;
}
batchActive = false;
}
void sendBlobPacket(unsigned size, const UCHAR* ptr);
void sendMessagePacket(unsigned size, const UCHAR* ptr);
void sendBlobPacket(unsigned size, const UCHAR* ptr, bool flash);
void sendMessagePacket(unsigned size, const UCHAR* ptr, bool flash);
void sendDeferredPacket(rem_port* port, PACKET* packet, bool flash);
Firebird::AutoPtr<UCHAR, Firebird::ArrayDelete> messageStreamBuffer, blobStreamBuffer;
ULONG messageStream;
@ -2385,7 +2396,7 @@ void Batch::add(CheckStatusWrapper* status, unsigned count, const void* inBuffer
}
void Batch::sendMessagePacket(unsigned count, const UCHAR* ptr)
void Batch::sendMessagePacket(unsigned count, const UCHAR* ptr, bool flash)
{
Rsr* statement = stmt->getStatement();
CHECK_HANDLE(statement, isc_bad_req_handle);
@ -2401,8 +2412,7 @@ void Batch::sendMessagePacket(unsigned count, const UCHAR* ptr)
batch->p_batch_data.cstr_address = const_cast<UCHAR*>(ptr);
statement->rsr_batch_size = alignedSize;
send_partial_packet(port, packet);
defer_packet(port, packet, true);
sendDeferredPacket(port, packet, flash);
}
@ -2529,7 +2539,7 @@ void Batch::addBlobStream(CheckStatusWrapper* status, unsigned length, const voi
}
void Batch::sendBlobPacket(unsigned size, const UCHAR* ptr)
void Batch::sendBlobPacket(unsigned size, const UCHAR* ptr, bool flash)
{
Rsr* statement = stmt->getStatement();
Rdb* rdb = statement->rsr_rdb;
@ -2545,8 +2555,31 @@ void Batch::sendBlobPacket(unsigned size, const UCHAR* ptr)
batch->p_batch_blob_data.cstr_address = const_cast<UCHAR*>(ptr);
batch->p_batch_blob_data.cstr_length = size;
sendDeferredPacket(port, packet, flash);
}
void Batch::sendDeferredPacket(rem_port* port, PACKET* packet, bool flash)
{
send_partial_packet(port, packet);
defer_packet(port, packet, true);
if ((port->port_protocol >= PROTOCOL_VERSION17) &&
((port->port_deferred_packets->getCount() >= DEFER_BATCH_LIMIT) || flash))
{
packet->p_operation = op_batch_sync;
send_packet(port, packet);
receive_packet(port, packet);
LocalStatus warning;
port->checkResponse(&warning, packet, false);
Rsr* statement = stmt->getStatement();
if (statement->haveException())
{
cleanup();
statement->raiseException();
}
}
}
@ -2735,7 +2768,12 @@ IBatchCompletionState* Batch::execute(CheckStatusWrapper* status, ITransaction*
statement->rsr_batch_cs = nullptr;
if (packet->p_operation == op_batch_cs)
{
// when working with 4.0.0 server we could not raise it in advance...
statement->clearException();
return cs.release();
}
REMOTE_check_response(status, rdb, packet);
}
@ -2765,10 +2803,7 @@ void Batch::cancel(CheckStatusWrapper* status)
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
// Cleanup local data
if (blobPolicy != BLOB_NONE)
blobStream = blobStreamBuffer;
sizePointer = nullptr;
messageStream = 0;
cleanup();
batchActive = false;
// Prepare packet
@ -7951,8 +7986,6 @@ static void receive_packet_noqueue(rem_port* port, PACKET* packet)
// Receive responses for all deferred packets that were already sent
Rdb* rdb = port->port_context;
if (port->port_deferred_packets)
{
while (port->port_deferred_packets->getCount())
@ -7962,17 +7995,25 @@ static void receive_packet_noqueue(rem_port* port, PACKET* packet)
break;
OBJCT stmt_id = 0;
bool bCheckResponse = false, bFreeStmt = false;
bool bCheckResponse = false, bFreeStmt = false, bAssign = false;
if (p->packet.p_operation == op_execute)
switch (p->packet.p_operation)
{
case op_execute:
stmt_id = p->packet.p_sqldata.p_sqldata_statement;
bCheckResponse = true;
}
else if (p->packet.p_operation == op_free_statement)
{
bAssign = true;
break;
case op_batch_msg:
stmt_id = p->packet.p_batch_msg.p_batch_statement;
bCheckResponse = true;
break;
case op_free_statement:
stmt_id = p->packet.p_sqlfree.p_sqlfree_statement;
bFreeStmt = (p->packet.p_sqlfree.p_sqlfree_option == DSQL_drop);
break;
}
receive_packet_with_callback(port, &p->packet);
@ -7983,9 +8024,9 @@ static void receive_packet_noqueue(rem_port* port, PACKET* packet)
if (bCheckResponse)
{
bool bAssign = true;
try
{
Rdb* rdb = port->port_context;
LocalStatus ls;
CheckStatusWrapper status(&ls);
REMOTE_check_response(&status, rdb, &p->packet);

View File

@ -715,7 +715,8 @@ rem_port* INET_analyze(ClntAuthBlock* cBlock,
REMOTE_PROTOCOL(PROTOCOL_VERSION13, ptype_lazy_send, 4),
REMOTE_PROTOCOL(PROTOCOL_VERSION14, ptype_lazy_send, 5),
REMOTE_PROTOCOL(PROTOCOL_VERSION15, ptype_lazy_send, 6),
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_lazy_send, 7)
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_lazy_send, 7),
REMOTE_PROTOCOL(PROTOCOL_VERSION17, ptype_lazy_send, 8)
};
fb_assert(FB_NELEM(protocols_to_try) <= FB_NELEM(cnct->p_cnct_versions));
cnct->p_cnct_count = FB_NELEM(protocols_to_try);

View File

@ -173,7 +173,8 @@ rem_port* WNET_analyze(ClntAuthBlock* cBlock,
REMOTE_PROTOCOL(PROTOCOL_VERSION13, ptype_batch_send, 4),
REMOTE_PROTOCOL(PROTOCOL_VERSION14, ptype_batch_send, 5),
REMOTE_PROTOCOL(PROTOCOL_VERSION15, ptype_batch_send, 6),
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_batch_send, 7)
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_batch_send, 7),
REMOTE_PROTOCOL(PROTOCOL_VERSION17, ptype_batch_send, 8)
};
fb_assert(FB_NELEM(protocols_to_try) <= FB_NELEM(cnct->p_cnct_versions));
cnct->p_cnct_count = FB_NELEM(protocols_to_try);

View File

@ -305,7 +305,8 @@ rem_port* XNET_analyze(ClntAuthBlock* cBlock,
REMOTE_PROTOCOL(PROTOCOL_VERSION13, ptype_batch_send, 4),
REMOTE_PROTOCOL(PROTOCOL_VERSION14, ptype_batch_send, 5),
REMOTE_PROTOCOL(PROTOCOL_VERSION15, ptype_batch_send, 6),
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_batch_send, 7)
REMOTE_PROTOCOL(PROTOCOL_VERSION16, ptype_batch_send, 7),
REMOTE_PROTOCOL(PROTOCOL_VERSION17, ptype_batch_send, 8)
};
fb_assert(FB_NELEM(protocols_to_try) <= FB_NELEM(cnct->p_cnct_versions));
cnct->p_cnct_count = FB_NELEM(protocols_to_try);

View File

@ -1068,6 +1068,11 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
return P_TRUE(xdrs, p);
}
case op_batch_sync:
{
return P_TRUE(xdrs, p);
}
case op_batch_set_bpb:
{
P_BATCH_SETBPB* b = &p->p_batch_setbpb;

View File

@ -93,6 +93,11 @@ const USHORT PROTOCOL_VERSION15 = (FB_PROTOCOL_FLAG | 15);
const USHORT PROTOCOL_VERSION16 = (FB_PROTOCOL_FLAG | 16);
const USHORT PROTOCOL_STMT_TOUT = PROTOCOL_VERSION16;
// Protocol 17:
// - supports op_batch_sync
const USHORT PROTOCOL_VERSION17 = (FB_PROTOCOL_FLAG | 17);
// Architecture types
enum P_ARCH
@ -289,6 +294,7 @@ enum P_OP
op_repl_req = 108,
op_batch_cancel = 109,
op_batch_sync = 110,
op_max
};

View File

@ -1368,6 +1368,7 @@ public:
void batch_exec(P_BATCH_EXEC*, PACKET*);
void batch_rls(P_BATCH_FREE_CANCEL*, PACKET*);
void batch_cancel(P_BATCH_FREE_CANCEL*, PACKET*);
void batch_sync(PACKET*);
void batch_bpb(P_BATCH_SETBPB*, PACKET*);
void replicate(P_REPLICATE*, PACKET*);

View File

@ -1918,7 +1918,7 @@ static bool accept_connection(rem_port* port, P_CNCT* connect, PACKET* send)
{
if ((protocol->p_cnct_version == PROTOCOL_VERSION10 ||
(protocol->p_cnct_version >= PROTOCOL_VERSION11 &&
protocol->p_cnct_version <= PROTOCOL_VERSION16)) &&
protocol->p_cnct_version <= PROTOCOL_VERSION17)) &&
(protocol->p_cnct_architecture == arch_generic ||
protocol->p_cnct_architecture == ARCHITECTURE) &&
protocol->p_cnct_weight >= weight)
@ -3717,6 +3717,16 @@ void rem_port::batch_cancel(P_BATCH_FREE_CANCEL* batch, PACKET* sendL)
}
void rem_port::batch_sync(PACKET* sendL)
{
LocalStatus ls;
CheckStatusWrapper status_vector(&ls);
// no need checking protocol version if client is using batch_sync, just return synced response
this->send_response(sendL, 0, 0, &status_vector, false);
}
void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL)
{
LocalStatus ls;
@ -4992,6 +5002,10 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p
port->batch_cancel(&receive->p_batch_free_cancel, sendL);
break;
case op_batch_sync:
port->batch_sync(sendL);
break;
case op_batch_blob_stream:
port->batch_blob_stream(&receive->p_batch_blob, sendL);
break;