Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: pipeline update unittest #1991

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
9 changes: 8 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ bool PipelineConfig::Parse() {
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#ifdef APSARA_UNIT_TEST_MAIN
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +535,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
28 changes: 28 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "provider/Provider.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -86,6 +89,7 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
const std::string& logstore,
const std::string& region,
logtail::QueueKey logstoreKey) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LoadPluginBase();
}
Expand All @@ -110,9 +114,14 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
}

return false;
#else
return LogtailPluginMock::GetInstance()->LoadPipeline(
pipelineName, pipeline, project, logstore, region, logstoreKey);
#endif
}

bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LOG_ERROR(sLogger, ("UnloadPipeline", "plugin not valid"));
return false;
Expand All @@ -128,9 +137,13 @@ bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
}

return false;
#else
return LogtailPluginMock::GetInstance()->UnloadPipeline(pipelineName);
#endif
}

void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopAllPipelinesFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -142,9 +155,13 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->StopAllPipelines(withInputFlag);
#endif
}

void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -159,6 +176,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->Stop(configName, removedFlag);
#endif
}

void LogtailPlugin::StopBuiltInModules() {
Expand All @@ -170,6 +190,7 @@ void LogtailPlugin::StopBuiltInModules() {
}

void LogtailPlugin::Start(const std::string& configName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
Expand All @@ -178,6 +199,9 @@ void LogtailPlugin::Start(const std::string& configName) {
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
#else
LogtailPluginMock::GetInstance()->Start(configName);
#endif
}

int LogtailPlugin::IsValidToSend(long long logstoreKey) {
Expand Down Expand Up @@ -503,6 +527,7 @@ void LogtailPlugin::ProcessLog(const std::string& configName,
void LogtailPlugin::ProcessLogGroup(const std::string& configName,
const std::string& logGroup,
const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
if (logGroup.empty() || !(mPluginValid && mProcessLogsFun != NULL)) {
return;
}
Expand All @@ -521,6 +546,9 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
if (rst != (GoInt)0) {
LOG_WARNING(sLogger, ("process loggroup error", configName)("result", rst));
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
Expand Down
8 changes: 6 additions & 2 deletions core/monitor/profile_sender/ProfileSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#ifdef __ENTERPRISE__
#include "EnterpriseProfileSender.h"
#endif
#include "sdk/Exception.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "app_config/AppConfig.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "sdk/Exception.h"
// TODO: temporarily used
#include "common/compression/CompressorFactory.h"

Expand Down Expand Up @@ -119,12 +119,16 @@ FlusherSLS* ProfileSender::GetFlusher(const string& region) {
}

bool ProfileSender::IsProfileData(const string& region, const string& project, const string& logstore) {
#ifndef APSARA_UNIT_TEST_MAIN
if ((logstore == "shennong_log_profile" || logstore == "logtail_alarm" || logstore == "logtail_status_profile"
|| logstore == "logtail_suicide_profile")
&& (project == GetProfileProjectName(region) || region == ""))
return true;
else
return false;
#else
return false;
#endif
}

void ProfileSender::SendToProfileProject(const string& region, sls_logs::LogGroup& logGroup) {
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
auto res = mRouter.Route(group);
for (auto& item : res) {
if (item.first >= mFlushers.size()) {
LOG_ERROR(sLogger,
("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName));
LOG_WARNING(sLogger,
("pipeline send", "discard data")("config", mName)(
"reason", "invalid flusher index or config update flusher from C++ to Go"));
allSucceeded = false;
continue;
}
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Pipeline {
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::optional<std::string>& GetSingletonInput() const { return mSingletonInput; }
const std::vector<std::unique_ptr<ProcessorInstance>>& GetProcessors() const { return mProcessorLine; }
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
friend class PipelineUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ProcessorInstance : public PluginInstance {
ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Processor* GetPlugin() const { return mPlugin.get(); }

bool Init(const Json::Value& config, PipelineContext& context);
void Process(std::vector<PipelineEventGroup>& logGroupList);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
10 changes: 3 additions & 7 deletions core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ struct ProcessQueueItem {
ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {}

void AddPipelineInProcessCnt(const std::string& configName) {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
AlarmManager::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
9 changes: 4 additions & 5 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "pipeline/queue/SenderQueueItem.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -59,6 +58,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +139,12 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
"sending cnt", ToString(mHttpSendingCnt.load() + 1)));
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
}

void FlusherRunner::Run() {
Expand Down Expand Up @@ -195,7 +195,6 @@ void FlusherRunner::Run() {
PackIdManager::GetInstance()->CleanTimeoutEntry();
mLastCheckSendClientTime = time(NULL);
}

if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions core/runner/FlusherRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pipeline/plugin/interface/Flusher.h"
#include "pipeline/queue/SenderQueueItem.h"
#include "runner/sink/SinkType.h"
#include "runner/sink/http/HttpSink.h"

namespace logtail {

Expand Down Expand Up @@ -83,6 +84,7 @@ class FlusherRunner {
friend class PluginRegistryUnittest;
friend class FlusherRunnerUnittest;
friend class InstanceConfigManagerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"

Expand All @@ -49,6 +48,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -142,7 +142,7 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down
2 changes: 1 addition & 1 deletion core/runner/sink/Sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Sink {
public:
virtual bool Init() = 0;
virtual void Stop() = 0;

bool AddRequest(std::unique_ptr<T>&& request) {
mQueue.Push(std::move(request));
return true;
Expand Down
Loading
Loading