8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 15:23:03 +01:00

Wire protocol improvement: prefetch blob info and some data when open blob (#8307)

* Group op_open_blob2, op_info_blob and op_get_segment into single physical packet.
It allows to prefetch and cache blob info and some blob data in single network roundtrip.
Return cached blob info, if present, without remote access.

* No need to reserve space for segment length.
Fixed typo.
Thanks to @dyemanov for comments.

* Add 'const' for parameter declaration, as @AlexPeshkoff suggested

* Use 'if constexpr' as @TreeHunter9 suggested.
Also, avoid hardcoded constants when possible.
This commit is contained in:
Vlad Khorsun 2024-11-06 18:20:54 +02:00
parent 79583d675b
commit e78b721256
4 changed files with 265 additions and 4 deletions

View File

@ -31,10 +31,13 @@
#include <cctype>
#include <string.h>
#include <type_traits>
#include "../common/classes/fb_string.h"
#include "../common/classes/array.h"
#include "iberror.h"
#include "firebird/Interface.h"
#include "memory_routines.h"
#ifdef SFIO
#include <stdio.h>
@ -271,6 +274,47 @@ namespace fb_utils
// Frequently used actions with clumplets
bool isBpbSegmented(unsigned parLength, const unsigned char* par);
// Put integer value into info buffer
template<typename T>
inline unsigned char* putInfoItemInt(const unsigned char item, T value,
unsigned char* ptr, const unsigned char* end)
{
static_assert(std::is_integral_v<T>, "Integral type expected");
constexpr auto len = sizeof(T);
if (ptr + len + 1 + 2 > end)
{
if (ptr < end)
{
*ptr++ = isc_info_truncated;
if (ptr < end)
*ptr++ = isc_info_end;
}
return nullptr;
}
*ptr++ = item;
*ptr++ = len;
*ptr++ = 0;
if constexpr (len == sizeof(SINT64))
put_vax_int64(ptr, value);
else if constexpr (len == sizeof(SLONG))
put_vax_long(ptr, value);
else if constexpr (len == sizeof(SSHORT))
put_vax_short(ptr, value);
else if constexpr (len == sizeof(char))
*ptr = value;
else
static_assert(always_false<T>::value, "unknown data type");
ptr += len;
return ptr;
}
// RAII to call fb_shutdown() in utilities
class FbShutdown
{

View File

@ -1303,6 +1303,10 @@ void Blob::getInfo(CheckStatusWrapper* status,
Rdb* rdb = blob->rbl_rdb;
CHECK_HANDLE(rdb, isc_bad_db_handle);
if (blob->rbl_info.getLocalInfo(itemsLength, items, bufferLength, buffer))
return;
rem_port* port = rdb->rdb_port;
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
@ -5611,7 +5615,79 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
// would try to write to the application's provided R/O buffer.
p_blob->p_blob_bpb.cstr_address = bpb;
send_and_receive(status, rdb, packet);
UCHAR infoBuffer[128];
if (port->port_flags & PORT_lazy)
{
send_partial_packet(port, packet);
// prefetch blob info
const UCHAR items[] = {
isc_info_blob_num_segments,
isc_info_blob_max_segment,
isc_info_blob_total_length,
isc_info_blob_type,
isc_info_end
};
packet->p_operation = op_info_blob;
P_INFO* information = &packet->p_info;
information->p_info_object = INVALID_OBJECT;
information->p_info_incarnation = 0;
information->p_info_items.cstr_length = sizeof(items);
information->p_info_items.cstr_address = items;
information->p_info_buffer_length = sizeof(infoBuffer);
send_partial_packet(port, packet);
// prefetch some data
packet->p_operation = op_get_segment;
P_SGMT* segment = &packet->p_sgmt;
segment->p_sgmt_length = BLOB_LENGTH;
segment->p_sgmt_blob = INVALID_OBJECT;
segment->p_sgmt_segment.cstr_length = 0;
send_packet(port, packet);
try
{
receive_response(status, rdb, packet);
}
catch (const Exception& ex)
{
// re-throw network error immediately, for other errors receive two more packets first
if (port->port_state != rem_port::PENDING)
throw;
FbLocalStatus local;
ex.stuffException(&local);
auto errs = local->getErrors();
if (fb_utils::containsErrorCode(errs, isc_network_error) ||
fb_utils::containsErrorCode(errs, isc_net_read_err) ||
port->port_state != rem_port::PENDING)
{
throw;
}
for (int i = 0; i < 2; i++)
{
try
{
UseStandardBuffer temp(packet->p_resp.p_resp_data);
receive_response(status, rdb, packet);
}
catch (const Exception&) {}
}
throw;
}
}
else
{
send_and_receive(status, rdb, packet);
}
// CVC: It's not evident to me why these two lines that I've copied
// here as comments are only found in create_blob calls.
@ -5627,9 +5703,46 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
blob->rbl_next = transaction->rtr_blobs;
transaction->rtr_blobs = blob;
Firebird::IBlob* b = FB_NEW Blob(blob);
b->addRef();
return b;
Blob* iBlob = FB_NEW Blob(blob);
iBlob->addRef();
if (port->port_flags & PORT_lazy)
{
// Receive two more responses. Ignore errors here, let client to receive
// and handle it later, when/if it runs corresponding action by itself.
P_RESP* response = &packet->p_resp;
// receive blob info
try
{
UsePreallocatedBuffer temp(response->p_resp_data, sizeof(infoBuffer), infoBuffer);
receive_response(status, rdb, packet);
blob->rbl_info.parseInfo(sizeof(infoBuffer), infoBuffer);
}
catch (const Exception&)
{ }
// receive blob data
try
{
UsePreallocatedBuffer temp(response->p_resp_data, blob->rbl_buffer_length, blob->rbl_buffer);
receive_response(status, rdb, packet);
blob->rbl_length = (USHORT) response->p_resp_data.cstr_length;
blob->rbl_ptr = blob->rbl_buffer;
if (response->p_resp_object == 1)
blob->rbl_flags |= Rbl::SEGMENT;
else if (response->p_resp_object == 2)
blob->rbl_flags |= Rbl::EOF_PENDING;
}
catch (const Exception&)
{ }
}
return iBlob;
}
catch (const Exception& ex)
{

View File

@ -38,6 +38,7 @@
#include "../common/os/mod_loader.h"
#include "../jrd/license.h"
#include "../common/classes/ImplementHelper.h"
#include "../common/utils_proto.h"
#ifdef DEV_BUILD
Firebird::AtomicCounter rem_port::portCounter;
@ -855,6 +856,87 @@ ISC_STATUS* Rdb::get_status_vector() noexcept
}
*/
bool RBlobInfo::getLocalInfo(unsigned int itemsLength, const unsigned char* items,
unsigned int bufferLength, unsigned char* buffer)
{
if (!valid)
return false;
unsigned char* p = buffer;
const unsigned char* const end = buffer + bufferLength;
for (auto item = items; p && (item < items + itemsLength); item++)
{
switch (*item)
{
case isc_info_blob_num_segments:
p = fb_utils::putInfoItemInt(*item, num_segments, p, end);
break;
case isc_info_blob_max_segment:
p = fb_utils::putInfoItemInt(*item, max_segment, p, end);
break;
case isc_info_blob_total_length:
p = fb_utils::putInfoItemInt(*item, total_length, p, end);
break;
case isc_info_blob_type:
p = fb_utils::putInfoItemInt(*item, blob_type, p, end);
break;
case isc_info_end:
if (p < end)
*p++ = isc_info_end;
break;
default:
// unknown info item, let remote server handle it
return false;
}
}
return true;
}
void RBlobInfo::parseInfo(unsigned int bufferLength, const unsigned char* buffer)
{
int c = 0;
valid = false;
Firebird::ClumpletReader p(Firebird::ClumpletReader::InfoResponse, buffer, bufferLength);
for (; !p.isEof(); p.moveNext())
{
switch (p.getClumpTag())
{
case isc_info_blob_num_segments:
num_segments = p.getInt();
c++;
break;
case isc_info_blob_max_segment:
max_segment = p.getInt();
c++;
break;
case isc_info_blob_total_length:
total_length = p.getInt();
c++;
break;
case isc_info_blob_type:
blob_type = p.getInt();
c++;
break;
case isc_info_end:
break;
default:
fb_assert(false);
break;
}
}
valid = (c == 4);
}
void Rrq::saveStatus(const Firebird::Exception& ex) noexcept
{
if (rrqStatus.isSuccess())

View File

@ -221,6 +221,27 @@ public:
};
struct RBlobInfo
{
bool valid;
UCHAR blob_type;
ULONG num_segments;
ULONG max_segment;
ULONG total_length;
RBlobInfo()
{
memset(this, 0, sizeof(*this));
}
// parse into response into m_info, assume buffer contains all known info items
void parseInfo(unsigned int bufferLength, const unsigned char* buffer);
// returns false if there is no valid local info or if unknown item encountered
bool getLocalInfo(unsigned int itemsLength, const unsigned char* items,
unsigned int bufferLength, unsigned char* buffer);
};
struct Rbl : public Firebird::GlobalStorage, public TypedHandle<rem_type_rbl>
{
Firebird::HalfStaticArray<UCHAR, BLOB_LENGTH> rbl_data;
@ -239,6 +260,7 @@ struct Rbl : public Firebird::GlobalStorage, public TypedHandle<rem_type_rbl>
USHORT rbl_source_interp; // source interp (for writing)
USHORT rbl_target_interp; // destination interp (for reading)
Rbl** rbl_self;
RBlobInfo rbl_info;
public:
// Values for rbl_flags