Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: pipeline update unittest #1991

Merged
merged 16 commits into from
Dec 30, 2024
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
9 changes: 8 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ bool PipelineConfig::Parse() {
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#ifdef APSARA_UNIT_TEST_MAIN
if (pluginType.find("mock") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +535,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
29 changes: 27 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -338,18 +341,29 @@ bool Pipeline::Init(PipelineConfig&& config) {
void Pipeline::Start() {
// #ifndef APSARA_UNIT_TEST_MAIN
// TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里
LOG_WARNING(sLogger, ("debug", "8"));
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
for (const auto& flusher : mFlushers) {
flusher->Start();
}

LOG_WARNING(sLogger, ("debug", "9"));
if (!mGoPipelineWithoutInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
#endif
}

LOG_WARNING(sLogger, ("debug", "10"));
ProcessQueueManager::GetInstance()->EnablePop(mName);

if (!mGoPipelineWithInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#endif
}

for (const auto& input : mInputs) {
Expand Down Expand Up @@ -395,8 +409,9 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
auto res = mRouter.Route(group);
for (auto& item : res) {
if (item.first >= mFlushers.size()) {
LOG_ERROR(sLogger,
("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName));
LOG_WARNING(sLogger,
("pipeline send", "discard data")("config", mName)(
"reason", "invalid flusher index or config update flusher from C++ to Go"));
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
allSucceeded = false;
continue;
}
Expand Down Expand Up @@ -424,7 +439,11 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#endif
}

ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving);
Expand All @@ -434,7 +453,11 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithoutInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#endif
}

for (const auto& flusher : mFlushers) {
Expand Down Expand Up @@ -488,6 +511,7 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) {
}

bool Pipeline::LoadGoPipelines() const {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(GetConfigNameOfGoPipelineWithoutInput(),
Expand Down Expand Up @@ -529,6 +553,7 @@ bool Pipeline::LoadGoPipelines() const {
return false;
}
}
#endif
return true;
}

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Pipeline {
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::optional<std::string>& GetSingletonInput() const { return mSingletonInput; }
const std::vector<std::unique_ptr<ProcessorInstance>>& GetProcessors() const { return mProcessorLine; }
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ProcessorInstance : public PluginInstance {
ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Processor* GetPlugin() const { return mPlugin.get(); }
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

bool Init(const Json::Value& config, PipelineContext& context);
void Process(std::vector<PipelineEventGroup>& logGroupList);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 6 additions & 0 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline

// CompressType
if (BOOL_FLAG(sls_client_send_compress)) {
#ifndef APSARA_UNIT_TEST_MAIN
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4);
#endif
}

mGroupSerializer = make_unique<SLSEventGroupSerializer>(this);
Expand Down Expand Up @@ -672,7 +674,11 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)

auto data = static_cast<SLSSenderQueueItem*>(item);
string configName = HasContext() ? GetContext().GetConfigName() : "";
#ifndef APSARA_UNIT_TEST_MAIN
bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore);
#else
bool isProfileData = false;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
#endif
int32_t curTime = time(NULL);
auto curSystemTime = chrono::system_clock::now();
bool hasAuthError = false;
Expand Down
9 changes: 9 additions & 0 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/HttpSinkMock.h"
#endif
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -59,6 +62,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +143,17 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
#ifndef APSARA_UNIT_TEST_MAIN
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
#else
HttpSinkMock::GetInstance()->AddRequest(std::move(req)); // release item here
++mHttpSendingCnt;
#endif
}

void FlusherRunner::Run() {
Expand Down
16 changes: 14 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"
#include "unittest/pipeline/LogtailPluginMock.h"

DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1);
DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60);
Expand All @@ -49,6 +49,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -141,8 +142,11 @@ void ProcessorRunner::Run(uint32_t threadNo) {
eventGroupList.emplace_back(std::move(item->mEventGroup));
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
shared_ptr<Pipeline> oldPipeline;
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline->SubInProcessCnt(); // old pipeline
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
pipeline->AddInProcessCnt();
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down Expand Up @@ -175,11 +179,19 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->GetContext().GetRegion());
continue;
}
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#endif
}
} else {
pipeline->Send(std::move(eventGroupList));
Expand Down
3 changes: 2 additions & 1 deletion core/runner/sink/http/HttpSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
#include <future>
#include <mutex>

#include "monitor/MetricManager.h"
#include "runner/sink/Sink.h"
#include "runner/sink/http/HttpSinkRequest.h"
#include "monitor/MetricManager.h"

namespace logtail {

Expand Down Expand Up @@ -68,6 +68,7 @@ class HttpSink : public Sink<HttpSinkRequest> {

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherRunnerUnittest;
friend class HttpSinkMock;
#endif
};

Expand Down
6 changes: 3 additions & 3 deletions core/unittest/config/PipelineManagerMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ class PipelineMock : public Pipeline {
mContext.SetCreateTime(config.mCreateTime);
return (*mConfig)["valid"].asBool();
}

bool Start() { return true; }
void Stop(bool isRemoving) {}
};

class PipelineManagerMock : public PipelineManager {
Expand All @@ -44,6 +41,9 @@ class PipelineManagerMock : public PipelineManager {
}

void ClearEnvironment() {
for (auto& it : mPipelineNameEntityMap) {
it.second->Stop(true);
}
mPipelineNameEntityMap.clear();
mPluginCntMap.clear();
}
Expand Down
Loading
Loading