Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: pipeline update unittest #1991

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
9 changes: 8 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ bool PipelineConfig::Parse() {
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#ifdef APSARA_UNIT_TEST_MAIN
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +535,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
15 changes: 15 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "provider/Provider.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -145,6 +148,7 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
}

void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -159,6 +163,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->Stop(configName, removedFlag);
#endif
}

void LogtailPlugin::StopBuiltInModules() {
Expand All @@ -170,6 +177,7 @@ void LogtailPlugin::StopBuiltInModules() {
}

void LogtailPlugin::Start(const std::string& configName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
Expand All @@ -178,6 +186,9 @@ void LogtailPlugin::Start(const std::string& configName) {
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
#else
LogtailPluginMock::GetInstance()->Start(configName);
#endif
}

int LogtailPlugin::IsValidToSend(long long logstoreKey) {
Expand Down Expand Up @@ -503,6 +514,7 @@ void LogtailPlugin::ProcessLog(const std::string& configName,
void LogtailPlugin::ProcessLogGroup(const std::string& configName,
const std::string& logGroup,
const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
if (logGroup.empty() || !(mPluginValid && mProcessLogsFun != NULL)) {
return;
}
Expand All @@ -521,6 +533,9 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
if (rst != (GoInt)0) {
LOG_WARNING(sLogger, ("process loggroup error", configName)("result", rst));
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
Expand Down
7 changes: 5 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
auto res = mRouter.Route(group);
for (auto& item : res) {
if (item.first >= mFlushers.size()) {
LOG_ERROR(sLogger,
("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName));
LOG_WARNING(sLogger,
("pipeline send", "discard data")("config", mName)(
"reason", "invalid flusher index or config update flusher from C++ to Go"));
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
allSucceeded = false;
continue;
}
Expand Down Expand Up @@ -488,6 +489,7 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) {
}

bool Pipeline::LoadGoPipelines() const {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(GetConfigNameOfGoPipelineWithoutInput(),
Expand Down Expand Up @@ -529,6 +531,7 @@ bool Pipeline::LoadGoPipelines() const {
return false;
}
}
#endif
return true;
}

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Pipeline {
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::optional<std::string>& GetSingletonInput() const { return mSingletonInput; }
const std::vector<std::unique_ptr<ProcessorInstance>>& GetProcessors() const { return mProcessorLine; }
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
friend class PipelineUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ProcessorInstance : public PluginInstance {
ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Processor* GetPlugin() const { return mPlugin.get(); }
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

bool Init(const Json::Value& config, PipelineContext& context);
void Process(std::vector<PipelineEventGroup>& logGroupList);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
10 changes: 3 additions & 7 deletions core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ struct ProcessQueueItem {
ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {}

void AddPipelineInProcessCnt(const std::string& configName) {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
8 changes: 6 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,11 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)

auto data = static_cast<SLSSenderQueueItem*>(item);
string configName = HasContext() ? GetContext().GetConfigName() : "";
#ifndef APSARA_UNIT_TEST_MAIN
bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore);
#else
bool isProfileData = false;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
#endif
int32_t curTime = time(NULL);
auto curSystemTime = chrono::system_clock::now();
bool hasAuthError = false;
Expand Down Expand Up @@ -810,8 +814,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
AlarmManager::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
16 changes: 11 additions & 5 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
#include "pipeline/queue/SenderQueueItem.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/HttpSinkMock.h"
#endif

DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60);
DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600);
Expand Down Expand Up @@ -59,6 +61,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +142,16 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
"sending cnt", ToString(mHttpSendingCnt.load() + 1)));
#ifndef APSARA_UNIT_TEST_MAIN
HttpSink::GetInstance()->AddRequest(std::move(req));
#else
HttpSinkMock::GetInstance()->AddRequest(std::move(req));
#endif
++mHttpSendingCnt;
}

void FlusherRunner::Run() {
Expand Down Expand Up @@ -195,7 +202,6 @@ void FlusherRunner::Run() {
PackIdManager::GetInstance()->CleanTimeoutEntry();
mLastCheckSendClientTime = time(NULL);
}

if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions core/runner/FlusherRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pipeline/plugin/interface/Flusher.h"
#include "pipeline/queue/SenderQueueItem.h"
#include "runner/sink/SinkType.h"
#include "runner/sink/http/HttpSink.h"

namespace logtail {

Expand Down Expand Up @@ -83,6 +84,7 @@ class FlusherRunner {
friend class PluginRegistryUnittest;
friend class FlusherRunnerUnittest;
friend class InstanceConfigManagerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
5 changes: 3 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"

Expand All @@ -49,6 +48,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -141,8 +141,9 @@ void ProcessorRunner::Run(uint32_t threadNo) {
eventGroupList.emplace_back(std::move(item->mEventGroup));
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
shared_ptr<Pipeline> oldPipeline;
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down
2 changes: 1 addition & 1 deletion core/runner/sink/Sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Sink {
public:
virtual bool Init() = 0;
virtual void Stop() = 0;

bool AddRequest(std::unique_ptr<T>&& request) {
mQueue.Push(std::move(request));
return true;
Expand Down
3 changes: 2 additions & 1 deletion core/runner/sink/http/HttpSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
#include <future>
#include <mutex>

#include "monitor/MetricManager.h"
#include "runner/sink/Sink.h"
#include "runner/sink/http/HttpSinkRequest.h"
#include "monitor/MetricManager.h"

namespace logtail {

Expand Down Expand Up @@ -68,6 +68,7 @@ class HttpSink : public Sink<HttpSinkRequest> {

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherRunnerUnittest;
friend class HttpSinkMock;
#endif
};

Expand Down
6 changes: 3 additions & 3 deletions core/unittest/config/PipelineManagerMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ class PipelineMock : public Pipeline {
mContext.SetCreateTime(config.mCreateTime);
return (*mConfig)["valid"].asBool();
}

bool Start() { return true; }
void Stop(bool isRemoving) {}
};

class PipelineManagerMock : public PipelineManager {
Expand All @@ -44,6 +41,9 @@ class PipelineManagerMock : public PipelineManager {
}

void ClearEnvironment() {
for (auto& it : mPipelineNameEntityMap) {
it.second->Stop(true);
}
mPipelineNameEntityMap.clear();
mPluginCntMap.clear();
}
Expand Down
Loading
Loading