Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[Storagetool]: Add support of queueOp and journalOp records #531

Merged
merged 56 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ca179b8
Add seqnum and offset input args, add composite sequence number support
alexander-e1off Nov 1, 2024
f36e2f6
Debug binary search for all range types
alexander-e1off Nov 7, 2024
2ed441f
Reemove debug
alexander-e1off Nov 7, 2024
23ca939
Simplify getValue template
alexander-e1off Nov 8, 2024
33d990e
Add search result decorators for seq.number and offset, add UTs
alexander-e1off Nov 8, 2024
1bcf853
Fix hasCache() call
alexander-e1off Nov 11, 2024
25a805a
Fix code style
alexander-e1off Nov 11, 2024
9898b5d
Fix code style
alexander-e1off Nov 11, 2024
d00d17c
Add search for specific sequence numbers and offsets
alexander-e1off Nov 12, 2024
1435ed4
Cleanup
alexander-e1off Nov 12, 2024
f1d28fa
Fix README.md
alexander-e1off Nov 13, 2024
f69ee41
Fix typo in README.md
alexander-e1off Nov 13, 2024
6897db6
Update header caption
alexander-e1off Nov 14, 2024
08e4983
Fix review comments
alexander-e1off Nov 19, 2024
27194b8
Fix code style
alexander-e1off Nov 19, 2024
c8758ad
Reset errorDescr
alexander-e1off Nov 29, 2024
f76f2c4
Add support of queueOp and journalOp records
alexander-e1off Nov 21, 2024
dde22e7
Add UTs, update footer printing
alexander-e1off Nov 22, 2024
679a767
Move printRecord methods to RecordPrinter namespace
alexander-e1off Nov 22, 2024
2884c6f
Add printRecord method for queueOp record
alexander-e1off Nov 25, 2024
07ad0d0
Add flag to stop search when reached higher bound
alexander-e1off Nov 26, 2024
7e54445
Update README.md
alexander-e1off Nov 26, 2024
1277673
Revert "Add flag to stop search when reached higher bound"
alexander-e1off Nov 27, 2024
6badd16
Add flag to stop search when reached higher bound
alexander-e1off Nov 26, 2024
0b88430
Update README.md, fix code style
alexander-e1off Nov 27, 2024
2007548
Fix typo
alexander-e1off Nov 27, 2024
af2dad9
Cosmetic change: move validators to CommandLineArguments
alexander-e1off Nov 28, 2024
78480d7
Cleanup
alexander-e1off Nov 28, 2024
b32a02e
Update README.md
alexander-e1off Nov 28, 2024
b9a7b2d
Fix comment
alexander-e1off Nov 29, 2024
20f2dc6
Fix typo
alexander-e1off Nov 29, 2024
7fca78b
Fix search by exact offset/seqnum, add UTs
alexander-e1off Nov 29, 2024
7c234b9
Merge branch 'main' into storagetool-add-seqnum
alexander-e1off Dec 3, 2024
768b164
Fix merge conflicts
alexander-e1off Dec 3, 2024
465e609
Fix merge conflicts from storagetool-add-seqnum branch
alexander-e1off Dec 3, 2024
6ecd167
Fix merge conflicts
alexander-e1off Dec 3, 2024
10b24b5
Fix formatting
alexander-e1off Dec 3, 2024
70ffcb2
Fix merge conflicts from storagetool-add-seqnum branch
alexander-e1off Dec 3, 2024
8e63247
Rename k_JOURNALOP_TYPE constant
alexander-e1off Dec 3, 2024
eab1197
Merge remote-tracking branch 'upstream/main' into storagetool-add-seqnum
alexander-e1off Dec 4, 2024
3c7d12f
Merge remote-tracking branch 'upstream/main' into storagetool-add-seqnum
alexander-e1off Dec 5, 2024
228296e
Merge remote-tracking branch 'origin/storagetool-add-seqnum' into sup…
alexander-e1off Dec 5, 2024
5cba6f6
Fix rename typo
alexander-e1off Dec 5, 2024
561f4da
Merge remote-tracking branch 'upstream/main' into storagetool-add-seqnum
alexander-e1off Dec 6, 2024
a290ba0
Merge from main, fix conflicts
alexander-e1off Dec 16, 2024
e9cf8dd
Fix assert macro names in compositesequencenumber.t
alexander-e1off Dec 16, 2024
d17230a
Merge remote-tracking branch 'upstream/main' into storagetool-add-seqnum
alexander-e1off Dec 18, 2024
73cb8d0
Fix review comments: typos, doxygen comments
alexander-e1off Dec 19, 2024
b09207a
Merge from storagetool-add-seqnum branch, fix conflicts
alexander-e1off Dec 19, 2024
50056ec
Fix printer queueOp/journalOp record caption to be the same as for me…
alexander-e1off Dec 19, 2024
65a571d
Merge upstream/main, fix conflicts
alexander-e1off Dec 20, 2024
a8cf8e1
Cleanup
alexander-e1off Dec 20, 2024
15b5930
Fix stopSearch flag update
alexander-e1off Dec 27, 2024
53de424
Merge remote-tracking branch 'upstream/main' into support_all_records
alexander-e1off Jan 7, 2025
ac87fe5
Merge branch 'main' into support_all_records
chrisbeard Jan 7, 2025
101aa44
Merge branch 'main' into support_all_records
pniedzielski Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/applications/bmqstoragetool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ BMQStorageTool
==============

BMQStorageTool is a command-line tool for analyzing of BlazingMQ Broker storage
files. It allows to search messages in `journal` file with set of different
filters and output found message GUIDs or message detail information.
files. It allows to search records in `journal` file with set of different
filters and output found results in the short or detail form.
As input, a `journal` file (*.bmq_journal) is *always* required. To dump
payload, `data` file (*.bmq_data) is required. To filter by queue Uri, cluster
state ledger (CSL) file (*.bmq_csl) is required.
Expand All @@ -13,7 +13,8 @@ the project. From the command-line, there are a few options you can use when
invoking the tool.

```bash
Usage: bmqstoragetool [--journal-path <journal path>]
Usage: bmqstoragetool [-r|record-type <record type>]*
[--journal-path <journal path>]
[--journal-file <journal file>]
[--data-file <data file>]
[--csl-file <csl file>]
Expand All @@ -37,6 +38,8 @@ Usage: bmqstoragetool [--journal-path <journal path>]
[--summary]
[-h|help]
Where:
-r | --record-type <record type>
record type to search {message|queue-op|journal-op} (default: message)
--journal-path <pattern>
'*'-ended file path pattern, where the tool will try to find journal
and data files
Expand Down Expand Up @@ -104,6 +107,15 @@ Example:
bmqstoragetool --journal-file=<path>
```

Search and otput all queueOp/journalOp records or all records in journal file
--------------------------------------------------
Example:
```bash
bmqstoragetool --journal-file=<path> --record-type=queue-op
bmqstoragetool --journal-file=<path> --record-type=journal-op
bmqstoragetool --journal-file=<path> --record-type=journal-op --record-type=queue-op --record-type=message
```

Search and otput all messages details in journal file
-----------------------------------------------------
Example:
Expand Down
20 changes: 15 additions & 5 deletions src/applications/bmqstoragetool/bmqstoragetool.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

// BDE
#include <balcl_commandline.h>
#include <bdls_filesystemutil.h>
#include <bsl_iostream.h>
#include <bslma_managedptr.h>

Expand All @@ -38,6 +39,13 @@ static bool parseArgs(CommandLineArguments& arguments,
bool showHelp = false;

balcl::OptionInfo specTable[] = {
{"r|record-type",
"record type",
"record type to search {message|queue-op|journal-op}",
balcl::TypeInfo(&arguments.d_recordType,
CommandLineArguments::isValidRecordType),
balcl::OccurrenceInfo(bsl::vector<bsl::string>(
{CommandLineArguments::k_MESSAGE_TYPE}))},
{"journal-path",
"journal path",
"'*'-ended file path pattern, where the tool will try to find "
Expand All @@ -47,17 +55,20 @@ static bool parseArgs(CommandLineArguments& arguments,
{"journal-file",
"journal file",
"path to a .bmq_journal file",
balcl::TypeInfo(&arguments.d_journalFile),
balcl::TypeInfo(&arguments.d_journalFile,
CommandLineArguments::isValidFileName),
balcl::OccurrenceInfo::e_OPTIONAL},
{"data-file",
"data file",
"path to a .bmq_data file",
balcl::TypeInfo(&arguments.d_dataFile),
balcl::TypeInfo(&arguments.d_dataFile,
CommandLineArguments::isValidFileName),
balcl::OccurrenceInfo::e_OPTIONAL},
{"csl-file",
"csl file",
"path to a .bmq_csl file",
balcl::TypeInfo(&arguments.d_cslFile),
balcl::TypeInfo(&arguments.d_cslFile,
CommandLineArguments::isValidFileName),
balcl::OccurrenceInfo::e_OPTIONAL},
{"guid",
"guid",
Expand Down Expand Up @@ -148,8 +159,7 @@ static bool parseArgs(CommandLineArguments& arguments,
balcl::OccurrenceInfo(1024)},
{"summary",
"summary",
"summary of all matching messages (number of outstanding messages "
"and other statistics)",
"summary of all matching records",
balcl::TypeInfo(&arguments.d_summary),
balcl::OccurrenceInfo::e_OPTIONAL},
{"h|help",
Expand Down
43 changes: 28 additions & 15 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,20 @@ Filters::Filters(const bsl::vector<bsl::string>& queueKeys,
}
}

bool Filters::apply(const mqbs::MessageRecord& record,
bsls::Types::Uint64 offset) const
bool Filters::apply(const mqbs::RecordHeader& recordHeader,
bsls::Types::Uint64 recordOffset,
const mqbu::StorageKey& queueKey,
bool* highBoundReached_p) const
{
if (highBoundReached_p) {
*highBoundReached_p = false; // by default
}

// Apply `queue key` filter
if (!d_queueKeys.empty()) {
if (queueKey != mqbu::StorageKey::k_NULL_KEY && !d_queueKeys.empty()) {
// Match by queueKey
bsl::unordered_set<mqbu::StorageKey>::const_iterator it = bsl::find(
d_queueKeys.cbegin(),
d_queueKeys.cend(),
record.queueKey());
bsl::unordered_set<mqbu::StorageKey>::const_iterator it =
bsl::find(d_queueKeys.cbegin(), d_queueKeys.cend(), queueKey);
if (it == d_queueKeys.cend()) {
// Not matched
return false; // RETURN
Expand All @@ -71,32 +75,41 @@ bool Filters::apply(const mqbs::MessageRecord& record,
bsls::Types::Uint64 value, valueGt, valueLt;
switch (d_range.d_type) {
case Parameters::Range::e_TIMESTAMP:
value = record.header().timestamp();
value = recordHeader.timestamp();
valueGt = d_range.d_timestampGt;
valueLt = d_range.d_timestampLt;
break;
case Parameters::Range::e_OFFSET:
value = offset;
value = recordOffset;
valueGt = d_range.d_offsetGt;
valueLt = d_range.d_offsetLt;
break;
case Parameters::Range::e_SEQUENCE_NUM: {
CompositeSequenceNumber seqNum(record.header().primaryLeaseId(),
record.header().sequenceNumber());
CompositeSequenceNumber seqNum(recordHeader.primaryLeaseId(),
recordHeader.sequenceNumber());
const bool greaterOrEqualToHigherBound = d_range.d_seqNumLt.isSet() &&
d_range.d_seqNumLt <= seqNum;
if (highBoundReached_p && greaterOrEqualToHigherBound) {
*highBoundReached_p = true;
}

return !(
(d_range.d_seqNumGt.isSet() && seqNum <= d_range.d_seqNumGt) ||
(d_range.d_seqNumLt.isSet() &&
d_range.d_seqNumLt <= seqNum)); // RETURN
greaterOrEqualToHigherBound); // RETURN
} break;
default:
// No range filter defined
return true; // RETURN
}
if ((valueGt > 0 && value <= valueGt) ||
(valueLt > 0 && value >= valueLt)) {
const bool greaterOrEqualToHigherBound = valueLt > 0 && value >= valueLt;
if ((valueGt > 0 && value <= valueGt) || greaterOrEqualToHigherBound) {
if (highBoundReached_p && greaterOrEqualToHigherBound) {
*highBoundReached_p = true;
}
// Not inside range
return false; // RETURN
}

return true;
}

Expand Down
14 changes: 10 additions & 4 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filters.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <m_bmqstoragetool_parameters.h>

// MQB
#include <mqbs_filestoreprotocol.h>
#include <mqbu_storagekey.h>

// BDE
Expand Down Expand Up @@ -57,10 +58,15 @@ class Filters {

// ACCESSORS

/// Apply filters at specified 'record' and return true if all filters
/// are matched, false otherwise.
bool apply(const mqbs::MessageRecord& record,
bsls::Types::Uint64 offset) const;
/// Apply filters at the record with the specified `recordHeader`,
/// `recordOffset` and `queueKey`. Return true if all filters are matched,
/// false otherwise. If the specified `highBoundReached_p` pointer is
/// present, pointed value is set to true if higher bound value is reached,
/// false otherwise.
bool apply(const mqbs::RecordHeader& recordHeader,
bsls::Types::Uint64 recordOffset,
const mqbu::StorageKey& queueKey,
bool* highBoundReached_p = 0) const;
};

} // close package namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,33 +197,70 @@ void JournalFileProcessor::process()
needMoveToLowerBound = false;
}

// MessageRecord
if (iter->recordType() == mqbs::RecordType::e_MESSAGE) {
const mqbs::MessageRecord& record = iter->asMessageRecord();
// Process Message records
if (d_parameters->d_processRecordTypes.d_message) {
// MessageRecord
if (iter->recordType() == mqbs::RecordType::e_MESSAGE) {
const mqbs::MessageRecord& record = iter->asMessageRecord();

// Apply filters
if (filters.apply(record, iter->recordOffset())) {
stopSearch = d_searchResult_p->processMessageRecord(
// Apply filters
if (filters.apply(iter->recordHeader(),
iter->recordOffset(),
record.queueKey())) {
stopSearch = d_searchResult_p->processMessageRecord(
record,
iter->recordIndex(),
iter->recordOffset());
}
}
// ConfirmRecord
else if (iter->recordType() == mqbs::RecordType::e_CONFIRM) {
const mqbs::ConfirmRecord& record = iter->asConfirmRecord();
stopSearch = d_searchResult_p->processConfirmRecord(
record,
iter->recordIndex(),
iter->recordOffset());
}
// DeletionRecord
else if (iter->recordType() == mqbs::RecordType::e_DELETION) {
const mqbs::DeletionRecord& record = iter->asDeletionRecord();
stopSearch = d_searchResult_p->processDeletionRecord(
record,
iter->recordIndex(),
iter->recordOffset());
}
}
// ConfirmRecord
else if (iter->recordType() == mqbs::RecordType::e_CONFIRM) {
const mqbs::ConfirmRecord& record = iter->asConfirmRecord();
stopSearch = d_searchResult_p->processConfirmRecord(
record,
iter->recordIndex(),
iter->recordOffset());
// Process QueueOp record
if (d_parameters->d_processRecordTypes.d_queueOp &&
iter->recordType() == mqbs::RecordType::e_QUEUE_OP) {
const mqbs::QueueOpRecord& record = iter->asQueueOpRecord();

// Apply filters
if (filters.apply(iter->recordHeader(),
iter->recordOffset(),
record.queueKey(),
&stopSearch)) {
stopSearch = d_searchResult_p->processQueueOpRecord(
record,
iter->recordIndex(),
iter->recordOffset());
}
}
// DeletionRecord
else if (iter->recordType() == mqbs::RecordType::e_DELETION) {
const mqbs::DeletionRecord& record = iter->asDeletionRecord();
stopSearch = d_searchResult_p->processDeletionRecord(
record,
iter->recordIndex(),
iter->recordOffset());
// Process JournalOp record
if (d_parameters->d_processRecordTypes.d_journalOp &&
iter->recordType() == mqbs::RecordType::e_JOURNAL_OP) {
const mqbs::JournalOpRecord& record = iter->asJournalOpRecord();

// Apply filters
if (filters.apply(iter->recordHeader(),
iter->recordOffset(),
mqbu::StorageKey::k_NULL_KEY,
&stopSearch)) {
stopSearch = d_searchResult_p->processJournalOpRecord(
record,
iter->recordIndex(),
iter->recordOffset());
}
}
}
}
Expand Down
Loading
Loading