From 054431fce7650c0d58df3f9a4100ea98d215486e Mon Sep 17 00:00:00 2001 From: AlexPeshkoff Date: Wed, 5 May 2021 13:24:12 +0300 Subject: [PATCH] Implemented Batch::cancel() over the wire --- examples/interfaces/11.batch.cpp | 19 +++++++++++++++---- src/remote/client/interface.cpp | 31 ++++++++++++++++++++++++++++++- src/remote/protocol.cpp | 5 +++-- src/remote/protocol.h | 8 +++++--- src/remote/remote.h | 3 ++- src/remote/server/server.cpp | 24 ++++++++++++++++++++++-- 6 files changed, 77 insertions(+), 13 deletions(-) diff --git a/examples/interfaces/11.batch.cpp b/examples/interfaces/11.batch.cpp index 40c1d7d82f..d4925975e1 100644 --- a/examples/interfaces/11.batch.cpp +++ b/examples/interfaces/11.batch.cpp @@ -223,7 +223,7 @@ int main() NULL, NULL, NULL, NULL); // - printf("\nPart 1. Simple messages. Adding one by one or by groups of messages.\n"); + printf("\nPart 1. Simple messages. Adding one by one or by groups of messages, cancel batch.\n"); // // Message to store in a table @@ -251,7 +251,7 @@ int main() // fill batch with data record by record project1->id.set("BAT11"); - project1->name.set("SNGL_REC"); + project1->name.set("SNGL_REC1"); batch->add(&status, 1, project1.getData()); project1->id.set("BAT12"); @@ -262,6 +262,17 @@ int main() cs = batch->execute(&status, tra); print_cs(status, cs, utl); + // add a big set of same records ... + for (int i = 0; i < 100000; ++i) + { + project1->id.set("BAT11"); + project1->name.set("SNGL_REC"); + batch->add(&status, 1, project1.getData()); + } + + // ... and cancel that records + batch->cancel(&status); + // fill batch with data using many records at once stream = streamStart; @@ -285,11 +296,11 @@ int main() project1->name.set("STRM_REC_D"); putMsg(stream, project1.getData(), mesLength, mesAlign); - project1->id.set("BAT16"); + project1->id.set("BAT16"); // will not be processed due to return on single error project1->name.set("STRM_REC_E"); putMsg(stream, project1.getData(), mesLength, mesAlign); - batch->add(&status, 1, streamStart); + batch->add(&status, 2, streamStart); // execute it cs = batch->execute(&status, tra); diff --git a/src/remote/client/interface.cpp b/src/remote/client/interface.cpp index fded2212d7..fc5fda7952 100644 --- a/src/remote/client/interface.cpp +++ b/src/remote/client/interface.cpp @@ -2752,6 +2752,35 @@ void Batch::cancel(CheckStatusWrapper* status) { try { + // Check and validate handles, etc. + if (!stmt) + { + Arg::Gds(isc_dsql_cursor_err).raise(); + } + + Rsr* statement = stmt->getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); + Rdb* rdb = statement->rsr_rdb; + rem_port* port = rdb->rdb_port; + RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + + // Cleanup local data + if (blobPolicy != BLOB_NONE) + blobStream = blobStreamBuffer; + sizePointer = nullptr; + messageStream = 0; + batchActive = false; + + // Prepare packet + PACKET* packet = &rdb->rdb_packet; + packet->p_operation = op_batch_cancel; + + P_BATCH_FREE_CANCEL* batch = &packet->p_batch_free_cancel; + batch->p_batch_statement = statement->rsr_id; + + send_and_receive(status, rdb, packet); + + batchActive = false; } catch (const Exception& ex) { @@ -2780,7 +2809,7 @@ void Batch::freeClientData(CheckStatusWrapper* status, bool force) PACKET* packet = &rdb->rdb_packet; packet->p_operation = op_batch_rls; - P_BATCH_FREE* batch = &packet->p_batch_free; + P_BATCH_FREE_CANCEL* batch = &packet->p_batch_free_cancel; batch->p_batch_statement = statement->rsr_id; if (rdb->rdb_port->port_flags & PORT_lazy) diff --git a/src/remote/protocol.cpp b/src/remote/protocol.cpp index 754f63b495..baacfb51c0 100644 --- a/src/remote/protocol.cpp +++ b/src/remote/protocol.cpp @@ -1057,12 +1057,13 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) } case op_batch_rls: + case op_batch_cancel: { - P_BATCH_FREE* b = &p->p_batch_free; + P_BATCH_FREE_CANCEL* b = &p->p_batch_free_cancel; MAP(xdr_short, reinterpret_cast(b->p_batch_statement)); if (xdrs->x_op != XDR_FREE) - DEB_RBATCH(fprintf(stderr, "BatRem: xdr release\n")); + DEB_RBATCH(fprintf(stderr, "BatRem: xdr release/cancel %d\n", p->p_operation)); return P_TRUE(xdrs, p); } diff --git a/src/remote/protocol.h b/src/remote/protocol.h index cd2b2987b1..9f7b56f052 100644 --- a/src/remote/protocol.h +++ b/src/remote/protocol.h @@ -288,6 +288,8 @@ enum P_OP op_repl_data = 107, op_repl_req = 108, + op_batch_cancel = 109, + op_max }; @@ -689,10 +691,10 @@ typedef struct p_batch_cs // completion state ULONG p_batch_errors; // error's recnums } P_BATCH_CS; -typedef struct p_batch_free +typedef struct p_batch_free_cancel { OBJCT p_batch_statement; // statement object -} P_BATCH_FREE; +} P_BATCH_FREE_CANCEL; typedef struct p_batch_blob { @@ -769,7 +771,7 @@ typedef struct packet P_BATCH_CREATE p_batch_create; // Create batch interface P_BATCH_MSG p_batch_msg; // Add messages to batch P_BATCH_EXEC p_batch_exec; // Run batch - P_BATCH_FREE p_batch_free; // Destroy batch + P_BATCH_FREE_CANCEL p_batch_free_cancel; // Cancel or destroy batch P_BATCH_CS p_batch_cs; // Batch completion state P_BATCH_BLOB p_batch_blob; // BLOB stream portion in batch P_BATCH_REGBLOB p_batch_regblob; // Register already existing BLOB in batch diff --git a/src/remote/remote.h b/src/remote/remote.h index 80aae17f88..1a6ce84f16 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -1366,7 +1366,8 @@ public: void batch_blob_stream(P_BATCH_BLOB*, PACKET*); void batch_regblob(P_BATCH_REGBLOB*, PACKET*); void batch_exec(P_BATCH_EXEC*, PACKET*); - void batch_rls(P_BATCH_FREE*, PACKET*); + void batch_rls(P_BATCH_FREE_CANCEL*, PACKET*); + void batch_cancel(P_BATCH_FREE_CANCEL*, PACKET*); void batch_bpb(P_BATCH_SETBPB*, PACKET*); void replicate(P_REPLICATE*, PACKET*); diff --git a/src/remote/server/server.cpp b/src/remote/server/server.cpp index 947c0e78a8..a4fcc826a2 100644 --- a/src/remote/server/server.cpp +++ b/src/remote/server/server.cpp @@ -3684,7 +3684,7 @@ void rem_port::batch_exec(P_BATCH_EXEC* batch, PACKET* sendL) } -void rem_port::batch_rls(P_BATCH_FREE* batch, PACKET* sendL) +void rem_port::batch_rls(P_BATCH_FREE_CANCEL* batch, PACKET* sendL) { LocalStatus ls; CheckStatusWrapper status_vector(&ls); @@ -3701,6 +3701,22 @@ void rem_port::batch_rls(P_BATCH_FREE* batch, PACKET* sendL) } +void rem_port::batch_cancel(P_BATCH_FREE_CANCEL* batch, PACKET* sendL) +{ + LocalStatus ls; + CheckStatusWrapper status_vector(&ls); + + Rsr* statement; + getHandle(statement, batch->p_batch_statement); + statement->checkIface(); + statement->checkBatch(); + + statement->rsr_batch->cancel(&status_vector); + + this->send_response(sendL, 0, 0, &status_vector, false); +} + + void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL) { LocalStatus ls; @@ -4969,7 +4985,11 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p break; case op_batch_rls: - port->batch_rls(&receive->p_batch_free, sendL); + port->batch_rls(&receive->p_batch_free_cancel, sendL); + break; + + case op_batch_cancel: + port->batch_cancel(&receive->p_batch_free_cancel, sendL); break; case op_batch_blob_stream: