8
0
mirror of https://github.com/FirebirdSQL/firebird.git synced 2025-01-22 23:23:02 +01:00
firebird-mirror/examples/interfaces/11.batch.cpp
2021-09-10 17:18:48 +03:00

591 lines
16 KiB
C++

/*
* PROGRAM: Object oriented API samples.
* MODULE: 11.batch.cpp
* DESCRIPTION: A trivial sample of using Batch interface.
*
* Example for the following interfaces:
* IBatch - interface to work with FB batches
* IBatchCompletionState - contains result of batch execution
*
* c++ 11.batch.cpp -lfbclient
*
* The contents of this file are subject to the Initial
* Developer's Public License Version 1.0 (the "License");
* you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
*
* Software distributed under the License is distributed AS IS,
* WITHOUT WARRANTY OF ANY KIND, either express or implied.
* See the License for the specific language governing rights
* and limitations under the License.
*
* The Original Code was created by Alexander Peshkoff
* for the Firebird Open Source RDBMS project.
*
* Copyright (c) 2017 Alexander Peshkoff <peshkoff@mail.ru>
* and all contributors signed below.
*
* All Rights Reserved.
* Contributor(s): ______________________________________.
*/
#include "ifaceExamples.h"
#include <firebird/Message.h>
static IMaster* master = fb_get_master_interface();
// output error message to user
static void errPrint(IStatus* status)
{
char buf[256];
master->getUtilInterface()->formatStatus(buf, sizeof(buf), status);
fprintf(stderr, "%s\n", buf);
}
// align target to alignment boundary
template <typename T>
static inline T align(T target, uintptr_t alignment)
{
return (T) ((((uintptr_t) target) + alignment - 1) & ~(alignment - 1));
}
// append given message to buffer ptr
static void putMsg(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment)
{
memcpy(ptr, from, size);
ptr += align(size, alignment);
}
// append blob header with BPB to buffer ptr
// return pointer to blob size field - prefilled with BPB size
static unsigned* putBlobHdr(unsigned char*& ptr, unsigned alignment, ISC_QUAD* id, unsigned bpbSize, const unsigned char* bpb)
{
ptr = align(ptr, alignment);
memcpy(ptr, id, sizeof(ISC_QUAD));
ptr += sizeof(ISC_QUAD);
unsigned* rc = reinterpret_cast<unsigned*>(ptr);
memcpy(ptr, &bpbSize, sizeof(unsigned));
ptr += sizeof(unsigned);
memcpy(ptr, &bpbSize, sizeof(unsigned));
ptr += sizeof(unsigned);
memcpy(ptr, bpb, bpbSize);
ptr += bpbSize;
return rc;
}
// append given blob to buffer ptr
static void putBlob(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment, ISC_QUAD* id)
{
unsigned* sizePtr = putBlobHdr(ptr, alignment, id, 0, NULL);
memcpy(ptr, from, size);
*sizePtr += size;
ptr += size;
ptr = align(ptr, alignment);
}
// append given segment to buffer ptr
unsigned putSegment(unsigned char*& ptr, const char* testData)
{
ptr = align(ptr, IBatch::BLOB_SEGHDR_ALIGN);
unsigned short l = strlen(testData);
memcpy(ptr, &l, sizeof l);
ptr += sizeof l;
memcpy(ptr, testData, l);
ptr += l;
return align(l + sizeof l, IBatch::BLOB_SEGHDR_ALIGN);
}
// batch info printer - prints what we know about batch
static void printInfo(ThrowStatusWrapper& status, const char* hdr, IBatch* b, IUtil* utl)
{
printf("\n%s\n", hdr);
const unsigned char items[] = {IBatch::INF_BLOB_ALIGNMENT, IBatch::INF_BUFFER_BYTES_SIZE,
IBatch::INF_DATA_BYTES_SIZE, IBatch::INF_BLOBS_BYTES_SIZE};
unsigned char buffer[29];
b->getInfo(&status, sizeof items, items, sizeof buffer, buffer);
IXpbBuilder* pb = utl->getXpbBuilder(&status, IXpbBuilder::INFO_RESPONSE, buffer, sizeof buffer);
for (pb->rewind(&status); !pb->isEof(&status); pb->moveNext(&status))
{
int val = pb->getInt(&status);
const char* text = "Unknown tag";
switch (pb->getTag(&status))
{
case IBatch::INF_BLOB_ALIGNMENT:
text = "Blob alignment";
break;
case IBatch::INF_BUFFER_BYTES_SIZE:
text = "Buffer size";
break;
case IBatch::INF_DATA_BYTES_SIZE:
text = "Messages size";
break;
case IBatch::INF_BLOBS_BYTES_SIZE:
text = "Blobs size";
break;
case isc_info_truncated:
printf(" truncated\n");
// fall down...
case isc_info_end:
pb->dispose();
return;
default:
printf("Unexpected item %d\n", pb->getTag(&status));
pb->dispose();
return;
}
printf("%s = %d\n", text, val);
}
pb->dispose();
}
// BatchCompletionState printer - prints all what we know about completed batch
static void print_cs(ThrowStatusWrapper& status, IBatchCompletionState* cs, IUtil* utl)
{
unsigned p = 0;
IStatus* s2 = NULL;
bool pr1 = false, pr2 = false;
// 1. Print per-message state info
unsigned upcount = cs->getSize(&status);
unsigned unk = 0, succ = 0;
for (p = 0; p < upcount; ++p)
{
int s = cs->getState(&status, p);
switch (s)
{
case IBatchCompletionState::EXECUTE_FAILED:
if (!pr1)
{
printf("Message Status\n");
pr1 = true;
}
printf("%5u Execute failed\n", p);
break;
case IBatchCompletionState::SUCCESS_NO_INFO:
++unk;
break;
default:
if (!pr1)
{
printf("Message Status\n");
pr1 = true;
}
printf("%5u Updated %d record(s)\n", p, s);
++succ;
break;
}
}
printf("Summary: total=%u success=%u success(but no update info)=%u\n", upcount, succ, unk);
// 2. Print detailed errors (if exist) for messages
s2 = master->getStatus();
for(p = 0; (p = cs->findError(&status, p)) != IBatchCompletionState::NO_MORE_ERRORS; ++p)
{
try
{
cs->getStatus(&status, s2, p);
char text[1024];
utl->formatStatus(text, sizeof(text) - 1, s2);
text[sizeof(text) - 1] = 0;
if (!pr2)
{
printf("\nDetailed errors status:\n");
pr2 = true;
}
printf("Message %u: %s\n", p, text);
}
catch (const FbException& error)
{
// handle error
fprintf(stderr, "\nError describing message %u\n", p);
errPrint(error.getStatus());
fprintf(stderr, "\n");
}
}
if (s2)
s2->dispose();
}
int main()
{
int rc = 0;
// set default password if none specified in environment
setenv("ISC_USER", "sysdba", 0);
setenv("ISC_PASSWORD", "masterkey", 0);
// With ThrowStatusWrapper passed as status interface FbException will be thrown on error
ThrowStatusWrapper status(master->getStatus());
// Declare pointers to required interfaces
IProvider* prov = master->getDispatcher();
IUtil* utl = master->getUtilInterface();
IAttachment* att = NULL;
ITransaction* tra = NULL;
IBatch* batch = NULL;
IBatchCompletionState* cs = NULL;
IXpbBuilder* pb = NULL;
unsigned char streamBuf[10240]; // big enough for demo
unsigned char* stream = NULL;
try
{
// attach employee db
att = prov->attachDatabase(&status, "employee", 0, NULL);
tra = att->startTransaction(&status, 0, NULL);
// cleanup
att->execute(&status, tra, 0, "delete from project where proj_id like 'BAT%'", SAMPLES_DIALECT,
NULL, NULL, NULL, NULL);
//
printf("\nPart 1. Simple messages. Adding one by one or by groups of messages, cancel batch.\n");
//
// Message to store in a table
FB_MESSAGE(Msg1, ThrowStatusWrapper,
(FB_VARCHAR(5), id)
(FB_VARCHAR(10), name)
) project1(&status, master);
project1.clear();
IMessageMetadata* meta = project1.getMetadata();
// sizes & alignments
unsigned mesAlign = meta->getAlignment(&status);
unsigned mesLength = meta->getMessageLength(&status);
unsigned char* streamStart = align(streamBuf, mesAlign);
// set batch parameters
pb = utl->getXpbBuilder(&status, IXpbBuilder::BATCH, NULL, 0);
// collect per-message statistics
pb->insertInt(&status, IBatch::TAG_RECORD_COUNTS, 1);
// create batch
const char* sqlStmt1 = "insert into project(proj_id, proj_name) values(?, ?)";
batch = att->createBatch(&status, tra, 0, sqlStmt1, SAMPLES_DIALECT, meta,
pb->getBufferLength(&status), pb->getBuffer(&status));
// fill batch with data record by record
project1->id.set("BAT11");
project1->name.set("SNGL_REC1");
batch->add(&status, 1, project1.getData());
project1->id.set("BAT12");
project1->name.set("SNGL_REC2");
batch->add(&status, 1, project1.getData());
// execute it
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// add a big set of same records ...
for (int i = 0; i < 100000; ++i)
{
project1->id.set("BAT11");
project1->name.set("SNGL_REC");
batch->add(&status, 1, project1.getData());
}
// check batch state
printInfo(status, "Info when added many records", batch, utl);
// ... and cancel that records
batch->cancel(&status);
// fill batch with data using many records at once
stream = streamStart;
project1->id.set("BAT13");
project1->name.set("STRM_REC_A");
putMsg(stream, project1.getData(), mesLength, mesAlign);
project1->id.set("BAT14");
project1->name.set("STRM_REC_B");
putMsg(stream, project1.getData(), mesLength, mesAlign);
project1->id.set("BAT15");
project1->name.set("STRM_REC_C");
putMsg(stream, project1.getData(), mesLength, mesAlign);
batch->add(&status, 3, streamStart);
stream = streamStart;
project1->id.set("BAT15"); // constraint violation
project1->name.set("STRM_REC_D");
putMsg(stream, project1.getData(), mesLength, mesAlign);
project1->id.set("BAT16"); // will not be processed due to return on single error
project1->name.set("STRM_REC_E");
putMsg(stream, project1.getData(), mesLength, mesAlign);
batch->add(&status, 2, streamStart);
// execute it
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// close batch
batch->close(&status);
batch = NULL;
//
printf("\nPart 2. Simple BLOBs. Multiple errors return.\n");
//
// Message to store in a table
FB_MESSAGE(Msg2, ThrowStatusWrapper,
(FB_VARCHAR(5), id)
(FB_VARCHAR(10), name)
(FB_BLOB, desc)
) project2(&status, master);
project2.clear();
meta = project2.getMetadata();
mesAlign = meta->getAlignment(&status);
mesLength = meta->getMessageLength(&status);
streamStart = align(streamBuf, mesAlign);
// set batch parameters
pb->clear(&status);
// continue batch processing in case of errors in some messages
pb->insertInt(&status, IBatch::TAG_MULTIERROR, 1);
// enable blobs processing - IDs generated by firebird engine
pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_ID_ENGINE);
// create batch
const char* sqlStmt2 = "insert into project(proj_id, proj_name, proj_desc) values(?, ?, ?)";
batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
pb->getBufferLength(&status), pb->getBuffer(&status));
// fill batch with data
project2->id.set("BAT21");
project2->name.set("SNGL_BLOB");
batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
batch->appendBlobData(&status, 1, "\n");
batch->appendBlobData(&status, strlen(sqlStmt1), sqlStmt1);
batch->add(&status, 1, project2.getData());
printInfo(status, "Info with blob", batch, utl);
// execute it
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// fill batch with data
project2->id.set("BAT22");
project2->name.set("SNGL_REC1");
batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
batch->add(&status, 1, project2.getData());
project2->id.set("BAT22");
project2->name.set("SNGL_REC2"); // constraint violation
batch->addBlob(&status, 2, "r2", &project2->desc, 0, NULL);
batch->add(&status, 1, project2.getData());
project2->id.set("BAT23");
project2->name.set("SNGL_REC3");
batch->addBlob(&status, 2, "r3", &project2->desc, 0, NULL);
batch->add(&status, 1, project2.getData());
project2->id.set("BAT23"); // constraint violation
project2->name.set("SNGL_REC4");
batch->addBlob(&status, 2, "r4", &project2->desc, 0, NULL);
batch->add(&status, 1, project2.getData());
// execute it
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// close batch
batch->close(&status);
batch = NULL;
//
printf("\nPart 3. BLOB stream, including segmented BLOB.\n");
//
// use Msg2/project2/sqlStmt2 to store in a table
// set batch parameters
pb->clear(&status);
// enable blobs processing - blobs are placed in a stream
pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_STREAM);
// create batch
batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
pb->getBufferLength(&status), pb->getBuffer(&status));
unsigned blobAlign = batch->getBlobAlignment(&status);
// prepare blob IDs
ISC_QUAD v1={0,1}, v2={0,2}, v3={0,3};
// send messages to batch
project2->id.set("BAT31");
project2->name.set("STRM_BLB_A");
project2->desc = v1;
batch->add(&status, 1, project2.getData());
project2->id.set("BAT32");
project2->name.set("STRM_BLB_B");
project2->desc = v2;
batch->add(&status, 1, project2.getData());
project2->id.set("BAT33");
project2->name.set("STRM_BLB_C");
project2->desc = v3;
batch->add(&status, 1, project2.getData());
// prepare blobs in the stream buffer
const char* d1 = "1111111111111111111";
const char* d2 = "22222222222222222222";
const char* d3 = "33333333333333333333333333333333333333333333333333333";
stream = streamStart;
putBlob(stream, d1, strlen(d1), blobAlign, &v1);
putBlob(stream, d2, strlen(d2), blobAlign, &v2);
putBlob(stream, d3, strlen(d3), blobAlign, &v3);
batch->addBlobStream(&status, stream - streamStart, streamStart);
// Continue last blob
stream = streamStart;
ISC_QUAD nullId = {0,0};
unsigned* size = putBlobHdr(stream, blobAlign, &nullId, 0, NULL);
const char* d4 = " 444444444444444444444444";
unsigned ld4 = strlen(d4);
memcpy(stream, d4, ld4);
*size += ld4;
stream += ld4;
stream = align(stream, blobAlign);
stream = align(stream, blobAlign);
batch->addBlobStream(&status, stream - streamStart, streamStart);
// Put segmented Blob in the stream
// add message
ISC_QUAD vSeg={0,10};
project2->id.set("BAT35");
project2->name.set("STRM_B_SEG");
project2->desc = vSeg;
batch->add(&status, 1, project2.getData());
// build BPB
pb->dispose();
pb = NULL;
pb = utl->getXpbBuilder(&status, IXpbBuilder::BPB, NULL, 0);
pb->insertInt(&status, isc_bpb_type, isc_bpb_type_segmented);
// make stream
stream = streamStart;
size = putBlobHdr(stream, blobAlign, &vSeg, pb->getBufferLength(&status), pb->getBuffer(&status));
*size += putSegment(stream, d1);
*size += putSegment(stream, "\n");
*size += putSegment(stream, d2);
*size += putSegment(stream, "\n");
*size += putSegment(stream, d3);
// add stream to the batch
stream = align(stream, blobAlign);
batch->addBlobStream(&status, stream - streamStart, streamStart);
// execute batch
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
//
printf("\nPart 4. BLOB created using IBlob interface.\n");
//
// use Msg2/project2/sqlStmt2 to store in a table
// registerBlob() may be called in BLOB_STREAM batch, ID should be generated by user in this case
// also demonstrates execution of same batch multiple times
// create blob
ISC_QUAD realId;
IBlob* blob = att->createBlob(&status, tra, &realId, 0, NULL);
const char* text = "Blob created using traditional API";
blob->putSegment(&status, strlen(text), text);
blob->close(&status);
// add message
project2->id.set("BAT38");
project2->name.set("FRGN_BLB");
project2->desc = v1; // after execute may reuse IDs
batch->registerBlob(&status, &realId, &project2->desc);
batch->add(&status, 1, project2.getData());
// execute it
cs = batch->execute(&status, tra);
print_cs(status, cs, utl);
// cleanup
batch->close(&status);
batch = NULL;
tra->commit(&status);
tra = NULL;
att->detach(&status);
att = NULL;
}
catch (const FbException& error)
{
// handle error
rc = 1;
errPrint(error.getStatus());
}
// release interfaces after error caught
if (cs)
cs->dispose();
if (batch)
batch->release();
if (tra)
tra->release();
if (att)
att->release();
// cleanup
if (pb)
pb->dispose();
status.dispose();
prov->release();
return rc;
}