diff --git a/CMakeLists.txt b/CMakeLists.txt index ce0cedd06..c099323db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -197,16 +197,6 @@ else() endif() endif() -# TBD: TEMPORARY >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -if (NOT installBMQ) - # Enable MSG GroupId public APIs ONLY if not doing a DPKG build (i.e., a - # release) of libbmq; until the feature is fully implemented. - add_definitions("-DBMQ_ENABLE_MSG_GROUPID") -else() - message(STATUS "Message GroupId APIs *NOT* exposed!") -endif() -# TBD: TEMPORARY <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - # ----------------------------------------------------------------------------- # PROJECTS # ----------------------------------------------------------------------------- diff --git a/src/groups/bmq/bmqimp/bmqimp_event.t.cpp b/src/groups/bmq/bmqimp/bmqimp_event.t.cpp index afa06357d..579a501f1 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_event.t.cpp @@ -1077,7 +1077,9 @@ static void test8_putEventBuilder() bmqp::Crc32c::initialize(); bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +#ifdef BMQ_ENABLE_MSG_GROUPID const bmqp::Protocol::MsgGroupId k_MSG_GROUP_ID("gid:0", s_allocator_p); +#endif const int k_PROPERTY_VAL_ENCODING = 3; const bsl::string k_PROPERTY_VAL_ID = "myCoolId"; const unsigned int k_CRC32 = 123; @@ -1106,8 +1108,10 @@ static void test8_putEventBuilder() builder.startMessage(); builder.setMessagePayload(k_PAYLOAD, k_PAYLOAD_LEN) - .setMessageProperties(&msgProps) - .setMsgGroupId(k_MSG_GROUP_ID); + .setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID + builder.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -1149,8 +1153,10 @@ static void test8_putEventBuilder() s_allocator_p, bmqt::CompressionAlgorithmType::e_NONE); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(builder.msgGroupId().isNull(), false); ASSERT_EQ(builder.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(builder.unpackedMessageSize(), k_PAYLOAD_LEN); @@ -1232,11 +1238,13 @@ static void test8_putEventBuilder() ASSERT_EQ(prop.getPropertyAsInt64("timestamp"), test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); ASSERT_EQ(putIter.isValid(), true); +#endif } ASSERT_EQ(true, putIter.isValid()); diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp index 52c6c05da..b6480aab2 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp @@ -228,9 +228,14 @@ struct Tester BSLS_CPP11_FINAL { /// is undefined unless a queue with the `uri` was created and inserted /// using a call to `insertQueue`, or if packing the message is /// unsuccessful. +#ifdef BMQ_ENABLE_MSG_GROUPID void packPutMessage(const bslstl::StringRef& uri, const bslstl::StringRef& payload, const bslstl::StringRef& msgGroupId = ""); +#else + void packPutMessage(const bslstl::StringRef& uri, + const bslstl::StringRef& payload); +#endif /// Append to the CONFIRM event being built a CONFIRM message associated /// with the queue corresponding to the specified `uri`. The behavior @@ -460,9 +465,14 @@ void Tester::appendAckMessage(const bslstl::StringRef& uri, BSLS_ASSERT_OPT(rc == 0); } +#ifdef BMQ_ENABLE_MSG_GROUPID void Tester::packPutMessage(const bslstl::StringRef& uri, const bslstl::StringRef& payload, const bslstl::StringRef& msgGroupId) +#else +void Tester::packPutMessage(const bslstl::StringRef& uri, + const bslstl::StringRef& payload) +#endif { // PRECONDITIONS BSLS_ASSERT_OPT(d_queueIdsByUri.find(uri) != d_queueIdsByUri.end() && @@ -481,10 +491,12 @@ void Tester::packPutMessage(const bslstl::StringRef& uri, d_putEventBuilder.startMessage(); d_putEventBuilder.setMessageGUID(guid).setMessagePayload(&msgPayload); +#ifdef BMQ_ENABLE_MSG_GROUPID if (!msgGroupId.empty()) { bmqp::Protocol::MsgGroupId groupId(msgGroupId, d_allocator_p); d_putEventBuilder.setMsgGroupId(msgGroupId); } +#endif int rc = d_putEventBuilder.packMessage(queueId.id()); BSLS_ASSERT_OPT(rc == 0); @@ -1659,8 +1671,13 @@ static void test8_dumpPutEvent() // messages. tester.processDumpCommand("PUT ON"); +#ifdef BMQ_ENABLE_MSG_GROUPID tester.packPutMessage("bmq://bmq.test.mmap.fanout/q1", "abcd", "Group 1"); tester.packPutMessage("bmq://bmq.test.mmap.fanout/q1", "abcd", "Group 2"); +#else + tester.packPutMessage("bmq://bmq.test.mmap.fanout/q1", "abcd"); + tester.packPutMessage("bmq://bmq.test.mmap.fanout/q1", "abcd"); +#endif tester.packPutMessage("bmq://bmq.test.mmap.priority/q1", "efgh"); tester.packPutMessage("bmq://bmq.test.mmap.priority/q2", "ijkl"); @@ -1674,6 +1691,7 @@ static void test8_dumpPutEvent() PVV(L_ << ": PUT event dump: " << out.str()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(regexMatch(out.str(), "PUT Message #1:.*" "queue: bmq://bmq.test.mmap.fanout/q1.*" @@ -1688,6 +1706,20 @@ static void test8_dumpPutEvent() "abcd.*", s_allocator_p), true); +#else + ASSERT_EQ(regexMatch(out.str(), + "PUT Message #1:.*" + "queue: bmq://bmq.test.mmap.fanout/q1.*" + "abcd.*", + s_allocator_p), + true); + ASSERT_EQ(regexMatch(out.str(), + "PUT Message #2:.*" + "queue: bmq://bmq.test.mmap.fanout/q1.*" + "abcd.*", + s_allocator_p), + true); +#endif ASSERT_EQ(regexMatch(out.str(), "PUT Message #3:.*" "queue: bmq://bmq.test.mmap.priority/q1.*" diff --git a/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp b/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp index d59da1c23..938f1e7af 100644 --- a/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp @@ -388,6 +388,7 @@ static void test3_checkOptionsBlobSegment() } } +#ifdef BMQ_ENABLE_MSG_GROUPID static void test4_isValidMsgGroupId() // ------------------------------------------------------------------------ // VALIDATE GROUPID LENGTH @@ -426,6 +427,7 @@ static void test4_isValidMsgGroupId() ASSERT_EQ(Result::e_SUCCESS, bmqp::OptionUtil::isValidMsgGroupId(maxLength)); } +#endif // ============================================================================ // MAIN PROGRAM @@ -439,7 +441,9 @@ int main(int argc, char* argv[]) switch (_testCase) { case 0: +#ifdef BMQ_ENABLE_MSG_GROUPID case 4: test4_isValidMsgGroupId(); break; +#endif case 3: test3_checkOptionsBlobSegment(); break; case 2: test2_basicOptionsBoxCanAdd(); break; case 1: test1_basicOptionMetaProperties(); break; diff --git a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp index 3d7e9f456..7bd65da44 100644 --- a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp @@ -58,13 +58,17 @@ using namespace bsl; // ---------------------------------------------------------------------------- namespace { +#ifdef BMQ_ENABLE_MSG_GROUPID typedef bdlb::NullableValue NullableMsgGroupId; +#endif struct Data { bmqt::MessageGUID d_guid; int d_qid; bmqp::Protocol::SubQueueInfosArray d_subQueueInfos; +#ifdef BMQ_ENABLE_MSG_GROUPID NullableMsgGroupId d_msgGroupId; +#endif bdlbb::Blob d_payload; int d_flags; bmqt::CompressionAlgorithmType::Enum d_compressionAlgorithmType; @@ -82,7 +86,9 @@ Data::Data(bdlbb::BlobBufferFactory* bufferFactory, bslma::Allocator* allocator) : d_qid(-1) , d_subQueueInfos(allocator) +#ifdef BMQ_ENABLE_MSG_GROUPID , d_msgGroupId(allocator) +#endif , d_payload(bufferFactory, allocator) , d_flags(0) , d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE) @@ -94,7 +100,9 @@ Data::Data(const Data& other, bslma::Allocator* allocator) : d_guid(other.d_guid) , d_qid(other.d_qid) , d_subQueueInfos(other.d_subQueueInfos, allocator) +#ifdef BMQ_ENABLE_MSG_GROUPID , d_msgGroupId(other.d_msgGroupId, allocator) +#endif , d_payload(other.d_payload, allocator) , d_flags(other.d_flags) , d_compressionAlgorithmType(other.d_compressionAlgorithmType) @@ -142,6 +150,7 @@ void generateSubQueueInfos(bmqp::Protocol::SubQueueInfosArray* subQueueInfos, static_cast(numSubQueueInfos)); } +#ifdef BMQ_ENABLE_MSG_GROUPID /// Populate the specified `msgGroupId` with a random Group Id. static void generateMsgGroupId(bmqp::Protocol::MsgGroupId* msgGroupId) { @@ -152,6 +161,7 @@ static void generateMsgGroupId(bmqp::Protocol::MsgGroupId* msgGroupId) oss << "gid:" << generateRandomInteger(0, 120); *msgGroupId = oss.str(); } +#endif /// Append at least `atLeastLen` bytes to the specified `blob` and populate /// the specified `payloadLen` with the number of bytes appended. @@ -200,6 +210,7 @@ appendMessage(size_t iteration, return rc; // RETURN } +#ifdef BMQ_ENABLE_MSG_GROUPID // Every 3rd iteration we don't add a Group Id. if (iteration % 3) { generateMsgGroupId(&data.d_msgGroupId.makeValue()); @@ -208,11 +219,14 @@ appendMessage(size_t iteration, return rc; // RETURN } } +#endif bdlbb::Blob payload(bufferFactory, allocator); const int blobSize = generateRandomInteger(0, 1024); +#ifdef BMQ_ENABLE_MSG_GROUPID const bmqp::Protocol::MsgGroupId str(blobSize, 'x', allocator); bdlbb::BlobUtil::append(&payload, str.c_str(), blobSize); +#endif data.d_payload = payload; @@ -651,6 +665,7 @@ static void test4_buildEventWithMultipleMessages() ASSERT_EQ_D(i, D.d_subQueueInfos[i], retrievedSQInfos[i]); } } +#ifdef BMQ_ENABLE_MSG_GROUPID const bool hasMsgGroupId = !D.d_msgGroupId.isNull(); ASSERT_EQ(hasMsgGroupId, optionsView.find(bmqp::OptionType::e_MSG_GROUP_ID) != @@ -661,6 +676,7 @@ static void test4_buildEventWithMultipleMessages() optionsView.loadMsgGroupIdOption(&retrievedMsgGroupId)); ASSERT_EQ(D.d_msgGroupId.value(), retrievedMsgGroupId); } +#endif ++dataIndex; } @@ -900,6 +916,7 @@ static void test7_buildEventOptionTooBig() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_OPTION_TOO_BIG); +#ifdef BMQ_ENABLE_MSG_GROUPID // Add another option larger than maximum allowed bmqp::Protocol::MsgGroupId msgGrIdBig1(bmqp::OptionHeader::k_MAX_SIZE + 1, 'x', @@ -916,7 +933,6 @@ static void test7_buildEventOptionTooBig() rc = peb.addMsgGroupIdOption(msgGrIdBig2); -#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(rc, bmqt::EventBuilderResult::e_INVALID_MSG_GROUP_ID); #endif diff --git a/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp index 4396d0d99..815fb43e7 100644 --- a/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp @@ -96,6 +96,7 @@ Data::Data(const Data& other, bslma::Allocator* allocator) // NOTHING } +#ifdef BMQ_ENABLE_MSG_GROUPID void setMsgGroupId(bmqp::PutEventBuilder* peb, const size_t iteration) { mwcu::MemOutStream oss(s_allocator_p); @@ -113,6 +114,7 @@ void validateGroupId(const size_t iteration, oss << "gid:" << iteration; ASSERT_EQ(oss.str(), msgGroupId); } +#endif bmqt::EventBuilderResult::Enum appendMessage(size_t iteration, @@ -137,7 +139,9 @@ appendMessage(size_t iteration, peb->startMessage(); } +#ifdef BMQ_ENABLE_MSG_GROUPID setMsgGroupId(peb, iteration); +#endif peb->setMessagePayload(&data.d_payload); peb->setMessageGUID(data.d_guid); @@ -211,7 +215,9 @@ static void test1_breathingTest() mwctst::TestHelper::printTestName("BREATHING TEST"); bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +#ifdef BMQ_ENABLE_MSG_GROUPID const bmqp::Protocol::MsgGroupId k_MSG_GROUP_ID("gid:0", s_allocator_p); +#endif const int k_PROPERTY_VAL_ENCODING = 3; const bsl::string k_PROPERTY_VAL_ID = "myCoolId"; const unsigned int k_CRC32 = 123; @@ -254,7 +260,9 @@ static void test1_breathingTest() obj.startMessage(); obj.setMessagePayload(k_PAYLOAD_BIGGER, k_PAYLOAD_BIGGER_LEN); obj.setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -303,8 +311,10 @@ static void test1_breathingTest() s_allocator_p, obj.compressionAlgorithmType()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_BIGGER_LEN); @@ -391,11 +401,13 @@ static void test1_breathingTest() test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); ASSERT_EQ(putIter.isValid(), true); +#endif } ASSERT_EQ(true, putIter.isValid()); @@ -472,7 +484,9 @@ static void test1_breathingTest() obj.startMessage(); obj.setMessagePayload(k_PAYLOAD_BIGGER, k_PAYLOAD_BIGGER_LEN); obj.setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -520,8 +534,10 @@ static void test1_breathingTest() s_allocator_p, obj.compressionAlgorithmType()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_BIGGER_LEN); @@ -609,10 +625,12 @@ static void test1_breathingTest() test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -696,7 +714,9 @@ static void test1_breathingTest() obj.startMessage(); obj.setMessagePayload(k_PAYLOAD_BIGGER, k_PAYLOAD_BIGGER_LEN); obj.setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -751,8 +771,10 @@ static void test1_breathingTest() s_allocator_p, obj.compressionAlgorithmType()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_BIGGER_LEN); @@ -847,10 +869,12 @@ static void test1_breathingTest() test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -935,7 +959,9 @@ static void test1_breathingTest() obj.startMessage(); obj.setMessagePayload(k_PAYLOAD, k_PAYLOAD_LEN); obj.setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -990,8 +1016,10 @@ static void test1_breathingTest() s_allocator_p, bmqt::CompressionAlgorithmType::e_NONE); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_LEN); @@ -1079,10 +1107,12 @@ static void test1_breathingTest() test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -1162,7 +1192,9 @@ static void test1_breathingTest() obj.startMessage(); obj.setMessagePayload(k_PAYLOAD_BIGGER, k_PAYLOAD_BIGGER_LEN); obj.setMessageProperties(&msgProps); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -1211,8 +1243,10 @@ static void test1_breathingTest() s_allocator_p, bmqt::CompressionAlgorithmType::e_NONE); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_BIGGER_LEN); @@ -1299,10 +1333,12 @@ static void test1_breathingTest() test.d_timeStamp); } +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -1382,7 +1418,9 @@ static void test1_breathingTest() s_allocator_p); obj.setMessagePayload(&payload); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif struct Test { int d_line; @@ -1410,8 +1448,10 @@ static void test1_breathingTest() ASSERT_EQ(obj.crc32c(), k_CRC32); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), payload.length()); @@ -1476,10 +1516,12 @@ static void test1_breathingTest() ASSERT_EQ(0, putIter.loadMessageProperties(&prop)); ASSERT_EQ(0, prop.numProperties()); +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -1636,7 +1678,9 @@ static void test2_manipulators_one() ASSERT_EQ_D(dataIdx, obj.unpackedMessageSize(), 0); +#ifdef BMQ_ENABLE_MSG_GROUPID setMsgGroupId(&obj, dataIdx); +#endif obj.setMessagePayload(data.d_payload, data.d_payloadLen); ASSERT_EQ(0, msgProps.setPropertyAsInt64("timestamp", dataIdx * 10LL)); @@ -1720,7 +1764,9 @@ static void test2_manipulators_one() ASSERT_EQ(bmqt::PropertyType::e_INT64, ptype); ASSERT_EQ(dataIndex * 10LL, p.getPropertyAsInt64("timestamp")); +#ifdef BMQ_ENABLE_MSG_GROUPID validateGroupId(dataIndex, putIter); +#endif ++dataIndex; } @@ -1747,7 +1793,9 @@ static void test3_eventTooBig() bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); bdlbb::Blob bigMsgPayload(&bufferFactory, s_allocator_p); +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId k_MSG_GROUP_ID("gid:0", s_allocator_p); +#endif const int k_QID = 4321; bmqt::MessageGUID guid = bmqp::MessageGUIDGenerator::testGUID(); @@ -1762,7 +1810,9 @@ static void test3_eventTooBig() bmqp::PutEventBuilder obj(&bufferFactory, s_allocator_p); obj.startMessage(); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif obj.setMessageGUID(guid); obj.setMessagePayload(&bigMsgPayload); @@ -1774,7 +1824,9 @@ static void test3_eventTooBig() const int k_PAYLOAD_LEN = bsl::strlen(k_PAYLOAD); // Now append a "regular"-sized message +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); +#endif obj.setMessageGUID(guid); obj.setMessagePayload(k_PAYLOAD, k_PAYLOAD_LEN); rc = obj.packMessage(k_QID); @@ -1814,11 +1866,13 @@ static void test3_eventTooBig() ASSERT_EQ(res, 0); ASSERT_EQ(compareResult, 0); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT(putIter.hasMsgGroupId()); bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(0, putIter.next()); // we added only 1 msg } @@ -1890,7 +1944,9 @@ static void test4_manipulators_two() 0, bdlbb::BlobUtil::compare(payloadBlob, D.d_payload)); +#ifdef BMQ_ENABLE_MSG_GROUPID validateGroupId(dataIndex, putIter); +#endif ++dataIndex; } @@ -1963,7 +2019,9 @@ static void test6_emptyBuilder() mwctst::TestHelper::printTestName("EMPTY BUILDER"); bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId k_MSG_GROUP_ID("gid:0", s_allocator_p); +#endif unsigned char zeroGuidBuf[bmqt::MessageGUID::e_SIZE_BINARY]; bsl::memset(zeroGuidBuf, 0, bmqt::MessageGUID::e_SIZE_BINARY); @@ -1981,7 +2039,9 @@ static void test6_emptyBuilder() ASSERT_EQ(obj.unpackedMessageSize(), 0); ASSERT_SAFE_FAIL(obj.setFlags(0)); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_SAFE_FAIL(obj.setMsgGroupId(k_MSG_GROUP_ID)); +#endif ASSERT_SAFE_FAIL(obj.setMessageGUID(zeroGuid)); ASSERT_SAFE_FAIL(obj.setCrc32c(0)); ASSERT_SAFE_FAIL(obj.setMessagePayload(k_PAYLOAD, bsl::strlen(k_PAYLOAD))); @@ -1992,7 +2052,9 @@ static void test6_emptyBuilder() const int evtSize = sizeof(bmqp::EventHeader); ASSERT_EQ(obj.messageGUID(), zeroGuid); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), true); +#endif ASSERT_EQ(obj.unpackedMessageSize(), 0); ASSERT_EQ(obj.eventSize(), evtSize); ASSERT_EQ(obj.flags(), 0); @@ -2004,6 +2066,7 @@ static void test6_emptyBuilder() ASSERT_EQ(obj.messageGUID(), onesGuid); +#ifdef BMQ_ENABLE_MSG_GROUPID obj.setMsgGroupId(k_MSG_GROUP_ID); ASSERT_EQ(obj.msgGroupId().isNull(), false); @@ -2012,6 +2075,7 @@ static void test6_emptyBuilder() obj.clearMsgGroupId(); ASSERT_EQ(obj.msgGroupId().isNull(), true); +#endif static_cast(k_PAYLOAD); // suppress 'unused-variable' warning in // prod build @@ -2040,7 +2104,9 @@ static void test7_multiplePackMessage() mwctst::TestHelper::printTestName("TEST MULTIPLE CALLS TO PACK MESSAGE"); bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +#ifdef BMQ_ENABLE_MSG_GROUPID const bmqp::Protocol::MsgGroupId k_MSG_GROUP_ID("gid:0", s_allocator_p); +#endif const int k_PROPERTY_VAL_ENCODING = 3; const bsl::string k_PROPERTY_VAL_ID = "myCoolId"; const unsigned int k_CRC32 = 123; @@ -2080,7 +2146,9 @@ static void test7_multiplePackMessage() obj.setMessagePayload(k_PAYLOAD_BIGGER, k_PAYLOAD_BIGGER_LEN) .setMessageProperties(&msgProps) .setMessageGUID(bmqp::MessageGUIDGenerator::testGUID()) +#ifdef BMQ_ENABLE_MSG_GROUPID .setMsgGroupId(k_MSG_GROUP_ID) +#endif .setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB); int d_q1 = 9876; @@ -2102,8 +2170,10 @@ static void test7_multiplePackMessage() s_allocator_p, obj.compressionAlgorithmType()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), false); ASSERT_EQ(obj.msgGroupId().value(), k_MSG_GROUP_ID); +#endif ASSERT_EQ(obj.unpackedMessageSize(), k_PAYLOAD_BIGGER_LEN); @@ -2185,10 +2255,12 @@ static void test7_multiplePackMessage() ASSERT_EQ(bmqt::PropertyType::e_INT64, ptype); ASSERT_EQ(prop.getPropertyAsInt64("timestamp"), k_TIME_STAMP); +#ifdef BMQ_ENABLE_MSG_GROUPID bmqp::Protocol::MsgGroupId msgGroupId(s_allocator_p); ASSERT_EQ(putIter.hasMsgGroupId(), true); ASSERT_EQ(putIter.extractMsgGroupId(&msgGroupId), true); ASSERT_EQ(msgGroupId, k_MSG_GROUP_ID); +#endif ASSERT_EQ(putIter.isValid(), true); } @@ -2211,7 +2283,9 @@ static void test7_multiplePackMessage() ASSERT_EQ(bmqt::EventBuilderResult::e_SUCCESS, rc); ASSERT_GT(obj.eventSize(), k_PAYLOAD_BIGGER_LEN); ASSERT_EQ(obj.messageCount(), 3); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(obj.msgGroupId().isNull(), true); +#endif ASSERT_EQ(obj.compressionAlgorithmType(), bmqt::CompressionAlgorithmType::e_NONE); rawEvent.reset(&obj.blob()); @@ -2239,7 +2313,9 @@ static void test7_multiplePackMessage() ASSERT_EQ(0, res); ASSERT_EQ(0, compareResult); ASSERT_EQ(false, putIter.hasMessageProperties()); +#ifdef BMQ_ENABLE_MSG_GROUPID ASSERT_EQ(false, putIter.hasMsgGroupId()); +#endif ASSERT_EQ(0, putIter.loadMessageProperties(&prop)); ASSERT_EQ(0, prop.numProperties()); ASSERT_EQ(true, putIter.isValid()); diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp index 9c5445d42..42e0ee528 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp @@ -77,82 +77,83 @@ struct DomainQueueStats { e_STAT_NB_PRODUCER , + /// Value: Current number of clients who opened the queue with + /// the 'READ' flag e_STAT_NB_CONSUMER - // Value: Current number of clients who opened the queue with - // the 'READ' flag , + /// Value: Current number of messages in the queue e_STAT_MESSAGES - // Value: Current number of messages in the queue , + /// Value: Accumulated bytes of all messages currently in the + /// queue e_STAT_BYTES - // Value: Accumulated bytes of all messages currently in the - // queue , + /// Value: Number of ack messages delivered by this queue e_STAT_ACK - // Value: Number of ack messages delivered by this queue , + /// Value: The time between PUT and ACK (in nanoseconds). e_STAT_ACK_TIME - // Value: The time between PUT and ACK (in nanoseconds). , + /// Value: Number of NACK messages generated for this queue e_STAT_NACK - // Value: Number of NACK messages generated for this queue , + /// Value: Number of CONFIRM messages received by this queue e_STAT_CONFIRM - // Value: Number of CONFIRM messages received by this queue , + /// Value: The time between PUSH and CONFIRM (in nanoseconds). e_STAT_CONFIRM_TIME - // Value: The time between PUSH and CONFIRM (in nanoseconds). , + /// Value: Number of messages rejected by this queue (RDA + /// reaching zero) e_STAT_REJECT - // Value: Number of messages rejected by this queue (RDA - // reaching zero) , + /// Value: The time spent by the message in the queue (in + /// nanoseconds). e_STAT_QUEUE_TIME - // Value: The time spent by the message in the queue (in - // nanoseconds). , + /// Value: Accumulated bytes of all messages ever pushed from + /// the queue + /// Increment: Number of messages ever pushed from the queue e_STAT_PUSH - // Value: Accumulated bytes of all messages ever pushed from - // the queue - // Increment: Number of messages ever pushed from the queue , + /// Value: Accumulated bytes of all messages ever put in the + /// queue + /// Increment: Number of messages ever put in the queue e_STAT_PUT - // Value: Accumulated bytes of all messages ever put in the - // queue - // Increment: Number of messages ever put in the queue , + /// Value: Accumulated number of messages ever GC'ed in the + /// queue e_STAT_GC_MSGS - // Value: Accumulated number of messages ever GC'ed in the - // queue , + /// Value: Role (Unknown, Primary, Replica, Proxy) e_STAT_ROLE - // Value: Role (Unknown, Primary, Replica, Proxy) , + /// Value: The configured queue messages capacity e_CFG_MSGS - // Value: The configured queue messages capacity , + /// Value: The configured queue bytes capacity e_CFG_BYTES - // Value: The configured queue bytes capacity + , + /// Value: Accumulated number of messages in the strong + /// consistency queue expired before receiving quorum + /// Receipts e_STAT_NO_SC_MSGS - // Value: Accumulated number of messages in the strong - // consistency queue expired before receiving quorum - // Receipts }; }; @@ -168,20 +169,20 @@ struct ClientStats { e_STAT_ACK , + /// Value: Number of confirm messages delivered to the client e_STAT_CONFIRM - // Value: Number of confirm messages delivered to the client , + /// Value: Accumulated bytes of all messages ever pushed to + /// the client + /// Increments: Number of messages ever pushed to the client e_STAT_PUSH - // Value: Accumulated bytes of all messages ever pushed to - // the client - // Increments: Number of messages ever pushed to the client , + /// Value: Accumulated bytes of all messages ever received from + /// the client + /// Increments: Number of messages ever received from the client e_STAT_PUT - // Value: Accumulated bytes of all messages ever received from - // the client - // Increments: Number of messages ever received from the client }; };