8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 16:43:03 +01:00

Implemented Batch::cancel() over the wire

This commit is contained in:
AlexPeshkoff 2021-05-05 13:24:12 +03:00
parent a111138650
commit 054431fce7
6 changed files with 77 additions and 13 deletions

View File

@ -223,7 +223,7 @@ int main()
NULL, NULL, NULL, NULL);
//
printf("\nPart 1. Simple messages. Adding one by one or by groups of messages.\n");
printf("\nPart 1. Simple messages. Adding one by one or by groups of messages, cancel batch.\n");
//
// Message to store in a table
@ -251,7 +251,7 @@ int main()
// fill batch with data record by record
project1->id.set("BAT11");
project1->name.set("SNGL_REC");
project1->name.set("SNGL_REC1");
batch->add(&status, 1, project1.getData());
project1->id.set("BAT12");
@ -262,6 +262,17 @@ int main()
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// add a big set of same records ...
for (int i = 0; i < 100000; ++i)
{
project1->id.set("BAT11");
project1->name.set("SNGL_REC");
batch->add(&status, 1, project1.getData());
}
// ... and cancel that records
batch->cancel(&status);
// fill batch with data using many records at once
stream = streamStart;
@ -285,11 +296,11 @@ int main()
project1->name.set("STRM_REC_D");
putMsg(stream, project1.getData(), mesLength, mesAlign);
project1->id.set("BAT16");
project1->id.set("BAT16"); // will not be processed due to return on single error
project1->name.set("STRM_REC_E");
putMsg(stream, project1.getData(), mesLength, mesAlign);
batch->add(&status, 1, streamStart);
batch->add(&status, 2, streamStart);
// execute it
cs = batch->execute(&status, tra);

View File

@ -2752,6 +2752,35 @@ void Batch::cancel(CheckStatusWrapper* status)
{
try
{
// Check and validate handles, etc.
if (!stmt)
{
Arg::Gds(isc_dsql_cursor_err).raise();
}
Rsr* statement = stmt->getStatement();
CHECK_HANDLE(statement, isc_bad_req_handle);
Rdb* rdb = statement->rsr_rdb;
rem_port* port = rdb->rdb_port;
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
// Cleanup local data
if (blobPolicy != BLOB_NONE)
blobStream = blobStreamBuffer;
sizePointer = nullptr;
messageStream = 0;
batchActive = false;
// Prepare packet
PACKET* packet = &rdb->rdb_packet;
packet->p_operation = op_batch_cancel;
P_BATCH_FREE_CANCEL* batch = &packet->p_batch_free_cancel;
batch->p_batch_statement = statement->rsr_id;
send_and_receive(status, rdb, packet);
batchActive = false;
}
catch (const Exception& ex)
{
@ -2780,7 +2809,7 @@ void Batch::freeClientData(CheckStatusWrapper* status, bool force)
PACKET* packet = &rdb->rdb_packet;
packet->p_operation = op_batch_rls;
P_BATCH_FREE* batch = &packet->p_batch_free;
P_BATCH_FREE_CANCEL* batch = &packet->p_batch_free_cancel;
batch->p_batch_statement = statement->rsr_id;
if (rdb->rdb_port->port_flags & PORT_lazy)

View File

@ -1057,12 +1057,13 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
}
case op_batch_rls:
case op_batch_cancel:
{
P_BATCH_FREE* b = &p->p_batch_free;
P_BATCH_FREE_CANCEL* b = &p->p_batch_free_cancel;
MAP(xdr_short, reinterpret_cast<SSHORT&>(b->p_batch_statement));
if (xdrs->x_op != XDR_FREE)
DEB_RBATCH(fprintf(stderr, "BatRem: xdr release\n"));
DEB_RBATCH(fprintf(stderr, "BatRem: xdr release/cancel %d\n", p->p_operation));
return P_TRUE(xdrs, p);
}

View File

@ -288,6 +288,8 @@ enum P_OP
op_repl_data = 107,
op_repl_req = 108,
op_batch_cancel = 109,
op_max
};
@ -689,10 +691,10 @@ typedef struct p_batch_cs // completion state
ULONG p_batch_errors; // error's recnums
} P_BATCH_CS;
typedef struct p_batch_free
typedef struct p_batch_free_cancel
{
OBJCT p_batch_statement; // statement object
} P_BATCH_FREE;
} P_BATCH_FREE_CANCEL;
typedef struct p_batch_blob
{
@ -769,7 +771,7 @@ typedef struct packet
P_BATCH_CREATE p_batch_create; // Create batch interface
P_BATCH_MSG p_batch_msg; // Add messages to batch
P_BATCH_EXEC p_batch_exec; // Run batch
P_BATCH_FREE p_batch_free; // Destroy batch
P_BATCH_FREE_CANCEL p_batch_free_cancel; // Cancel or destroy batch
P_BATCH_CS p_batch_cs; // Batch completion state
P_BATCH_BLOB p_batch_blob; // BLOB stream portion in batch
P_BATCH_REGBLOB p_batch_regblob; // Register already existing BLOB in batch

View File

@ -1366,7 +1366,8 @@ public:
void batch_blob_stream(P_BATCH_BLOB*, PACKET*);
void batch_regblob(P_BATCH_REGBLOB*, PACKET*);
void batch_exec(P_BATCH_EXEC*, PACKET*);
void batch_rls(P_BATCH_FREE*, PACKET*);
void batch_rls(P_BATCH_FREE_CANCEL*, PACKET*);
void batch_cancel(P_BATCH_FREE_CANCEL*, PACKET*);
void batch_bpb(P_BATCH_SETBPB*, PACKET*);
void replicate(P_REPLICATE*, PACKET*);

View File

@ -3684,7 +3684,7 @@ void rem_port::batch_exec(P_BATCH_EXEC* batch, PACKET* sendL)
}
void rem_port::batch_rls(P_BATCH_FREE* batch, PACKET* sendL)
void rem_port::batch_rls(P_BATCH_FREE_CANCEL* batch, PACKET* sendL)
{
LocalStatus ls;
CheckStatusWrapper status_vector(&ls);
@ -3701,6 +3701,22 @@ void rem_port::batch_rls(P_BATCH_FREE* batch, PACKET* sendL)
}
void rem_port::batch_cancel(P_BATCH_FREE_CANCEL* batch, PACKET* sendL)
{
LocalStatus ls;
CheckStatusWrapper status_vector(&ls);
Rsr* statement;
getHandle(statement, batch->p_batch_statement);
statement->checkIface();
statement->checkBatch();
statement->rsr_batch->cancel(&status_vector);
this->send_response(sendL, 0, 0, &status_vector, false);
}
void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL)
{
LocalStatus ls;
@ -4969,7 +4985,11 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p
break;
case op_batch_rls:
port->batch_rls(&receive->p_batch_free, sendL);
port->batch_rls(&receive->p_batch_free_cancel, sendL);
break;
case op_batch_cancel:
port->batch_cancel(&receive->p_batch_free_cancel, sendL);
break;
case op_batch_blob_stream: