8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 20:03:02 +01:00

Implemented CORE-733: Compress Data over the Network

This commit is contained in:
alexpeshkoff 2014-11-25 13:24:10 +00:00
parent 686e9e1e58
commit e65a5e3b3b
11 changed files with 334 additions and 61 deletions

View File

@ -564,6 +564,17 @@
#
#WireCrypt = Enabled (for client) / Required (for server)
#
# Should connection over the wire be compressed?
# Client only value - server should follow client setting if connect using
# correct protocol (>=13).
#
# Per-connection configurable.
#
# Type: boolean
#
#WireCompression = false
#
# Seconds to wait on a silent client connection before the server sends
# dummy packets to request acknowledgment.

View File

@ -723,6 +723,7 @@ AC_CHECK_HEADERS(langinfo.h)
AC_CHECK_HEADERS(iconv.h)
AC_CHECK_HEADERS(libio.h)
AC_CHECK_HEADERS(linux/falloc.h)
AC_CHECK_HEADERS(zlib.h)
dnl check for ICU presence
AC_CHECK_HEADER(unicode/ucnv.h,,AC_MSG_ERROR(ICU support not found - please install development ICU package))
@ -739,6 +740,7 @@ AC_SUBST(MATHLIB)
dnl Check for libraries
AC_SEARCH_LIBS(dlopen, dl)
AC_CHECK_LIB(z, deflate)
AC_CHECK_LIB(m, main)
if test "$EDITLINE_FLG" = "Y"; then
AC_CHECK_LIB(curses, tgetent, TERMLIB=curses, \

View File

@ -198,7 +198,8 @@ const Config::ConfigEntry Config::entries[MAX_CONFIG_KEY] =
{TYPE_STRING, "WireCryptPlugin", (ConfigValue) "Arc4"},
{TYPE_STRING, "KeyHolderPlugin", (ConfigValue) ""},
{TYPE_BOOLEAN, "RemoteAccess", (ConfigValue) true},
{TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false}
{TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false},
{TYPE_BOOLEAN, "WireCompression", (ConfigValue) false}
};
/******************************************************************************
@ -750,3 +751,8 @@ bool Config::getRemoteAccess() const
{
return get<bool>(KEY_REMOTE_ACCESS);
}
bool Config::getWireCompression() const
{
return get<bool>(KEY_WIRE_COMPRESSION);
}

View File

@ -136,6 +136,7 @@ public:
KEY_PLUG_KEY_HOLDER,
KEY_REMOTE_ACCESS,
KEY_IPV6_V6ONLY,
KEY_WIRE_COMPRESSION,
MAX_CONFIG_KEY // keep it last
};
@ -333,6 +334,8 @@ public:
int getWireCrypt(WireCryptMode wcMode) const;
bool getRemoteAccess() const;
bool getWireCompression() const;
};
// Implementation of interface to access master configuration file

View File

@ -5952,6 +5952,7 @@ static THREAD_ENTRY_DECLARE event_thread(THREAD_ENTRY_PARAM arg)
P_OP operation = op_void;
{ // scope
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
fb_assert(!port->port_compressed);
stuff = port->receive(&packet);
operation = packet.p_operation;
@ -6235,6 +6236,9 @@ static void authReceiveResponse(bool havePacket, ClntAuthBlock& cBlock, rem_port
HANDSHAKE_DEBUG(fprintf(stderr, "Cli: authReceiveResponse: cond_accept d=%d n=%d '%.*s' 0x%x\n",
d->cstr_length, n->cstr_length,
n->cstr_length, n->cstr_address, n->cstr_address ? n->cstr_address[0] : 0));
if (packet->p_acpd.p_acpt_type & pflag_compress)
port->initCompression();
packet->p_acpd.p_acpt_type &= ptype_MASK;
break;
case op_crypt:

View File

@ -218,9 +218,9 @@ static ULONG inet_debug_timer()
}
#endif // DEBUG
const SLONG MAX_DATA_LW = 1448; // Low Water mark
const SLONG MAX_DATA_HW = 32768; // High Water mark
const SLONG DEF_MAX_DATA = 8192;
const ULONG MAX_DATA_LW = 1448; // Low Water mark
const ULONG MAX_DATA_HW = 32768; // High Water mark
const ULONG DEF_MAX_DATA = 8192;
//const int MAXHOSTLEN = 64;
@ -281,6 +281,10 @@ public:
HandleState ok(const rem_port* port)
{
#ifdef WIRE_COMPRESS_SUPPORT
if (port->port_flags & PORT_z_data)
return SEL_READY;
#endif
SOCKET n = port->port_handle;
#if defined(WIN_NT)
return FD_ISSET(n, &slct_fdset) ? SEL_READY : SEL_NO_DATA;
@ -440,7 +444,7 @@ static void inet_gen_error(bool, rem_port*, const Arg::StatusVector& v);
static bool_t inet_getbytes(XDR*, SCHAR *, u_int);
static void inet_error(bool, rem_port*, const TEXT*, ISC_STATUS, int);
static bool_t inet_putbytes(XDR*, const SCHAR*, u_int);
static bool_t inet_read(XDR*);
static bool inet_read(XDR*);
static rem_port* inet_try_connect( PACKET*,
Rdb*,
const PathName&,
@ -448,7 +452,7 @@ static rem_port* inet_try_connect( PACKET*,
ClumpletReader&,
RefPtr<Config>*,
const PathName*);
static bool_t inet_write(XDR*); //, int);
static bool inet_write(XDR*);
static void INET_server_socket(rem_port* port, USHORT flag, const addrinfo* pai);
#ifdef DEBUG
@ -456,6 +460,7 @@ static void packet_print(const TEXT*, const UCHAR*, int, ULONG);
#endif
static bool packet_receive(rem_port*, UCHAR*, SSHORT, SSHORT*);
static bool packet_receive2(rem_port*, UCHAR*, SSHORT, SSHORT*);
static bool packet_send(rem_port*, const SCHAR*, SSHORT);
static rem_port* receive(rem_port*, PACKET *);
static rem_port* select_accept(rem_port*);
@ -506,7 +511,7 @@ static XDR::xdr_ops inet_ops =
SLONG INET_remote_buffer;
ULONG INET_remote_buffer;
static GlobalPtr<Mutex> init_mutex;
static volatile bool INET_initialized = false;
static volatile bool INET_shutting_down = false;
@ -585,6 +590,10 @@ rem_port* INET_analyze(ClntAuthBlock* cBlock,
user_id.insertBytes(CNCT_group, reinterpret_cast<UCHAR*>(&eff_gid), sizeof(eff_gid));
}
// Should compression be tried?
bool compression = config && (*config)->getWireCompression();
// Establish connection to server
// If we want user verification, we can't speak anything less than version 7
@ -605,10 +614,10 @@ rem_port* INET_analyze(ClntAuthBlock* cBlock,
for (size_t i = 0; i < cnct->p_cnct_count; i++) {
cnct->p_cnct_versions[i] = protocols_to_try[i];
if (compression && cnct->p_cnct_versions[i].p_cnct_version >= PROTOCOL_VERSION13)
cnct->p_cnct_versions[i].p_cnct_max_type |= pflag_compress;
}
// Try connection using first set of protocols
rem_port* port = inet_try_connect(packet, rdb, file_name, node_name, dpb, config, ref_db_name);
P_ACPT* accept = NULL;
@ -671,6 +680,9 @@ rem_port* INET_analyze(ClntAuthBlock* cBlock,
port->port_flags |= PORT_symmetric;
}
bool compress = accept->p_acpt_type & pflag_compress;
accept->p_acpt_type &= ptype_MASK;
if (accept->p_acpt_type != ptype_out_of_band) {
port->port_flags |= PORT_no_oob;
}
@ -679,6 +691,9 @@ rem_port* INET_analyze(ClntAuthBlock* cBlock,
port->port_flags |= PORT_lazy;
}
if (compress)
port->initCompression();
return port;
}
@ -1275,14 +1290,14 @@ static rem_port* alloc_port(rem_port* const parent, const USHORT flags)
port->port_request = aux_request;
port->port_buff_size = (USHORT) INET_remote_buffer;
port->port_async_receive = inet_async_receive;
port->port_flags = flags;
port->port_flags |= flags;
xdrinet_create( &port->port_send, port,
&port->port_buffer[INET_remote_buffer],
(USHORT) INET_remote_buffer,
XDR_ENCODE);
xdrinet_create(&port->port_send, port,
&port->port_buffer[REM_SEND_OFFSET(INET_remote_buffer)],
(USHORT) INET_remote_buffer, XDR_ENCODE);
xdrinet_create( &port->port_receive, port, port->port_buffer, 0, XDR_DECODE);
xdrinet_create(&port->port_receive, port,
&port->port_buffer[REM_RECV_OFFSET(INET_remote_buffer)], 0, XDR_DECODE);
if (parent && !(parent->port_server_flags & SRVR_thread_per_port))
{
@ -1894,7 +1909,7 @@ static bool select_multi(rem_port* main_port, UCHAR* buffer, SSHORT bufsize, SSH
}
else if (port = select_accept(main_port))
{
if (!packet_receive(port, buffer, bufsize, length))
if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length))
{
*length = 0;
}
@ -1914,7 +1929,7 @@ static bool select_multi(rem_port* main_port, UCHAR* buffer, SSHORT bufsize, SSH
return true;
}
if (!packet_receive(port, buffer, bufsize, length))
if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length))
{
if (port->port_flags & PORT_disconnect) {
continue;
@ -2191,7 +2206,7 @@ static int send_full( rem_port* port, PACKET * packet)
port->port_send.x_client = !(port->port_flags & PORT_server);
#endif
if (!xdr_protocol(&port->port_send, packet))
return FALSE;
return false;
#ifdef DEBUG
{ // scope
@ -2206,7 +2221,7 @@ static int send_full( rem_port* port, PACKET * packet)
} // end scope
#endif
return inet_write(&port->port_send /*, TRUE*/);
return REMOTE_deflate(&port->port_send, inet_write, packet_send, true);
}
static int send_partial( rem_port* port, PACKET * packet)
@ -2262,7 +2277,7 @@ static int xdrinet_create(XDR* xdrs, rem_port* port, UCHAR* buffer, USHORT lengt
xdrs->x_ops = (xdr_t::xdr_ops*) &inet_ops;
xdrs->x_op = x_op;
return TRUE;
return true;
}
#ifdef HAVE_SETITIMER
@ -2447,8 +2462,10 @@ static bool_t inet_putbytes( XDR* xdrs, const SCHAR* buff, u_int count)
xdrs->x_handy = 0;
}
if (!inet_write(xdrs /*, 0*/))
if (!REMOTE_deflate(xdrs, inet_write, packet_send, false))
{
return FALSE;
}
}
// Scalar values and bulk transfer remainder fall thru
@ -2468,7 +2485,7 @@ static bool_t inet_putbytes( XDR* xdrs, const SCHAR* buff, u_int count)
while (--bytecount >= 0)
{
if (xdrs->x_handy <= 0 && !inet_write(xdrs /*, 0*/))
if (xdrs->x_handy <= 0 && !REMOTE_deflate(xdrs, inet_write, packet_send, false))
return FALSE;
--xdrs->x_handy;
*xdrs->x_private++ = *buff++;
@ -2477,7 +2494,7 @@ static bool_t inet_putbytes( XDR* xdrs, const SCHAR* buff, u_int count)
return TRUE;
}
static bool_t inet_read( XDR* xdrs)
static bool inet_read( XDR* xdrs)
{
/**************************************
*
@ -2504,27 +2521,40 @@ static bool_t inet_read( XDR* xdrs)
p += xdrs->x_handy;
}
while (true)
{
SSHORT length = end - p;
if (!packet_receive(port, reinterpret_cast<UCHAR*>(p), length, &length))
{
return FALSE;
}
if (length >= 0)
{
p += length;
break;
}
p -= length;
if (!packet_send(port, 0, 0))
return FALSE;
}
SSHORT length = end - p;
port->port_flags &= ~PORT_z_data;
if (!REMOTE_inflate(port, packet_receive2, (UCHAR*)p, length, &length))
return false;
p += length;
xdrs->x_handy = (int) ((SCHAR *) p - xdrs->x_base);
xdrs->x_private = xdrs->x_base;
return TRUE;
return true;
}
static bool packet_receive2(rem_port* port, UCHAR* p, SSHORT bufSize, SSHORT* length)
{
*length = 0;
while (true)
{
SSHORT l = bufSize - *length;
if (!packet_receive(port, p + *length, l, &l))
return false;
if (l >= 0)
{
*length += l;
break;
}
*length -= l;
if (!packet_send(port, 0, 0))
return false;
}
return true;
}
static rem_port* inet_try_connect(PACKET* packet,
@ -2565,7 +2595,7 @@ static rem_port* inet_try_connect(PACKET* packet,
rem_port* port = NULL;
try
{
port = INET_connect(node_name, packet, FALSE, &dpb, config);
port = INET_connect(node_name, packet, false, &dpb, config);
}
catch (const Exception&)
{
@ -2587,7 +2617,7 @@ static rem_port* inet_try_connect(PACKET* packet,
return port;
}
static bool_t inet_write(XDR* xdrs)
static bool inet_write(XDR* xdrs)
{
/**************************************
*
@ -2603,7 +2633,7 @@ static bool_t inet_write(XDR* xdrs)
rem_port* port = (rem_port*) xdrs->x_public;
const char* p = xdrs->x_base;
SSHORT length = xdrs->x_private - p;
USHORT length = xdrs->x_private - p;
// Send data in manageable hunks. If a packet is partial, indicate
// that with a negative length. A positive length marks the end.
@ -2613,14 +2643,14 @@ static bool_t inet_write(XDR* xdrs)
const SSHORT l = (SSHORT) MIN(length, INET_remote_buffer);
length -= l;
if (!packet_send(port, p, (SSHORT) (length ? -l : l)))
return FALSE;
return false;
p += l;
}
xdrs->x_private = xdrs->x_base;
xdrs->x_handy = INET_remote_buffer;
return TRUE;
return true;
}

View File

@ -105,6 +105,10 @@ enum P_ARCH
const USHORT ptype_batch_send = 3; // Batch sends, no asynchrony
const USHORT ptype_out_of_band = 4; // Batch sends w/ out of band notification
const USHORT ptype_lazy_send = 5; // Deferred packets delivery
const USHORT ptype_MASK = 0xFF; // Mask - up to 255 types of protocol
//
// upper byte is used for protocol flags
const USHORT pflag_compress = 0x100; // Turn on compression if possible
// Generic object id

View File

@ -44,6 +44,9 @@ namespace Remote
struct rem_port;
struct rem_fmt;
struct Rdb;
typedef bool PacketReceive(rem_port*, UCHAR*, SSHORT, SSHORT*);
typedef bool PacketSend(rem_port*, const SCHAR*, SSHORT);
typedef bool ProtoWrite(XDR*);
void REMOTE_cleanup_transaction (struct Rtr *);
USHORT REMOTE_compute_batch_size (rem_port*, USHORT, P_OP, const rem_fmt*);
@ -63,6 +66,8 @@ Firebird::RefPtr<Config> REMOTE_get_config(const Firebird::PathName* dbName,
void REMOTE_parseList(Remote::ParsedList&, Firebird::PathName);
void REMOTE_makeList(Firebird::PathName& list, const Remote::ParsedList& parsed);
void REMOTE_check_response(Firebird::IStatus* warning, Rdb* rdb, PACKET* packet, bool checkKeys = false);
bool REMOTE_inflate(rem_port*, PacketReceive*, UCHAR*, SSHORT, SSHORT*);
bool REMOTE_deflate(XDR*, ProtoWrite*, PacketSend*, bool flash);
extern signed char wcCompatible[3][3];

View File

@ -994,7 +994,7 @@ void ClntAuthBlock::extractDataFromPluginTo(Firebird::ClumpletWriter& user_id)
addMutliPartConnectParameter(dataFromPlugin, user_id, CNCT_specific_data);
// Client's wirecrypt requested level
user_id.insertInt(CNCT_client_crypt, config->getWireCrypt(WC_CLIENT));
user_id.insertInt(CNCT_client_crypt, clntConfig->getWireCrypt(WC_CLIENT));
}
void ClntAuthBlock::resetClnt(const Firebird::PathName* fileName, const CSTRING* listStr)
@ -1020,8 +1020,8 @@ void ClntAuthBlock::resetClnt(const Firebird::PathName* fileName, const CSTRING*
dataFromPlugin.clear();
firstTime = true;
config = REMOTE_get_config(fileName, &dpbConfig);
pluginList = config->getPlugins(Firebird::IPluginManager::AuthClient);
clntConfig = REMOTE_get_config(fileName, &dpbConfig);
pluginList = clntConfig->getPlugins(Firebird::IPluginManager::AuthClient);
Firebird::PathName final;
if (serverPluginList.hasData())
@ -1064,7 +1064,7 @@ void ClntAuthBlock::resetClnt(const Firebird::PathName* fileName, const CSTRING*
Firebird::RefPtr<Config>* ClntAuthBlock::getConfig()
{
return config.hasData() ? &config : NULL;
return clntConfig.hasData() ? &clntConfig : NULL;
}
void ClntAuthBlock::storeDataForPlugin(unsigned int length, const unsigned char* data)
@ -1391,6 +1391,191 @@ void SrvAuthBlock::putKey(Firebird::IStatus* status, Firebird::FbCryptKey* crypt
}
bool REMOTE_inflate(rem_port* port, PacketReceive* packet_receive, UCHAR* buffer, SSHORT buffer_length, SSHORT* length)
{
#ifdef WIRE_COMPRESS_SUPPORT
if (!port->port_compressed)
return packet_receive(port, buffer, buffer_length, length);
z_stream& strm = port->port_recv_stream;
strm.avail_out = buffer_length;
strm.next_out = buffer;
for(;;)
{
if (strm.avail_in)
{
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Data to inflate %d port %p\n", strm.avail_in, port);
#if COMPRESS_DEBUG>1
for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]);
fprintf(stderr, "\n");
#endif
#endif
if (inflate(&strm, Z_NO_FLUSH) != Z_OK)
{
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Inflate error\n");
#endif
(void)inflateEnd(&strm);
port->port_flags &= ~PORT_z_data;
return false;
}
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Inflated data %d\n", buffer_length - strm.avail_out);
#if COMPRESS_DEBUG>1
for (unsigned n = 0; n < buffer_length - strm.avail_out; ++n) fprintf(stderr, "%02x ", buffer[n]);
fprintf(stderr, "\n");
#endif
#endif
if (strm.next_out != buffer)
break;
if (port->port_flags & PORT_z_data) // Was called from select_multi() but nothing decompressed
{
port->port_flags &= ~PORT_z_data;
return false;
}
if (strm.next_in != &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)])
{
memmove(&port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)], strm.next_in, strm.avail_in);
strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)];
}
}
else
strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)];
SSHORT l = (SSHORT) (port->port_buff_size - strm.avail_in);
if ((!packet_receive(port, strm.next_in, l, &l)) || (l <= 0)) // fixit - 2 ways to report errors in same routine
{
(void)inflateEnd(&strm);
port->port_flags &= ~PORT_z_data;
return false;
}
strm.avail_in += l;
}
*length = (SSHORT) (buffer_length - strm.avail_out);
if (strm.avail_in) // Z-buffer still has some data - probably can call inflate() once more on them
port->port_flags |= PORT_z_data;
else
port->port_flags &= ~PORT_z_data;
return true;
#else
return packet_receive(port, buffer, buffer_length, length);
#endif
}
bool REMOTE_deflate(XDR* xdrs, ProtoWrite* proto_write, PacketSend* packet_send, bool flash)
{
#ifdef WIRE_COMPRESS_SUPPORT
rem_port* port = (rem_port*) xdrs->x_public;
if (!port->port_compressed)
return proto_write(xdrs);
z_stream& strm = port->port_send_stream;
strm.avail_in = xdrs->x_private - xdrs->x_base;
strm.next_in = (Bytef*)xdrs->x_base;
if (!strm.next_out)
{
strm.avail_out = port->port_buff_size;
strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)];
}
bool expectMoreOut = flash;
while(strm.avail_in || expectMoreOut)
{
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Data to deflate %d port %p\n", strm.avail_in, port);
#if COMPRESS_DEBUG>1
for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]);
fprintf(stderr, "\n");
#endif
#endif
int ret = deflate(&strm, flash ? Z_SYNC_FLUSH : Z_NO_FLUSH);
if (ret == Z_BUF_ERROR)
ret = 0;
if (ret != 0)
{
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Deflate error %d\n", ret);
#endif
return false;
}
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Deflated data %d\n", port->port_buff_size - strm.avail_out);
#if COMPRESS_DEBUG>1
for (unsigned n = 0; n < port->port_buff_size - strm.avail_out; ++n)
fprintf(stderr, "%02x ", port->port_compressed[REM_SEND_OFFSET(port->port_buff_size) + n]);
fprintf(stderr, "\n");
#endif
#endif
expectMoreOut = !strm.avail_out;
if ((port->port_buff_size != strm.avail_out) && (flash || !strm.avail_out))
{
if (!packet_send(port, (SCHAR*) &port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)],
(SSHORT) (port->port_buff_size - strm.avail_out)))
{
return false;
}
strm.avail_out = port->port_buff_size;
strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)];
}
}
xdrs->x_private = xdrs->x_base;
xdrs->x_handy = port->port_buff_size;
return true;
#else
return proto_write(xdrs);
#endif
}
void rem_port::initCompression()
{
#ifdef WIRE_COMPRESS_SUPPORT
if (port_protocol >= PROTOCOL_VERSION13 && !port_compressed)
{
port_send_stream.zalloc = Z_NULL;
port_send_stream.zfree = Z_NULL;
port_send_stream.opaque = Z_NULL;
int ret = deflateInit(&port_send_stream, Z_DEFAULT_COMPRESSION);
if (ret != Z_OK)
(Firebird::Arg::Gds(isc_random) << "compression stream init error").raise(); // add error code
port_send_stream.next_out = NULL;
port_recv_stream.zalloc = Z_NULL;
port_recv_stream.zfree = Z_NULL;
port_recv_stream.opaque = Z_NULL;
port_recv_stream.avail_in = 0;
port_recv_stream.next_in = Z_NULL;
ret = inflateInit(&port_recv_stream);
if (ret != Z_OK)
(Firebird::Arg::Gds(isc_random) << "decompression stream init error").raise(); // add error code
port_compressed.reset(FB_NEW(getPool()) UCHAR[port_buff_size * 2]);
memset(port_compressed, 0, port_buff_size * 2);
port_recv_stream.next_in = &port_compressed[REM_RECV_OFFSET(port_buff_size)];
#ifdef COMPRESS_DEBUG
fprintf(stderr, "Completed init port %p\n", this);
#endif
}
#endif
}
signed char wcCompatible[3][3] = {
/* DISABLED ENABLED REQUIRED */
/* DISABLED */ {0, 0, -1},

View File

@ -55,6 +55,17 @@
#endif
#endif // !WIN_NT
#if defined(HAVE_ZLIB_H) && defined(HAVE_LIBZ)
#define WIRE_COMPRESS_SUPPORT 1
#endif
#ifdef WIRE_COMPRESS_SUPPORT
#include <zlib.h>
//#define COMPRESS_DEBUG 1
#endif // WIRE_COMPRESS_SUPPORT
#define REM_SEND_OFFSET(bs) (0)
#define REM_RECV_OFFSET(bs) (bs)
// Uncomment this line if you need to trace module activity
//#define REMOTE_DEBUG
@ -666,7 +677,7 @@ private:
Firebird::UCharBuffer dataForPlugin, dataFromPlugin;
Firebird::HalfStaticArray<InternalCryptKey*, 1> cryptKeys; // Wire crypt keys that came from plugin(s) last time
Firebird::string dpbConfig; // Used to recreate config with new filename
Firebird::RefPtr<Config> config; // Used to get plugins list and pass to port
Firebird::RefPtr<Config> clntConfig; // Used to get plugins list and pass to port
unsigned nextKey; // First key to be analyzed
bool hasCryptKey; // DPB contains disk crypt key, may be passed only over encrypted wire
@ -811,6 +822,7 @@ const USHORT PORT_server = 0x0080; // Server (not client) port
const USHORT PORT_detached = 0x0100; // op_detach, op_drop_database or op_service_detach was processed
const USHORT PORT_rdb_shutdown = 0x0200; // Database is shut down
const USHORT PORT_connecting = 0x0400; // Aux connection waits for a channel to be activated by client
const USHORT PORT_z_data = 0x0800; // Zlib incoming buffer has data left after decompression
// Port itself
@ -860,7 +872,7 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted
struct srvr* port_server; // server of port
USHORT port_server_flags; // TRUE if server
USHORT port_protocol; // protocol version number
USHORT port_buff_size; // port buffer size (approx)
USHORT port_buff_size; // port buffer size
USHORT port_flags; // Misc flags
SLONG port_connect_timeout; // Connection timeout value
SLONG port_dummy_packet_interval; // keep alive dummy packet interval
@ -922,6 +934,11 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted
FB_UINT64 port_snd_bytes;
FB_UINT64 port_rcv_bytes;
#ifdef WIRE_COMPRESS_SUPPORT
z_stream port_send_stream, port_recv_stream;
UCharArrayAutoPtr port_compressed;
#endif
public:
rem_port(rem_port_t t, size_t rpt) :
port_sync(FB_NEW(getPool()) Firebird::RefMutex()),
@ -931,7 +948,7 @@ public:
port_send_partial(0), port_connect(0), port_request(0), port_select_multi(0),
port_type(t), port_state(PENDING), port_clients(0), port_next(0),
port_parent(0), port_async(0), port_async_receive(0),
port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(0),
port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(rpt / 2),
port_flags(0), port_connect_timeout(0), port_dummy_packet_interval(0),
port_dummy_timeout(0), port_handle(INVALID_SOCKET), port_channel(INVALID_SOCKET), port_context(0),
port_events_thread(0), port_events_shutdown(0),
@ -964,10 +981,11 @@ public:
#endif
}
private: // this is refCounted object
~rem_port();
private:
~rem_port(); // this is refCounted object - private dtor is OK
public:
void initCompression();
void linkParent(rem_port* const parent);
void unlinkParent();
const Firebird::RefPtr<Config>& getPortConfig() const;

View File

@ -501,6 +501,8 @@ public:
}
authPort->send(send);
if (send->p_acpt.p_acpt_type & pflag_compress)
authPort->initCompression();
memset(&send->p_auth_cont, 0, sizeof send->p_auth_cont);
return false;
@ -719,6 +721,7 @@ public:
}
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
fb_assert(!port->port_compressed);
PACKET packet;
packet.p_operation = op_event;
@ -1663,6 +1666,7 @@ static bool accept_connection(rem_port* port, P_CNCT* connect, PACKET* send)
P_ARCH architecture = arch_generic;
USHORT version = 0;
USHORT type = 0;
bool compress = false;
bool accepted = false;
USHORT weight = 0;
const p_cnct::p_cnct_repeat* protocol = connect->p_cnct_versions;
@ -1682,7 +1686,8 @@ static bool accept_connection(rem_port* port, P_CNCT* connect, PACKET* send)
weight = protocol->p_cnct_weight;
version = protocol->p_cnct_version;
architecture = protocol->p_cnct_architecture;
type = MIN(protocol->p_cnct_max_type, ptype_lazy_send);
type = MIN(protocol->p_cnct_max_type & ptype_MASK, ptype_lazy_send);
compress = protocol->p_cnct_max_type & pflag_compress;
}
}
@ -1691,12 +1696,12 @@ static bool accept_connection(rem_port* port, P_CNCT* connect, PACKET* send)
send->p_acpd.p_acpt_version = port->port_protocol = version;
send->p_acpd.p_acpt_architecture = architecture;
send->p_acpd.p_acpt_type = type;
send->p_acpd.p_acpt_type = type | (compress ? pflag_compress : 0);
send->p_acpd.p_acpt_authenticated = 0;
send->p_acpt.p_acpt_version = port->port_protocol = version;
send->p_acpt.p_acpt_architecture = architecture;
send->p_acpt.p_acpt_type = type;
send->p_acpt.p_acpt_type = type | (compress ? pflag_compress : 0);
// modify the version string to reflect the chosen protocol
string buffer;
@ -1864,6 +1869,8 @@ static bool accept_connection(rem_port* port, P_CNCT* connect, PACKET* send)
send->p_operation = returnData ? op_accept_data : op_accept;
port->send(send);
if (send->p_acpt.p_acpt_type & pflag_compress)
port->initCompression();
return true;
}
@ -1889,6 +1896,8 @@ void ConnectAuth::accept(PACKET* send, Auth::WriterImplementation*)
authPort->port_srv_auth_block->extractNewKeys(s);
send->p_acpd.p_acpt_authenticated = 1;
authPort->send(send);
if (send->p_acpt.p_acpt_type & pflag_compress)
authPort->initCompression();
}
}
@ -2144,8 +2153,6 @@ void DatabaseAuth::accept(PACKET* send, Auth::WriterImplementation* authBlock)
authBlock->store(pb, isc_dpb_auth_block);
send->p_operation = op_accept;
for (pb->rewind(); !pb->isEof();)
{
switch (pb->getClumpTag())
@ -5252,8 +5259,6 @@ ISC_STATUS rem_port::service_attach(const char* service_name,
* Connect to a Firebird service.
*
**************************************/
sendL->p_operation = op_accept;
// Now insert additional clumplets into spb
addClumplets(spb, spbParam, this);