8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 21:23:04 +01:00

Use correct way to initialize shared events.

Fixed few race conditions and hangs.
Unlink file used for shared memory when it is gets unused.
This commit is contained in:
Vlad Khorsun 2024-07-18 11:25:16 +03:00
parent 79dfe14465
commit ff0f3cf93f

View File

@ -34,6 +34,8 @@
#include "../jrd/pag_proto.h"
#include "../jrd/tra_proto.h"
#include <atomic>
using namespace Jrd;
using namespace Firebird;
@ -50,10 +52,14 @@ namespace
{
NOP = 0,
SERVER_STARTED,
SERVER_EXITED,
RESPONSE,
EXCEPTION,
CANCEL_SESSION,
FIRST_CLIENT_OP,
CANCEL_SESSION = FIRST_CLIENT_OP,
DISCARD,
FINISH_SESSION,
FLUSH,
@ -89,15 +95,16 @@ namespace
event_t serverEvent;
event_t clientEvent;
USHORT bufferSize;
Tag tag;
std::atomic<Tag> tag;
char userName[USERNAME_LENGTH + 1]; // \0 if has PROFILE_ANY_ATTACHMENT
alignas(FB_ALIGNMENT) UCHAR buffer[4096];
};
static const USHORT VERSION = 1;
static const USHORT VERSION = 2;
public:
ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId);
ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false);
~ProfilerIpc();
ProfilerIpc(const ProfilerIpc&) = delete;
ProfilerIpc& operator=(const ProfilerIpc&) = delete;
@ -138,10 +145,12 @@ namespace
private:
void internalSendAndReceive(thread_db* tdbb, Tag tag, const void* in, unsigned inSize, void* out, unsigned outSize);
void initClient();
public:
AutoPtr<SharedMemory<Header>> sharedMemory;
AttNumber attachmentId;
const bool isServer;
};
} // anonymous namespace
@ -748,8 +757,9 @@ ProfilerManager::Statement* ProfilerManager::getStatement(Request* request)
//--------------------------------------
ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId)
: attachmentId(aAttachmentId)
ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server)
: attachmentId(aAttachmentId),
isServer(server)
{
const auto database = tdbb->getDatabase();
@ -767,7 +777,33 @@ ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmen
throw;
}
checkHeader(sharedMemory->getHeader());
const auto header = sharedMemory->getHeader();
checkHeader(header);
if (isServer)
{
Guard guard(this);
if (sharedMemory->eventInit(&header->serverEvent) != FB_SUCCESS)
(Arg::Gds(isc_random) << "ProfilerIpc eventInit(serverEvent) failed").raise();
}
}
ProfilerIpc::~ProfilerIpc()
{
Guard guard(this);
const auto header = sharedMemory->getHeader();
event_t* evnt = this->isServer ? &header->serverEvent : &header->clientEvent;
if (evnt->event_pid)
{
sharedMemory->eventFini(evnt);
evnt->event_pid = 0;
}
if (header->serverEvent.event_pid == 0 && header->clientEvent.event_pid == 0)
sharedMemory->removeMapFile();
}
bool ProfilerIpc::initialize(SharedMemoryBase* sm, bool init)
@ -778,15 +814,6 @@ bool ProfilerIpc::initialize(SharedMemoryBase* sm, bool init)
// Initialize the shared data header.
initHeader(header);
if (sm->eventInit(&header->serverEvent) != FB_SUCCESS)
(Arg::Gds(isc_random) << "ProfilerIpc eventInit(serverEvent) failed").raise();
if (sm->eventInit(&header->clientEvent) != FB_SUCCESS)
{
sm->eventFini(&header->serverEvent);
(Arg::Gds(isc_random) << "ProfilerIpc eventInit(clientEvent) failed").raise();
}
}
return true;
@ -828,7 +855,34 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
const auto header = sharedMemory->getHeader();
header->tag = tag;
initClient();
Cleanup finiClient([&] {
if (header->clientEvent.event_pid)
{
sharedMemory->eventFini(&header->clientEvent);
header->clientEvent.event_pid = 0;
}
});
const SLONG value = sharedMemory->eventClear(&header->clientEvent);
const Tag oldTag = header->tag.exchange(tag);
switch (oldTag)
{
case Tag::NOP:
header->tag = oldTag;
(Arg::Gds(isc_random) << "Remote attachment failed to start listener thread").raise();
break;
case Tag::SERVER_EXITED:
header->tag = oldTag;
(Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise();
break;
default:
break;
};
if (attachment->locksmith(tdbb, PROFILE_ANY_ATTACHMENT))
header->userName[0] = '\0';
@ -840,27 +894,88 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
fb_assert(inSize <= sizeof(header->buffer));
memcpy(header->buffer, in, inSize);
const SLONG value = sharedMemory->eventClear(&header->clientEvent);
sharedMemory->eventPost(&header->serverEvent);
if (sharedMemory->eventPost(&header->serverEvent) != FB_SUCCESS)
(Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise();
{
EngineCheckout cout(tdbb, FB_FUNCTION);
sharedMemory->eventWait(&header->clientEvent, value, 0);
const SLONG TIMEOUT = 500 * 1000; // 0.5 sec
const int serverPID = header->serverEvent.event_pid;
while (true)
{
{
EngineCheckout cout(tdbb, FB_FUNCTION);
if (sharedMemory->eventWait(&header->clientEvent, value, TIMEOUT) == FB_SUCCESS)
break;
if (serverPID != getpid() && !ISC_check_process_existence(serverPID))
{
// Server process was died or exited
fb_assert((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
if (header->tag == tag)
{
header->tag = Tag::SERVER_EXITED;
if (header->serverEvent.event_pid)
{
sharedMemory->eventFini(&header->serverEvent);
header->serverEvent.event_pid = 0;
}
}
break;
}
}
JRD_reschedule(tdbb, true);
}
}
if (header->tag == Tag::RESPONSE)
switch (header->tag)
{
case Tag::SERVER_EXITED:
(Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise();
break;
case Tag::RESPONSE:
fb_assert(outSize == header->bufferSize);
memcpy(out, header->buffer, header->bufferSize);
}
else
{
fb_assert(header->tag == Tag::EXCEPTION);
break;
case Tag::EXCEPTION:
(Arg::Gds(isc_random) << (char*) header->buffer).raise();
break;
default:
fb_assert(false);
}
}
void ProfilerIpc::initClient()
{
// Shared memory mutex must be locked by caller
fb_assert(isServer == false);
const auto header = sharedMemory->getHeader();
// Here should not be event created by another alive client
if (header->clientEvent.event_pid)
{
fb_assert(header->clientEvent.event_pid != getpid());
if (header->clientEvent.event_pid != getpid())
{
if (ISC_check_process_existence(header->clientEvent.event_pid))
(Arg::Gds(isc_random) << "ProfilerIpc eventInit(clientEvent) failed").raise();
}
sharedMemory->eventFini(&header->clientEvent);
}
if (sharedMemory->eventInit(&header->clientEvent) != FB_SUCCESS)
(Arg::Gds(isc_random) << "ProfilerIpc eventInit(clientEvent) failed").raise();
}
//--------------------------------------
@ -871,9 +986,10 @@ ProfilerListener::ProfilerListener(thread_db* tdbb)
{
auto& pool = *attachment->att_pool;
ipc = FB_NEW_POOL(pool) ProfilerIpc(tdbb, pool, attachment->att_attachment_id);
ipc = FB_NEW_POOL(pool) ProfilerIpc(tdbb, pool, attachment->att_attachment_id, true);
cleanupSync.run(this);
startupSemaphore.enter();
}
ProfilerListener::~ProfilerListener()
@ -881,19 +997,14 @@ ProfilerListener::~ProfilerListener()
exiting = true;
// Terminate the watcher thread.
startupSemaphore.tryEnter(5);
ProfilerIpc::Guard guard(ipc);
if (ipc)
{
auto& sharedMemory = ipc->sharedMemory;
sharedMemory->eventPost(&sharedMemory->getHeader()->serverEvent);
auto& sharedMemory = ipc->sharedMemory;
sharedMemory->eventPost(&sharedMemory->getHeader()->serverEvent);
cleanupSync.waitForCompletion();
const auto header = sharedMemory->getHeader();
sharedMemory->eventFini(&header->serverEvent);
sharedMemory->eventFini(&header->clientEvent);
cleanupSync.waitForCompletion();
}
}
void ProfilerListener::exceptionHandler(const Exception& ex, ThreadFinishSync<ProfilerListener*>::ThreadRoutine*)
@ -904,23 +1015,32 @@ void ProfilerListener::exceptionHandler(const Exception& ex, ThreadFinishSync<Pr
void ProfilerListener::watcherThread()
{
bool startup = true;
auto& sharedMemory = ipc->sharedMemory;
const auto header = sharedMemory->getHeader();
fb_assert(header->tag == ProfilerIpc::Tag::NOP);
header->tag = ProfilerIpc::Tag::SERVER_STARTED;
try
{
while (!exiting)
{
auto& sharedMemory = ipc->sharedMemory;
const auto header = sharedMemory->getHeader();
const SLONG value = sharedMemory->eventClear(&header->serverEvent);
if (header->tag != ProfilerIpc::Tag::NOP)
if (startup)
{
FbLocalStatus statusVector;
EngineContextHolder tdbb(&statusVector, attachment->getInterface(), FB_FUNCTION);
startup = false;
startupSemaphore.release();
}
else
{
fb_assert(header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
try
{
FbLocalStatus statusVector;
EngineContextHolder tdbb(&statusVector, attachment->getInterface(), FB_FUNCTION);
processCommand(tdbb);
header->tag = ProfilerIpc::Tag::RESPONSE;
}
@ -950,12 +1070,6 @@ void ProfilerListener::watcherThread()
sharedMemory->eventPost(&header->clientEvent);
}
if (startup)
{
startup = false;
startupSemaphore.release();
}
if (exiting)
break;
@ -967,6 +1081,13 @@ void ProfilerListener::watcherThread()
iscLogException("Error in profiler watcher thread\n", ex);
}
const ProfilerIpc::Tag oldTag = header->tag.exchange(ProfilerIpc::Tag::SERVER_EXITED);
if (oldTag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
{
fb_assert(header->clientEvent.event_pid);
sharedMemory->eventPost(&header->clientEvent);
}
try
{
if (startup)