Skip to content

Commit

Permalink
fix unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Oct 15, 2024
1 parent f00f1af commit fb4e67e
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 152 deletions.
1 change: 0 additions & 1 deletion core/file_server/ContainerInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll,
}

bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& containerInfo, std::string& errorMsg) {
bool isOldCheckpoint = !params.isMember("MetaDatas");
containerInfo.mJson = params;
if (params.isMember("ID") && params["ID"].isString()) {
if (params["ID"].empty()) {
Expand Down
20 changes: 11 additions & 9 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2463,16 +2463,18 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) {
}
// 3. container name tag, external k8s env/label tag
auto containerExtraTags = GetContainerExtraTags();
for (size_t i = 0; i < containerExtraTags->size(); ++i) {
auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key());
if (key != TagKey::UNKOWN) { // container name tag
auto keyName = mTagConfig.first->GetFileTagKeyName(key);
if (!keyName.empty()) {
StringBuffer b = group.GetSourceBuffer()->CopyString((*containerExtraTags)[i].value());
group.SetTagNoCopy(keyName, StringView(b.data, b.size));
if (containerExtraTags) {
for (size_t i = 0; i < containerExtraTags->size(); ++i) {
auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key());
if (key != TagKey::UNKOWN) { // container name tag
auto keyName = mTagConfig.first->GetFileTagKeyName(key);
if (!keyName.empty()) {
StringBuffer b = group.GetSourceBuffer()->CopyString((*containerExtraTags)[i].value());
group.SetTagNoCopy(keyName, StringView(b.data, b.size));
}
} else { // external k8s env/label tag
group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value());
}
} else { // external k8s env/label tag
group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value());
}
}
// 4. inode
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

namespace logtail {
Expand All @@ -38,8 +39,8 @@ struct GlobalConfig {
uint32_t mProcessPriority = 0;
bool mEnableTimestampNanosecond = false;
bool mUsingOldContentTag = false;
Json::Value mPipelineMetaTagKey;
Json::Value mAgentEnvMetaTagKey;
std::unordered_map<std::string, std::string> mPipelineMetaTagKey;
std::unordered_map<std::string, std::string> mAgentEnvMetaTagKey;
};

} // namespace logtail
12 changes: 10 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,16 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) {
Json::Value& global = pipeline["global"];
global["EnableTimestampNanosecond"] = mContext.GetGlobalConfig().mEnableTimestampNanosecond;
global["UsingOldContentTag"] = mContext.GetGlobalConfig().mUsingOldContentTag;
global["PipelineMetaTagKey"] = mContext.GetGlobalConfig().mPipelineMetaTagKey;
global["AgentEnvMetaTagKey"] = mContext.GetGlobalConfig().mAgentEnvMetaTagKey;
Json::Value pipelineMetaTagKey;
for (const auto& kv : mContext.GetGlobalConfig().mPipelineMetaTagKey) {
pipelineMetaTagKey[kv.first] = kv.second;
}
global["PipelineMetaTagKey"] = pipelineMetaTagKey;
Json::Value agentEnvMetaTagKey;
for (const auto& kv : mContext.GetGlobalConfig().mAgentEnvMetaTagKey) {
agentEnvMetaTagKey[kv.first] = kv.second;
}
global["AgentEnvMetaTagKey"] = agentEnvMetaTagKey;
}
}

Expand Down
1 change: 0 additions & 1 deletion core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "pipeline/serializer/SLSSerializer.h"

#include "application/Application.h"
#include "common/Flags.h"
#include "common/TimeUtil.h"
#include "common/compression/CompressType.h"
Expand Down
7 changes: 5 additions & 2 deletions core/pipeline/serializer/SLSSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>
#include <vector>

#include "common/TagConstants.h"
#include "pipeline/serializer/Serializer.h"

namespace logtail {
Expand All @@ -38,8 +39,10 @@ struct CompressedLogGroup {
CompressedLogGroup(std::string&& data, size_t rawSize) : mData(std::move(data)), mRawSize(rawSize) {}
};

template<>
bool Serializer<std::vector<CompressedLogGroup>>::DoSerialize(std::vector<CompressedLogGroup>&& p, std::string& output, std::string& errorMsg);
template <>
bool Serializer<std::vector<CompressedLogGroup>>::DoSerialize(std::vector<CompressedLogGroup>&& p,
std::string& output,
std::string& errorMsg);

class SLSEventGroupListSerializer : public Serializer<std::vector<CompressedLogGroup>> {
public:
Expand Down
46 changes: 36 additions & 10 deletions core/unittest/event_handler/ModifyHandlerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ class ModifyHandlerUnittest : public ::testing::Test {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx);

// build a reader
mReaderPtr = std::make_shared<LogFileReader>(
gRootDir, gLogName, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
mReaderPtr = std::make_shared<LogFileReader>(gRootDir,
gLogName,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
mReaderPtr->UpdateReaderManual();
APSARA_TEST_TRUE_FATAL(mReaderPtr->CheckFileSignatureAndOffset(true));

Expand All @@ -140,6 +144,7 @@ class ModifyHandlerUnittest : public ::testing::Test {
FileDiscoveryOptions discoveryOpts;
FileReaderOptions readerOpts;
MultilineOptions multilineOpts;
FileTagOptions tagOpts;
PipelineContext ctx;
FileDiscoveryConfig mConfig;

Expand Down Expand Up @@ -214,17 +219,25 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::string logPath1 = logPath + ".1";
writeLog(logPath1, "a sample log\n");
auto devInode1 = GetFileDevInode(logPath1);
auto reader1 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode1, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader1 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode1,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader1->mRealLogPath = logPath1;
reader1->mLastFileSignatureSize = sigSize;
reader1->mLastFileSignatureHash = sigHash;

std::string logPath2 = logPath + ".2";
writeLog(logPath2, "a sample log\n");
auto devInode2 = GetFileDevInode(logPath2);
auto reader2 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode2, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader2 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode2,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader2->mRealLogPath = logPath2;
reader2->mLastFileSignatureSize = sigSize;
reader2->mLastFileSignatureHash = sigHash;
Expand All @@ -240,17 +253,25 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::string logPath3 = logPath + ".3";
writeLog(logPath3, "a sample log\n");
auto devInode3 = GetFileDevInode(logPath3);
auto reader3 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode3, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader3 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode3,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader3->mRealLogPath = logPath3;
reader3->mLastFileSignatureSize = sigSize;
reader3->mLastFileSignatureHash = sigHash;

std::string logPath4 = logPath + ".4";
writeLog(logPath4, "a sample log\n");
auto devInode4 = GetFileDevInode(logPath4);
auto reader4 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode4, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader4 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode4,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader4->mRealLogPath = logPath4;
reader4->mLastFileSignatureSize = sigSize;
reader4->mLastFileSignatureHash = sigHash;
Expand All @@ -269,6 +290,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
// recover reader from checkpoint, random order
Expand All @@ -278,6 +300,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -286,6 +309,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -294,6 +318,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -302,6 +327,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
APSARA_TEST_EQUAL_FATAL(handlerPtr->mNameReaderMap.size(), 1);
Expand Down
20 changes: 10 additions & 10 deletions core/unittest/flusher/FlusherSLSUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ void FlusherSLSUnittest::TestSend() {
// replayed group
PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down Expand Up @@ -632,7 +632,7 @@ void FlusherSLSUnittest::TestSend() {
APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
APSARA_TEST_EQUAL(2, logGroup.logtags_size());
APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key());
APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(0).key());
APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value());
APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key());
APSARA_TEST_EQUAL(1, logGroup.logs_size());
Expand All @@ -648,7 +648,7 @@ void FlusherSLSUnittest::TestSend() {
flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1);
PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down Expand Up @@ -686,7 +686,7 @@ void FlusherSLSUnittest::TestSend() {
APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
APSARA_TEST_EQUAL(2, logGroup.logtags_size());
APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key());
APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(0).key());
APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value());
APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key());
APSARA_TEST_EQUAL(1, logGroup.logs_size());
Expand Down Expand Up @@ -733,7 +733,7 @@ void FlusherSLSUnittest::TestSend() {
flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1);
PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down Expand Up @@ -766,7 +766,7 @@ void FlusherSLSUnittest::TestSend() {
APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
APSARA_TEST_EQUAL(3, logGroup.logtags_size());
APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key());
APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(0).key());
APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value());
APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key());
APSARA_TEST_EQUAL("tag_key", logGroup.logtags(2).key());
Expand Down Expand Up @@ -815,7 +815,7 @@ void FlusherSLSUnittest::TestSend() {

PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down Expand Up @@ -870,7 +870,7 @@ void FlusherSLSUnittest::TestSend() {
APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
APSARA_TEST_EQUAL(2, logGroup.logtags_size());
APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key());
APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(0).key());
APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value());
APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key());
APSARA_TEST_EQUAL(1, logGroup.logs_size());
Expand Down Expand Up @@ -917,7 +917,7 @@ void FlusherSLSUnittest::TestFlush() {

PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down Expand Up @@ -961,7 +961,7 @@ void FlusherSLSUnittest::TestFlushAll() {

PipelineEventGroup group(make_shared<SourceBuffer>());
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname");
group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname");
group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
Expand Down
Loading

0 comments on commit fb4e67e

Please sign in to comment.