Skip to content

Commit

Permalink
fix unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Oct 15, 2024
1 parent f00f1af commit 58d36f4
Show file tree
Hide file tree
Showing 28 changed files with 565 additions and 253 deletions.
7 changes: 6 additions & 1 deletion core/file_server/ContainerInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const std::vector<std::string> containerNameTag = {
"_namespace_",
"_pod_uid_",
"_container_ip_",
"_k8s_image_name_",
"_k8s_container_name_",
"_k8s_container_ip_",
};

const std::vector<TagKey> containerNameTagKey = {
Expand All @@ -37,6 +40,9 @@ const std::vector<TagKey> containerNameTagKey = {
TagKey::K8S_NAMESPACE_TAG_KEY,
TagKey::K8S_POD_UID_TAG_KEY,
TagKey::CONTAINER_IP_TAG_KEY,
TagKey::K8S_CONTAINER_IMAGE_NAME_TAG_KEY,
TagKey::K8S_CONTAINER_NAME_TAG_KEY,
TagKey::K8S_CONTAINER_IP_TAG_KEY,
};

bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll,
Expand Down Expand Up @@ -68,7 +74,6 @@ bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll,
}

bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& containerInfo, std::string& errorMsg) {
bool isOldCheckpoint = !params.isMember("MetaDatas");
containerInfo.mJson = params;
if (params.isMember("ID") && params["ID"].isString()) {
if (params["ID"].empty()) {
Expand Down
13 changes: 11 additions & 2 deletions core/file_server/FileTagOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ bool FileTagOptions::Init(const Json::Value& config,
}
}

// the priority of FileOffsetKey and FileOffsetTagKey is higher than appendingLogPositionMeta
// the priority of FileOffsetKey and FileInodeTagKey is higher than appendingLogPositionMeta
if (config.isMember("FileOffsetKey") || tagConfig->isMember("FileOffsetTagKey")) {
parseDefaultNotAddTag(config, "FileOffsetKey", TagKey::FILE_OFFSET_KEY, context, pluginType);
parseDefaultNotAddTag(config, "FileOffsetTagKey", TagKey::FILE_OFFSET_KEY, context, pluginType);
parseDefaultNotAddTag(config, "FileInodeTagKey", TagKey::FILE_INODE_TAG_KEY, context, pluginType);
} else if (appendingLogPositionMeta) {
mFileTags[TagKey::FILE_OFFSET_KEY] = TagDefaultKey[TagKey::FILE_OFFSET_KEY];
mFileTags[TagKey::FILE_INODE_TAG_KEY] = TagDefaultKey[TagKey::FILE_INODE_TAG_KEY];
Expand All @@ -82,6 +82,15 @@ bool FileTagOptions::Init(const Json::Value& config,
return true;
}

StringView FileTagOptions::GetFileTagKeyName(TagKey key) const {
auto it = mFileTags.find(key);
if (it != mFileTags.end()) {
// FileTagOption will not be deconstructed or changed before all event be sent
return StringView(it->second.c_str(), it->second.size());
}
return StringView();
}

void FileTagOptions::parseDefaultAddTag(const Json::Value& config,
const std::string& keyName,
const TagKey& keyEnum,
Expand Down
8 changes: 1 addition & 7 deletions core/file_server/FileTagOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ class FileTagOptions {
const PipelineContext& context,
const std::string& pluginType,
bool enableContainerDiscovery);
StringView GetFileTagKeyName(TagKey key) const {
if (mFileTags.find(key) != mFileTags.end()) {
// FileTagOption will not be deconstructed or changed before all event be sent
return StringView(mFileTags.at(key).c_str(), mFileTags.at(key).size());
}
return StringView();
}
StringView GetFileTagKeyName(TagKey key) const;


private:
Expand Down
30 changes: 19 additions & 11 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2446,9 +2446,11 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) {
group.SetMetadata(EventGroupMetaKey::LOG_FILE_INODE, ToString(GetDevInode().inode));
}
group.SetMetadata(EventGroupMetaKey::SOURCE_ID, GetSourceId());
auto offsetKey = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_OFFSET_KEY);
if (!offsetKey.empty()) {
group.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, offsetKey);
if (mTagConfig.first != nullptr) {
auto offsetKey = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_OFFSET_KEY);
if (!offsetKey.empty()) {
group.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, offsetKey);
}
}

// we store info which users can see in tags
Expand All @@ -2463,16 +2465,22 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) {
}
// 3. container name tag, external k8s env/label tag
auto containerExtraTags = GetContainerExtraTags();
for (size_t i = 0; i < containerExtraTags->size(); ++i) {
auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key());
if (key != TagKey::UNKOWN) { // container name tag
auto keyName = mTagConfig.first->GetFileTagKeyName(key);
if (!keyName.empty()) {
if (containerExtraTags) {
for (size_t i = 0; i < containerExtraTags->size(); ++i) {
auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key());
if (key != TagKey::UNKOWN) { // container name tag
StringBuffer b = group.GetSourceBuffer()->CopyString((*containerExtraTags)[i].value());
group.SetTagNoCopy(keyName, StringView(b.data, b.size));
if (mTagConfig.first == nullptr) { // no tag config
group.SetTagNoCopy(TagDefaultKey[key], StringView(b.data, b.size));
} else {
auto keyName = mTagConfig.first->GetFileTagKeyName(key);
if (!keyName.empty()) {
group.SetTagNoCopy(keyName, StringView(b.data, b.size));
}
}
} else { // external k8s env/label tag
group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value());
}
} else { // external k8s env/label tag
group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value());
}
}
// 4. inode
Expand Down
3 changes: 3 additions & 0 deletions core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ const string EVENT_GROUP_META_LOG_FILE_PATH_RESOLVED = "log.file.path_resolved";
const string EVENT_GROUP_META_LOG_FILE_INODE = "log.file.inode";
const string EVENT_GROUP_META_CONTAINER_TYPE = "container.type";
const string EVENT_GROUP_META_HAS_PART_LOG = "has.part.log";
const string EVENT_GROUP_META_LOG_FILE_OFFSET = "log.file.offset";

const string EVENT_GROUP_META_K8S_CLUSTER_ID = "k8s.cluster.id";
const string EVENT_GROUP_META_K8S_NODE_NAME = "k8s.node.name";
Expand Down Expand Up @@ -219,6 +220,8 @@ const string& EventGroupMetaKeyToString(EventGroupMetaKey key) {
return EVENT_GROUP_META_CONTAINER_TYPE;
case EventGroupMetaKey::HAS_PART_LOG:
return EVENT_GROUP_META_HAS_PART_LOG;
case EventGroupMetaKey::LOG_FILE_OFFSET_KEY:
return EVENT_GROUP_META_LOG_FILE_OFFSET;
default:
static string sEmpty = "unknown";
return sEmpty;
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

namespace logtail {
Expand All @@ -38,8 +39,8 @@ struct GlobalConfig {
uint32_t mProcessPriority = 0;
bool mEnableTimestampNanosecond = false;
bool mUsingOldContentTag = false;
Json::Value mPipelineMetaTagKey;
Json::Value mAgentEnvMetaTagKey;
std::unordered_map<std::string, std::string> mPipelineMetaTagKey;
std::unordered_map<std::string, std::string> mAgentEnvMetaTagKey;
};

} // namespace logtail
12 changes: 10 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,16 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) {
Json::Value& global = pipeline["global"];
global["EnableTimestampNanosecond"] = mContext.GetGlobalConfig().mEnableTimestampNanosecond;
global["UsingOldContentTag"] = mContext.GetGlobalConfig().mUsingOldContentTag;
global["PipelineMetaTagKey"] = mContext.GetGlobalConfig().mPipelineMetaTagKey;
global["AgentEnvMetaTagKey"] = mContext.GetGlobalConfig().mAgentEnvMetaTagKey;
Json::Value pipelineMetaTagKey;
for (const auto& kv : mContext.GetGlobalConfig().mPipelineMetaTagKey) {
pipelineMetaTagKey[kv.first] = kv.second;
}
global["PipelineMetaTagKey"] = pipelineMetaTagKey;
Json::Value agentEnvMetaTagKey;
for (const auto& kv : mContext.GetGlobalConfig().mAgentEnvMetaTagKey) {
agentEnvMetaTagKey[kv.first] = kv.second;
}
global["AgentEnvMetaTagKey"] = agentEnvMetaTagKey;
}
}

Expand Down
1 change: 0 additions & 1 deletion core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "pipeline/serializer/SLSSerializer.h"

#include "application/Application.h"
#include "common/Flags.h"
#include "common/TimeUtil.h"
#include "common/compression/CompressType.h"
Expand Down
7 changes: 5 additions & 2 deletions core/pipeline/serializer/SLSSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>
#include <vector>

#include "common/TagConstants.h"
#include "pipeline/serializer/Serializer.h"

namespace logtail {
Expand All @@ -38,8 +39,10 @@ struct CompressedLogGroup {
CompressedLogGroup(std::string&& data, size_t rawSize) : mData(std::move(data)), mRawSize(rawSize) {}
};

template<>
bool Serializer<std::vector<CompressedLogGroup>>::DoSerialize(std::vector<CompressedLogGroup>&& p, std::string& output, std::string& errorMsg);
template <>
bool Serializer<std::vector<CompressedLogGroup>>::DoSerialize(std::vector<CompressedLogGroup>&& p,
std::string& output,
std::string& errorMsg);

class SLSEventGroupListSerializer : public Serializer<std::vector<CompressedLogGroup>> {
public:
Expand Down
17 changes: 15 additions & 2 deletions core/plugin/input/InputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,21 @@ bool InputFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline)
}

// Tag
if (!mFileTag.Init(config, *mContext, sName, mEnableContainerDiscovery)) {
return false;
const char* tagKey = "Tags";
const Json::Value* tagItr = config.find(tagKey, tagKey + strlen(tagKey));
if (tagItr) {
if (!tagItr->isObject()) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
"param Tags is not of type object",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (!mFileTag.Init(*tagItr, *mContext, sName, mEnableContainerDiscovery)) {
return false;
}
}

// MaxCheckpointDirSearchDepth
Expand Down
46 changes: 36 additions & 10 deletions core/unittest/event_handler/ModifyHandlerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ class ModifyHandlerUnittest : public ::testing::Test {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx);

// build a reader
mReaderPtr = std::make_shared<LogFileReader>(
gRootDir, gLogName, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
mReaderPtr = std::make_shared<LogFileReader>(gRootDir,
gLogName,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
mReaderPtr->UpdateReaderManual();
APSARA_TEST_TRUE_FATAL(mReaderPtr->CheckFileSignatureAndOffset(true));

Expand All @@ -140,6 +144,7 @@ class ModifyHandlerUnittest : public ::testing::Test {
FileDiscoveryOptions discoveryOpts;
FileReaderOptions readerOpts;
MultilineOptions multilineOpts;
FileTagOptions tagOpts;
PipelineContext ctx;
FileDiscoveryConfig mConfig;

Expand Down Expand Up @@ -214,17 +219,25 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::string logPath1 = logPath + ".1";
writeLog(logPath1, "a sample log\n");
auto devInode1 = GetFileDevInode(logPath1);
auto reader1 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode1, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader1 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode1,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader1->mRealLogPath = logPath1;
reader1->mLastFileSignatureSize = sigSize;
reader1->mLastFileSignatureHash = sigHash;

std::string logPath2 = logPath + ".2";
writeLog(logPath2, "a sample log\n");
auto devInode2 = GetFileDevInode(logPath2);
auto reader2 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode2, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader2 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode2,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader2->mRealLogPath = logPath2;
reader2->mLastFileSignatureSize = sigSize;
reader2->mLastFileSignatureHash = sigHash;
Expand All @@ -240,17 +253,25 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::string logPath3 = logPath + ".3";
writeLog(logPath3, "a sample log\n");
auto devInode3 = GetFileDevInode(logPath3);
auto reader3 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode3, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader3 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode3,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader3->mRealLogPath = logPath3;
reader3->mLastFileSignatureSize = sigSize;
reader3->mLastFileSignatureHash = sigHash;

std::string logPath4 = logPath + ".4";
writeLog(logPath4, "a sample log\n");
auto devInode4 = GetFileDevInode(logPath4);
auto reader4 = std::make_shared<LogFileReader>(
gRootDir, basicLogName, devInode4, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
auto reader4 = std::make_shared<LogFileReader>(gRootDir,
basicLogName,
devInode4,
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader4->mRealLogPath = logPath4;
reader4->mLastFileSignatureSize = sigSize;
reader4->mLastFileSignatureHash = sigHash;
Expand All @@ -269,6 +290,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
// recover reader from checkpoint, random order
Expand All @@ -278,6 +300,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -286,6 +309,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -294,6 +318,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
handlerPtr->CreateLogFileReaderPtr(gRootDir,
Expand All @@ -302,6 +327,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() {
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&discoveryOpts, &ctx),
std::make_pair(&tagOpts, &ctx),
0,
false);
APSARA_TEST_EQUAL_FATAL(handlerPtr->mNameReaderMap.size(), 1);
Expand Down
Loading

0 comments on commit 58d36f4

Please sign in to comment.