/* * PROGRAM: JRD Remote Interface * MODULE: remote.cpp * DESCRIPTION: Common routines for remote interface/server * * The contents of this file are subject to the Interbase Public * License Version 1.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy * of the License at http://www.Inprise.com/IPL.html * * Software distributed under the License is distributed on an * "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express * or implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code was created by Inprise Corporation * and its predecessors. Portions created by Inprise Corporation are * Copyright (C) Inprise Corporation. * * All Rights Reserved. * Contributor(s): ______________________________________. */ #include "firebird.h" #include #include #include "../jrd/ibase.h" #include "../remote/remote.h" #include "../jrd/file_params.h" #include "../jrd/gdsassert.h" #include "../remote/proto_proto.h" #include "../remote/remot_proto.h" #include "../remote/xdr_proto.h" #include "../jrd/gds_proto.h" #include "../jrd/thread_proto.h" #include "../common/config/config.h" #include "../common/classes/init.h" #ifdef DEV_BUILD Firebird::AtomicCounter rem_port::portCounter; #endif #ifdef REMOTE_DEBUG IMPLEMENT_TRACE_ROUTINE(remote_trace, "REMOTE") #endif const SLONG DUMMY_INTERVAL = 60; // seconds const int ATTACH_FAILURE_SPACE = 16 * 1024; // bytes static Firebird::StringsBuffer* attachFailures = NULL; static Firebird::GlobalPtr attachFailuresMutex; static void cleanup_memory(void*); void REMOTE_cleanup_transaction( Rtr* transaction) { /************************************** * * R E M O T E _ c l e a n u p _ t r a n s a c t i o n * ************************************** * * Functional description * A transaction is being committed or rolled back. * Purge any active messages in case the user calls * receive while we still have something cached. * **************************************/ for (Rrq* request = transaction->rtr_rdb->rdb_requests; request; request = request->rrq_next) { if (request->rrq_rtr == transaction) { REMOTE_reset_request(request, 0); request->rrq_rtr = NULL; } for (Rrq* level = request->rrq_levels; level; level = level->rrq_next) if (level->rrq_rtr == transaction) { REMOTE_reset_request(level, 0); level->rrq_rtr = NULL; } } for (Rsr* statement = transaction->rtr_rdb->rdb_sql_requests; statement; statement = statement->rsr_next) { if (statement->rsr_rtr == transaction) { REMOTE_reset_statement(statement); statement->rsr_flags.clear(Rsr::FETCHED); statement->rsr_rtr = NULL; } } } ULONG REMOTE_compute_batch_size(rem_port* port, USHORT buffer_used, P_OP op_code, const rem_fmt* format) { /************************************** * * R E M O T E _ c o m p u t e _ b a t c h _ s i z e * ************************************** * * Functional description * * When batches of records are returned, they are returned as * follows: * * * ... * * * * end-of-batch is indicated by setting p_sqldata_messages to * 0 in the op_fetch_response. End of cursor is indicated * by setting p_sqldata_status to a non-zero value. Note * that a fetch CAN be attempted after end of cursor, this * is sent to the server for the server to return the appropriate * error code. * * Each data block has one overhead packet * to indicate the data is present. * * (See also op_send in receive_msg() - which is a kissing cousin * to this routine) * * Here we make a guess for the optimal number of records to * send in each batch. This is important as we wait for the * whole batch to be received before we return the first item * to the client program. How many are cached on the client also * impacts client-side memory utilization. * * We optimize the number by how many can fit into a packet. * The client calculates this number (n from the list above) * and sends it to the server. * * I asked why it is that the client doesn't just ask for a packet * full of records and let the server return however many fits in * a packet. According to Sudesh, this is because of a bug in * Superserver which showed up in the WIN_NT 4.2.x kits. So I * imagine once we up the protocol so that we can be sure we're not * talking to a 4.2 kit, then we can make this optimization. * - Deej 2/28/97 * * Note: A future optimization can look at setting the packet * size to optimize the transfer. * * Note: This calculation must use worst-case to determine the * packing. Should the data record have VARCHAR data, it is * often possible to fit more than the packing specification * into each packet. This is also a candidate for future * optimization. * * The data size is either the XDR data representation, or the * actual message size (rounded up) if this is a symmetric * architecture connection. * **************************************/ const USHORT MAX_PACKETS_PER_BATCH = 4; /* packets - picked by SWAG */ const USHORT MIN_PACKETS_PER_BATCH = 2; /* packets - picked by SWAG */ const USHORT DESIRED_ROWS_PER_BATCH = 20; /* data rows - picked by SWAG */ const USHORT MIN_ROWS_PER_BATCH = 10; /* data rows - picked by SWAG */ const USHORT op_overhead = (USHORT) xdr_protocol_overhead(op_code); #ifdef DEBUG fprintf(stderr, "port_buff_size = %d fmt_net_length = %d fmt_length = %d overhead = %d\n", port->port_buff_size, format->fmt_net_length, format->fmt_length, op_overhead); #endif ULONG row_size; if (port->port_flags & PORT_symmetric) { /* Same architecture connection */ row_size = (ROUNDUP(format->fmt_length, 4) + op_overhead); } else { /* Using XDR for data transfer */ row_size = (ROUNDUP(format->fmt_net_length, 4) + op_overhead); } USHORT num_packets = (USHORT) (((DESIRED_ROWS_PER_BATCH * row_size) /* data set */ + buffer_used /* used in 1st pkt */ + (port->port_buff_size - 1)) /* to round up */ / port->port_buff_size); if (num_packets > MAX_PACKETS_PER_BATCH) { num_packets = (USHORT) (((MIN_ROWS_PER_BATCH * row_size) /* data set */ + buffer_used /* used in 1st pkt */ + (port->port_buff_size - 1)) /* to round up */ / port->port_buff_size); } num_packets = MAX(num_packets, MIN_PACKETS_PER_BATCH); /* Now that we've picked the number of packets in a batch, pack as many rows as we can into the set of packets */ ULONG result = (num_packets * port->port_buff_size - buffer_used) / row_size; /* Must always send some messages, even if message size is more than packet size. */ result = MAX(result, MIN_ROWS_PER_BATCH); #ifdef DEBUG { // CVC: I don't see the point in replacing this with fb_utils::readenv(). const char* p = getenv("DEBUG_BATCH_SIZE"); if (p) result = atoi(p); fprintf(stderr, "row_size = %lu num_packets = %d\n", row_size, num_packets); fprintf(stderr, "result = %lu\n", result); } #endif return result; } Rrq* REMOTE_find_request(Rrq* request, USHORT level) { /************************************** * * R E M O T E _ f i n d _ r e q u e s t * ************************************** * * Functional description * Find sub-request if level is non-zero. * **************************************/ /* See if we already know about the request level */ for (;;) { if (request->rrq_level == level) return request; if (!request->rrq_levels) break; request = request->rrq_levels; } /* This is a new level -- make up a new request block. */ request->rrq_levels = request->clone(); /* FREE: REMOTE_remove_request() */ #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_find_request allocate request %x\n", request->rrq_levels); #endif request = request->rrq_levels; request->rrq_level = level; request->rrq_levels = NULL; /* Allocate message block for known messages */ Rrq::rrq_repeat* tail = request->rrq_rpt.begin(); const Rrq::rrq_repeat* const end = tail + request->rrq_max_msg; for (; tail <= end; tail++) { const rem_fmt* format = tail->rrq_format; if (!format) continue; RMessage* msg = new RMessage(format->fmt_length); tail->rrq_xdr = msg; #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_find_request allocate message %x\n", msg); #endif msg->msg_next = msg; #ifdef SCROLLABLE_CURSORS msg->msg_prior = msg; #endif msg->msg_number = tail->rrq_message->msg_number; tail->rrq_message = msg; } return request; } void REMOTE_free_packet( rem_port* port, PACKET * packet, bool partial) { /************************************** * * R E M O T E _ f r e e _ p a c k e t * ************************************** * * Functional description * Zero out a full packet block (partial == false) or * part of packet used in last operation (partial == true) **************************************/ XDR xdr; USHORT n; if (packet) { xdrmem_create(&xdr, reinterpret_cast(packet), sizeof(PACKET), XDR_FREE); xdr.x_public = (caddr_t) port; if (partial) { xdr_protocol(&xdr, packet); } else { for (n = (USHORT) op_connect; n < (USHORT) op_max; n++) { packet->p_operation = (P_OP) n; xdr_protocol(&xdr, packet); } } #ifdef DEBUG_XDR_MEMORY // All packet memory allocations should now be voided. // note: this code will may work properly if partial == true for (n = 0; n < P_MALLOC_SIZE; n++) fb_assert(packet->p_malloc[n].p_operation == op_void); #endif packet->p_operation = op_void; } } void REMOTE_get_timeout_params(rem_port* port, Firebird::ClumpletReader* pb) { /************************************** * * R E M O T E _ g e t _ t i m e o u t _ p a r a m s * ************************************** * * Functional description * Determine the connection timeout parameter values for this newly created * port. If the client did a specification in the DPB, use those values. * Otherwise, see if there is anything in the configuration file. The * configuration file management code will set the default values if there * is no other specification. * **************************************/ //bool got_dpb_connect_timeout = false; fb_assert(isc_dpb_connect_timeout == isc_spb_connect_timeout); port->port_connect_timeout = pb && pb->find(isc_dpb_connect_timeout) ? pb->getInt() : Config::getConnectionTimeout(); port->port_flags |= PORT_dummy_pckt_set; port->port_dummy_packet_interval = Config::getDummyPacketInterval(); if (port->port_dummy_packet_interval < 0) port->port_dummy_packet_interval = DUMMY_INTERVAL; port->port_dummy_timeout = port->port_dummy_packet_interval; #ifdef DEBUG printf("REMOTE_get_timeout dummy = %lu conn = %lu\n", port->port_dummy_packet_interval, port->port_connect_timeout); fflush(stdout); #endif } rem_str* REMOTE_make_string(const SCHAR* input) { /************************************** * * R E M O T E _ m a k e _ s t r i n g * ************************************** * * Functional description * Copy a given string to a permanent location, returning * address of new string. * **************************************/ const USHORT length = strlen(input); rem_str* string = FB_NEW_RPT(*getDefaultMemoryPool(), length) rem_str; #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_make_string allocate string %x\n", string); #endif strcpy(string->str_data, input); string->str_length = length; return string; } void REMOTE_release_messages( RMessage* messages) { /************************************** * * R E M O T E _ r e l e a s e _ m e s s a g e s * ************************************** * * Functional description * Release a circular list of messages. * **************************************/ RMessage* message = messages; if (message) { while (true) { RMessage* temp = message; message = message->msg_next; #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_release_messages free message %x\n", temp); #endif delete temp; if (message == messages) break; } } } void REMOTE_release_request( Rrq* request) { /************************************** * * R E M O T E _ r e l e a s e _ r e q u e s t * ************************************** * * Functional description * Release a request block and friends. * **************************************/ Rdb* rdb = request->rrq_rdb; for (Rrq** p = &rdb->rdb_requests; *p; p = &(*p)->rrq_next) { if (*p == request) { *p = request->rrq_next; break; } } /* Get rid of request and all levels */ for (;;) { Rrq::rrq_repeat* tail = request->rrq_rpt.begin(); const Rrq::rrq_repeat* const end = tail + request->rrq_max_msg; for (; tail <= end; tail++) { RMessage* message = tail->rrq_message; if (message) { if (!request->rrq_level) { #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_release_request free format %x\n", tail->rrq_format); #endif delete tail->rrq_format; } REMOTE_release_messages(message); } } Rrq* next = request->rrq_levels; #ifdef DEBUG_REMOTE_MEMORY printf("REMOTE_release_request free request %x\n", request); #endif delete request; if (!(request = next)) break; } } void REMOTE_reset_request( Rrq* request, RMessage* active_message) { /************************************** * * R E M O T E _ r e s e t _ r e q u e s t * ************************************** * * Functional description * Clean up a request in preparation to use it again. Since * there may be an active message (start_and_send), exercise * some care to avoid zapping that message. * **************************************/ Rrq::rrq_repeat* tail = request->rrq_rpt.begin(); const Rrq::rrq_repeat* const end = tail + request->rrq_max_msg; for (; tail <= end; tail++) { RMessage* message = tail->rrq_message; if (message != NULL && message != active_message) { tail->rrq_xdr = message; tail->rrq_rows_pending = 0; tail->rrq_reorder_level = 0; tail->rrq_batch_count = 0; while (true) { message->msg_address = NULL; message = message->msg_next; if (message == tail->rrq_message) break; } } } /* Initialize the request status to FB_SUCCESS */ request->rrq_status_vector[1] = 0; } void REMOTE_reset_statement( Rsr* statement) { /************************************** * * R E M O T E _ r e s e t _ s t a t e m e n t * ************************************** * * Functional description * Reset a statement by releasing all buffers except 1 * **************************************/ RMessage* message; if ((!statement) || (!(message = statement->rsr_message))) return; /* Reset all the pipeline counters */ statement->rsr_rows_pending = 0; statement->rsr_msgs_waiting = 0; statement->rsr_reorder_level = 0; statement->rsr_batch_count = 0; /* only one entry */ if (message->msg_next == message) return; /* find the entry before statement->rsr_message */ RMessage* temp = message->msg_next; while (temp->msg_next != message) temp = temp->msg_next; temp->msg_next = message->msg_next; message->msg_next = message; #ifdef SCROLLABLE_CURSORS message->msg_prior = message; #endif statement->rsr_buffer = statement->rsr_message; REMOTE_release_messages(temp); } void REMOTE_save_status_strings( ISC_STATUS* vector) { /************************************** * * R E M O T E _ s a v e _ s t a t u s _ s t r i n g s * ************************************** * * Functional description * There has been a failure during attach/create database. * The included string have been allocated off of the database block, * which is going to be released before the message gets passed * back to the user. So, to preserve information, copy any included * strings to a special buffer. * **************************************/ Firebird::MutexLockGuard guard(attachFailuresMutex); if (!attachFailures) { try { attachFailures = FB_NEW(*getDefaultMemoryPool()) Firebird::CircularStringsBuffer; /* FREE: freed by exit handler cleanup_memory() */ } catch (const Firebird::BadAlloc&) /* NOMEM: don't bother trying to copy */ { return; } gds__register_cleanup(cleanup_memory, 0); } attachFailures->makePermanentVector(vector, vector); } static void cleanup_memory(void* block) { /************************************** * * c l e a n u p _ m e m o r y * ************************************** * * Functional description * Cleanup any allocated memory. * **************************************/ delete attachFailures; attachFailures = NULL; gds__unregister_cleanup(cleanup_memory, 0); } // TMN: Beginning of C++ port - ugly but a start void rem_port::linkParent(rem_port* const parent) { fb_assert(parent); port_parent = parent; port_next = parent->port_clients; port_handle = parent->port_handle; port_server = parent->port_server; port_server_flags = parent->port_server_flags; parent->port_clients = parent->port_next = this; } void rem_port::unlinkParent() { fb_assert(this->port_parent != NULL); if (this->port_parent == NULL) return; #if DEV_BUILD bool found = false; #endif for (rem_port** ptr = &this->port_parent->port_clients; *ptr; ptr = &(*ptr)->port_next) { if (*ptr == this) { *ptr = this->port_next; if (ptr == &this->port_parent->port_clients) { fb_assert(this->port_parent->port_next == this); this->port_parent->port_next = *ptr; } #if DEV_BUILD found = true; #endif break; } } // for fb_assert(found); this->port_parent = NULL; } bool rem_port::accept(p_cnct* cnct) { return (*this->port_accept)(this, cnct); } void rem_port::disconnect() { (*this->port_disconnect)(this); } void rem_port::force_close() { (*this->port_force_close)(this); } rem_port* rem_port::receive(PACKET* pckt) { return (*this->port_receive_packet)(this, pckt); } bool rem_port::select_multi(UCHAR* buffer, SSHORT bufsize, SSHORT* length, RemPortPtr& port) { return (*this->port_select_multi)(this, buffer, bufsize, length, port); } XDR_INT rem_port::send(PACKET* pckt) { return (*this->port_send_packet)(this, pckt); } XDR_INT rem_port::send_partial(PACKET* pckt) { return (*this->port_send_partial)(this, pckt); } rem_port* rem_port::connect(PACKET* pckt) { return (*this->port_connect)(this, pckt); } rem_port* rem_port::request(PACKET* pckt) { return (*this->port_request)(this, pckt); } #ifdef REM_SERVER bool_t REMOTE_getbytes (XDR * xdrs, SCHAR * buff, u_int count) { /************************************** * * R E M O T E _ g e t b y t e s * ************************************** * * Functional description * Get a bunch of bytes from a port buffer * **************************************/ SLONG bytecount = count; /* Use memcpy to optimize bulk transfers. */ while (bytecount > 0) { if (xdrs->x_handy >= bytecount) { memcpy(buff, xdrs->x_private, bytecount); xdrs->x_private += bytecount; xdrs->x_handy -= bytecount; break; } if (xdrs->x_handy > 0) { memcpy(buff, xdrs->x_private, xdrs->x_handy); xdrs->x_private += xdrs->x_handy; buff += xdrs->x_handy; bytecount -= xdrs->x_handy; xdrs->x_handy = 0; } rem_port* port = (rem_port*) xdrs->x_public; Firebird::RefMutexGuard queGuard(*port->port_que_sync); if (port->port_qoffset >= port->port_queue.getCount()) { port->port_flags |= PORT_partial_data; return FALSE; } xdrs->x_handy = port->port_queue[port->port_qoffset].getCount(); fb_assert(xdrs->x_handy <= port->port_buff_size); memcpy(xdrs->x_base, port->port_queue[port->port_qoffset].begin(), xdrs->x_handy); ++port->port_qoffset; xdrs->x_private = xdrs->x_base; } return TRUE; } #endif //REM_SERVER #ifdef TRUSTED_AUTH ServerAuth::ServerAuth(const char* fName, int fLen, const Firebird::ClumpletWriter& pb, ServerAuth::Part2* p2, P_OP op) : fileName(*getDefaultMemoryPool()), clumplet(*getDefaultMemoryPool()), part2(p2), operation(op) { fileName.assign(fName, fLen); size_t pbLen = pb.getBufferLength(); if (pbLen) { memcpy(clumplet.getBuffer(pbLen), pb.getBuffer(), pbLen); } authSspi = FB_NEW(*getDefaultMemoryPool()) AuthSspi; } ServerAuth::~ServerAuth() { delete authSspi; } #endif // TRUSTED_AUTH void PortsCleanup::registerPort(rem_port* port) { Firebird::MutexLockGuard guard(m_mutex); if (!m_ports) { Firebird::MemoryPool& pool = *getDefaultMemoryPool(); m_ports = FB_NEW (pool) PortsArray(pool); } m_ports->add(port); } void PortsCleanup::unRegisterPort(rem_port* port) { Firebird::MutexLockGuard guard(m_mutex); if (m_ports) { size_t i; const bool found = m_ports->find(port, i); //fb_assert(found); if (found) m_ports->remove(i); } } void PortsCleanup::closePorts() { Firebird::MutexLockGuard guard(m_mutex); if (m_ports) { rem_port* const* ptr = m_ports->begin(); const rem_port* const* end = m_ports->end(); for (; ptr < end; ptr++) { (*ptr)->force_close(); } delete m_ports; m_ports = NULL; } }