8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:43:02 +01:00

Implemented #7051: Network support for bi-directional cursors

This commit is contained in:
Dmitry Yemanov 2021-11-26 09:35:31 +03:00
parent 5a5a2992f7
commit 8956b6ab62
6 changed files with 781 additions and 436 deletions

File diff suppressed because it is too large Load Diff

View File

@ -302,6 +302,8 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
}
#endif
const auto port = xdrs->x_public;
switch (p->p_operation)
{
case op_reject:
@ -659,11 +661,10 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
}
MAP(xdr_short, reinterpret_cast<SSHORT&>(sqldata->p_sqldata_out_message_number));
}
{ // scope
rem_port* port = xdrs->x_public;
if (port->port_protocol >= PROTOCOL_STMT_TOUT)
MAP(xdr_u_long, sqldata->p_sqldata_timeout);
}
if (port->port_protocol >= PROTOCOL_STMT_TOUT)
MAP(xdr_u_long, sqldata->p_sqldata_timeout);
if (port->port_protocol >= PROTOCOL_FETCH_SCROLL)
MAP(xdr_u_long, sqldata->p_sqldata_cursor_flags);
DEBUG_PRINTSIZE(xdrs, p->p_operation);
return P_TRUE(xdrs, p);
@ -702,6 +703,7 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
return P_TRUE(xdrs, p);
case op_fetch:
case op_fetch_scroll:
sqldata = &p->p_sqldata;
MAP(xdr_short, reinterpret_cast<SSHORT&>(sqldata->p_sqldata_statement));
if (!xdr_sql_blr(xdrs, (SLONG) sqldata->p_sqldata_statement,
@ -711,6 +713,15 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
}
MAP(xdr_short, reinterpret_cast<SSHORT&>(sqldata->p_sqldata_message_number));
MAP(xdr_short, reinterpret_cast<SSHORT&>(sqldata->p_sqldata_messages));
if (p->p_operation == op_fetch_scroll)
{
MAP(xdr_short, reinterpret_cast<SSHORT&>(sqldata->p_sqldata_fetch_op));
if (sqldata->p_sqldata_fetch_op == fetch_absolute ||
sqldata->p_sqldata_fetch_op == fetch_relative)
{
MAP(xdr_long, sqldata->p_sqldata_fetch_pos);
}
}
DEBUG_PRINTSIZE(xdrs, p->p_operation);
return P_TRUE(xdrs, p);
@ -820,7 +831,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
P_CRYPT_CALLBACK* cc = &p->p_cc;
MAP(xdr_cstring, cc->p_cc_data);
rem_port* port = xdrs->x_public;
// If the protocol is 0 we are in the process of establishing a connection.
// crypt_key_callback at this phaze means server protocol is at least P15
if (port->port_protocol >= PROTOCOL_VERSION14 || port->port_protocol == 0)
@ -856,7 +866,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
return P_TRUE(xdrs, p);
}
rem_port* port = xdrs->x_public;
SSHORT statement_id = b->p_batch_statement;
Rsr* statement;
if (statement_id >= 0)
@ -933,7 +942,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
if (xdrs->x_op == XDR_FREE)
return P_TRUE(xdrs, p);
rem_port* port = xdrs->x_public;
SSHORT statement_id = b->p_batch_statement;
DEB_RBATCH(fprintf(stderr, "BatRem: xdr CS %d\n", statement_id));
Rsr* statement;

View File

@ -94,9 +94,10 @@ const USHORT PROTOCOL_VERSION16 = (FB_PROTOCOL_FLAG | 16);
const USHORT PROTOCOL_STMT_TOUT = PROTOCOL_VERSION16;
// Protocol 17:
// - supports op_batch_sync, op_info_batch
// - supports op_batch_sync, op_info_batch, op_fetch_scroll
const USHORT PROTOCOL_VERSION17 = (FB_PROTOCOL_FLAG | 17);
const USHORT PROTOCOL_FETCH_SCROLL = PROTOCOL_VERSION17;
// Architecture types
@ -144,6 +145,18 @@ const int INVALID_OBJECT = MAX_USHORT;
const USHORT STMT_NO_BATCH = 2;
const USHORT STMT_DEFER_EXECUTE = 4;
enum P_FETCH
{
fetch_next = 0,
fetch_prior = 1,
fetch_first = 2,
fetch_last = 3,
fetch_absolute = 4,
fetch_relative = 5
};
const P_FETCH fetch_execute = fetch_next;
// Operation (packet) types
enum P_OP
@ -297,6 +310,8 @@ enum P_OP
op_batch_sync = 110,
op_info_batch = 111,
op_fetch_scroll = 112,
op_max
};
@ -605,6 +620,9 @@ typedef struct p_sqldata
USHORT p_sqldata_out_message_number;
ULONG p_sqldata_status; // final eof status
ULONG p_sqldata_timeout; // statement timeout
ULONG p_sqldata_cursor_flags; // cursor flags
P_FETCH p_sqldata_fetch_op; // Fetch operation
SLONG p_sqldata_fetch_pos; // Fetch position
} P_SQLDATA;
typedef struct p_sqlfree

View File

@ -497,6 +497,9 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle<rem_type_rsr>
Firebird::BatchCompletionState* rsr_batch_cs; // client
};
P_FETCH rsr_fetch_operation; // Last performed fetch operation
SLONG rsr_fetch_position; // and position
struct BatchStream
{
BatchStream()
@ -526,17 +529,21 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle<rem_type_rsr>
public:
// Values for rsr_flags.
enum {
enum : USHORT {
FETCHED = 1, // Cleared by execute, set by fetch
EOF_SET = 2, // End-of-stream encountered
//BLOB = 4, // Statement relates to blob op
NO_BATCH = 8, // Do not batch fetch rows
STREAM_ERR = 16, // There is an error pending in the batched rows
LAZY = 32, // To be allocated at the first reference
DEFER_EXECUTE = 64, // op_execute can be deferred
PAST_EOF = 128 // EOF was returned by fetch from this statement
NO_BATCH = 4, // Do not batch fetch rows
STREAM_ERR = 8, // There is an error pending in the batched rows
LAZY = 16, // To be allocated at the first reference
DEFER_EXECUTE = 32, // op_execute can be deferred
PAST_EOF = 64, // EOF was returned by fetch from this statement
BOF_SET = 128, // Beginning-of-stream
PAST_BOF = 256 // BOF was returned by fetch from this statement
};
static const auto STREAM_END = (BOF_SET | EOF_SET);
static const auto PAST_END = (PAST_BOF | PAST_EOF);
public:
Rsr() :
rsr_next(0), rsr_rdb(0), rsr_rtr(0), rsr_iface(NULL), rsr_cursor(NULL), rsr_batch(NULL),
@ -544,7 +551,8 @@ public:
rsr_format(0), rsr_message(0), rsr_buffer(0), rsr_status(0),
rsr_id(0), rsr_fmt_length(0),
rsr_rows_pending(0), rsr_msgs_waiting(0), rsr_reorder_level(0), rsr_batch_count(0),
rsr_cursor_name(getPool()), rsr_delayed_format(false), rsr_timeout(0), rsr_self(NULL)
rsr_cursor_name(getPool()), rsr_delayed_format(false), rsr_timeout(0), rsr_self(NULL),
rsr_fetch_operation(fetch_next), rsr_fetch_position(0)
{ }
~Rsr()
@ -1336,7 +1344,7 @@ public:
ISC_STATUS end_transaction(P_OP, P_RLSE*, PACKET*);
ISC_STATUS execute_immediate(P_OP, P_SQLST*, PACKET*);
ISC_STATUS execute_statement(P_OP, P_SQLDATA*, PACKET*);
ISC_STATUS fetch(P_SQLDATA*, PACKET*);
ISC_STATUS fetch(P_SQLDATA*, PACKET*, bool);
ISC_STATUS get_segment(P_SGMT*, PACKET*);
ISC_STATUS get_slice(P_SLC*, PACKET*);
void info(P_OP, P_INFO*, PACKET*);

View File

@ -3858,11 +3858,14 @@ ISC_STATUS rem_port::execute_statement(P_OP op, P_SQLDATA* sqldata, PACKET* send
if ((flags & IStatement::FLAG_HAS_CURSOR) && (out_msg_length == 0))
{
const auto cursorFlags = (port_protocol >= PROTOCOL_FETCH_SCROLL) ?
sqldata->p_sqldata_cursor_flags : 0;
statement->rsr_cursor =
statement->rsr_iface->openCursor(&status_vector, tra,
iMsgBuffer.metadata, iMsgBuffer.buffer,
(out_blr_length ? oMsgBuffer.metadata : DELAYED_OUT_FORMAT),
0);
cursorFlags);
if (!(status_vector.getState() & Firebird::IStatus::STATE_ERRORS))
{
transaction->rtr_cursors.add(statement);
@ -3909,7 +3912,7 @@ ISC_STATUS rem_port::execute_statement(P_OP op, P_SQLDATA* sqldata, PACKET* send
}
ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL, bool scroll)
{
/*****************************************
*
@ -3926,16 +3929,29 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
Rsr* statement;
getHandle(statement, sqldata->p_sqldata_statement);
// On first fetch, clear the end-of-stream flag & reset the message buffers
// The default (and legacy) scrolling option is FETCH NEXT
const auto operation = scroll ? sqldata->p_sqldata_fetch_op : fetch_next;
const auto position = scroll ? sqldata->p_sqldata_fetch_pos : 0;
// Whether we're fetching in the forward direction
const bool forward =
(operation == fetch_next || operation == fetch_last ||
((operation == fetch_absolute || operation == fetch_relative) && position > 0));
// Whether we're fetching relatively to the current position
const bool relative =
(operation == fetch_next || operation == fetch_prior || operation == fetch_relative);
if (!statement->rsr_flags.test(Rsr::FETCHED))
{
statement->rsr_flags.clear(Rsr::EOF_SET | Rsr::STREAM_ERR);
// On first fetch, clear the end-of-stream flag & reset the message buffers
statement->rsr_flags.clear(Rsr::STREAM_END | Rsr::STREAM_ERR);
statement->clearException();
RMessage* message = statement->rsr_message;
if (message != NULL)
if (message)
{
statement->rsr_buffer = message;
@ -3949,12 +3965,18 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
}
}
}
else if (!relative)
{
// Clear the end-of-stream flag if the fetch is positioned absolutely
statement->rsr_flags.clear(Rsr::STREAM_END);
}
const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0;
// If required, call setDelayedOutputFormat()
statement->checkCursor();
if (statement->rsr_delayed_format)
{
InternalMessageBuffer msgBuffer(sqldata->p_sqldata_blr.cstr_length,
@ -3969,10 +3991,15 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
statement->rsr_delayed_format = false;
}
// Get ready to ship the data out
// Setup the prefetch count. It's available only for FETCH NEXT and FETCH PRIOR
// operations, unless batching is disabled explicitly.
const USHORT max_records = statement->rsr_flags.test(Rsr::NO_BATCH) ?
1 : sqldata->p_sqldata_messages;
const bool prefetch = (operation == fetch_next || operation == fetch_prior) &&
!statement->rsr_flags.test(Rsr::NO_BATCH);
const USHORT max_records = prefetch ? sqldata->p_sqldata_messages : 1;
// Get ready to ship the data out
P_SQLDATA* response = &sendL->p_sqldata;
sendL->p_operation = op_fetch_response;
@ -3986,31 +4013,55 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
const FB_UINT64 org_packets = this->port_snd_packets;
USHORT count = 0;
bool rc = true;
bool success = true;
int rc = 0;
for (; count < max_records; count++)
{
// Have we exhausted the cache & reached cursor EOF?
if (statement->rsr_flags.test(Rsr::EOF_SET) && !statement->rsr_msgs_waiting)
{
statement->rsr_flags.clear(Rsr::EOF_SET);
rc = false;
break;
}
// If we have exhausted the cache...
// Have we exhausted the cache & have a pending error?
if (statement->rsr_flags.test(Rsr::STREAM_ERR) && !statement->rsr_msgs_waiting)
if (!statement->rsr_msgs_waiting)
{
fb_assert(statement->rsr_status);
statement->rsr_flags.clear(Rsr::STREAM_ERR);
return this->send_response(sendL, 0, 0, statement->rsr_status->value(), false);
// ... have we reached end of the cursor?
if (relative)
{
if (forward)
{
if (statement->rsr_flags.test(Rsr::EOF_SET))
{
statement->rsr_flags.clear(Rsr::EOF_SET);
success = false;
}
}
else
{
if (statement->rsr_flags.test(Rsr::BOF_SET))
{
statement->rsr_flags.clear(Rsr::BOF_SET);
success = false;
}
}
}
if (!success)
break;
// ... do we have a pending error?
if (statement->rsr_flags.test(Rsr::STREAM_ERR))
{
fb_assert(statement->rsr_status);
statement->rsr_flags.clear(Rsr::STREAM_ERR);
return this->send_response(sendL, 0, 0, statement->rsr_status->value(), false);
}
}
message = statement->rsr_buffer;
// Make sure message can be de referenced, if not then return false
// Make sure message can be dereferenced, if not then return false
if (message == NULL)
return FALSE;
return FB_FAILURE;
// If we don't have a message cached, get one from the access method.
@ -4018,15 +4069,46 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
{
fb_assert(statement->rsr_msgs_waiting == 0);
rc = statement->rsr_cursor->fetchNext(
&status_vector, message->msg_buffer) == IStatus::RESULT_OK;
switch (operation)
{
case fetch_next:
rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer);
break;
case fetch_prior:
rc = statement->rsr_cursor->fetchPrior(&status_vector, message->msg_buffer);
break;
case fetch_first:
rc = statement->rsr_cursor->fetchFirst(&status_vector, message->msg_buffer);
break;
case fetch_last:
rc = statement->rsr_cursor->fetchLast(&status_vector, message->msg_buffer);
break;
case fetch_absolute:
rc = statement->rsr_cursor->fetchAbsolute(&status_vector, position,
message->msg_buffer);
break;
case fetch_relative:
rc = statement->rsr_cursor->fetchRelative(&status_vector, position,
message->msg_buffer);
break;
default:
fb_assert(false);
}
statement->rsr_flags.set(Rsr::FETCHED);
if (status_vector.getState() & Firebird::IStatus::STATE_ERRORS)
return this->send_response(sendL, 0, 0, &status_vector, false);
if (!rc)
success = (rc == IStatus::RESULT_OK);
if (!success)
break;
message->msg_address = message->msg_buffer;
@ -4040,8 +4122,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
// There's a buffer waiting -- send it
if (!this->send_partial(sendL))
return FALSE;
this->send_partial(sendL);
message->msg_address = NULL;
@ -4053,7 +4134,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
break;
}
response->p_sqldata_status = rc ? 0 : 100;
response->p_sqldata_status = success ? 0 : 100;
response->p_sqldata_messages = 0;
// hvlad: message->msg_address not used in xdr_protocol because of
@ -4079,7 +4160,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
while (message->msg_address && message->msg_next != statement->rsr_buffer)
message = message->msg_next;
USHORT prefetch_count = (rc && !statement->rsr_flags.test(Rsr::NO_BATCH)) ? count : 0;
USHORT prefetch_count = (success && prefetch) ? count : 0;
for (; prefetch_count; --prefetch_count)
{
@ -4098,8 +4179,21 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
next = message;
}
rc = statement->rsr_cursor->fetchNext(
&status_vector, message->msg_buffer) == IStatus::RESULT_OK;
// Only FETCH NEXT and FETCH PRIOR operations are available for prefetch
switch (operation)
{
case fetch_next:
rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer);
break;
case fetch_prior:
rc = statement->rsr_cursor->fetchPrior(&status_vector, message->msg_buffer);
break;
default:
fb_assert(false);
}
if (status_vector.getState() & Firebird::IStatus::STATE_ERRORS)
{
@ -4109,11 +4203,18 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
statement->rsr_flags.set(Rsr::STREAM_ERR);
statement->saveException(&status_vector, true);
}
break;
}
if (!rc)
if (rc == IStatus::RESULT_NO_DATA)
{
statement->rsr_flags.set(Rsr::EOF_SET);
if (statement->rsr_cursor->isBof(&status_vector))
statement->rsr_flags.set(Rsr::BOF_SET);
if (statement->rsr_cursor->isEof(&status_vector))
statement->rsr_flags.set(Rsr::EOF_SET);
break;
}
@ -4122,7 +4223,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL)
statement->rsr_msgs_waiting++;
}
return TRUE;
return FB_SUCCESS;
}
@ -4967,7 +5068,8 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p
break;
case op_fetch:
port->fetch(&receive->p_sqldata, sendL);
case op_fetch_scroll:
port->fetch(&receive->p_sqldata, sendL, op == op_fetch_scroll);
break;
case op_free_statement:

View File

@ -4149,7 +4149,7 @@ int YBlob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength,
e.stuffException(status);
}
return 0;
return IStatus::RESULT_ERROR;
}
void YBlob::putSegment(CheckStatusWrapper* status, unsigned int length, const void* buffer)
@ -4750,7 +4750,7 @@ int YResultSet::fetchNext(CheckStatusWrapper* status, void* buffer)
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
int YResultSet::fetchPrior(CheckStatusWrapper* status, void* buffer)
@ -4766,7 +4766,7 @@ int YResultSet::fetchPrior(CheckStatusWrapper* status, void* buffer)
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
int YResultSet::fetchFirst(CheckStatusWrapper* status, void* buffer)
@ -4782,7 +4782,7 @@ int YResultSet::fetchFirst(CheckStatusWrapper* status, void* buffer)
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
int YResultSet::fetchLast(CheckStatusWrapper* status, void* buffer)
@ -4798,7 +4798,7 @@ int YResultSet::fetchLast(CheckStatusWrapper* status, void* buffer)
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
int YResultSet::fetchAbsolute(CheckStatusWrapper* status, int position, void* buffer)
@ -4814,7 +4814,7 @@ int YResultSet::fetchAbsolute(CheckStatusWrapper* status, int position, void* bu
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
int YResultSet::fetchRelative(CheckStatusWrapper* status, int offset, void* buffer)
@ -4830,7 +4830,7 @@ int YResultSet::fetchRelative(CheckStatusWrapper* status, int offset, void* buff
e.stuffException(status);
}
return FB_FALSE;
return IStatus::RESULT_ERROR;
}
FB_BOOLEAN YResultSet::isEof(CheckStatusWrapper* status)