Skip to content

Commit

Permalink
fix the issue where flusher_sls does not work when coexisting with ex…
Browse files Browse the repository at this point in the history
…tended flushers (#2015)
  • Loading branch information
henryzhx8 committed Jan 13, 2025
1 parent 3e83e53 commit 0890f3d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 72 deletions.
10 changes: 5 additions & 5 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ bool Pipeline::Init(PipelineConfig&& config) {
bool hasFlusherSLS = false;

// to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be
unique_ptr<FlusherSLS> SLSTmp = make_unique<FlusherSLS>();
FlusherSLS SLSTmp;
if (!config.mProject.empty()) {
SLSTmp->mProject = config.mProject;
SLSTmp->mLogstore = config.mLogstore;
SLSTmp->mRegion = config.mRegion;
mContext.SetSLSInfo(SLSTmp.get());
SLSTmp.mProject = config.mProject;
SLSTmp.mLogstore = config.mLogstore;
SLSTmp.mRegion = config.mRegion;
mContext.SetSLSInfo(&SLSTmp);
}

mPluginID.store(0);
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ void FlusherSLS::GenerateGoPlugin(const Json::Value& config, Json::Value& res) c
detail[itr.name()] = *itr;
}
}
if (!detail.empty()) {
if (mContext->IsFlushingThroughGoPipeline()) {
Json::Value plugin(Json::objectValue);
plugin["type"] = Pipeline::GenPluginTypeWithID("flusher_sls", mContext->GetPipeline().GetNowPluginID());
plugin["detail"] = detail;
Expand Down
52 changes: 26 additions & 26 deletions core/unittest/config/PipelineConfigUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
],
"extensions": [
Expand Down Expand Up @@ -496,7 +496,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -533,7 +533,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -566,7 +566,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand All @@ -591,7 +591,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -623,7 +623,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -663,7 +663,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -691,7 +691,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -731,7 +731,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -767,7 +767,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand All @@ -787,7 +787,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand All @@ -814,7 +814,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -849,7 +849,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -877,7 +877,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -917,7 +917,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -953,7 +953,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -981,7 +981,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1016,7 +1016,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1090,7 +1090,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1133,7 +1133,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1172,7 +1172,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand All @@ -1195,7 +1195,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1225,7 +1225,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1263,7 +1263,7 @@ void PipelineConfigUnittest::HandleValidConfig() const {
"Type": "flusher_sls"
},
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down Expand Up @@ -1932,7 +1932,7 @@ void PipelineConfigUnittest::HandleInvalidAggregators() const {
],
"flushers": [
{
"Type": "flusher_kafka_v2"
"Type": "flusher_http"
}
]
}
Expand Down
33 changes: 32 additions & 1 deletion core/unittest/flusher/FlusherSLSUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,37 @@ void FlusherSLSUnittest::OnSuccessfulInit() {
ctx.SetExactlyOnceFlag(false);
SenderQueueManager::GetInstance()->Clear();

// additional param
// go param
ctx.SetIsFlushingThroughGoPipelineFlag(true);
configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "cn-hangzhou",
"Endpoint": "cn-hangzhou.log.aliyuncs.com",
}
)";
optionalGoPipelineStr = R"(
{
"flushers": [
{
"type": "flusher_sls/4",
"detail": {}
}
]
}
)";
APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg));
pipeline.mPluginID.store(4);
flusher.reset(new FlusherSLS());
flusher->SetContext(ctx);
flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
APSARA_TEST_TRUE(optionalGoPipelineJson == optionalGoPipeline);
optionalGoPipeline.clear();

configStr = R"(
{
"Type": "flusher_sls",
Expand Down Expand Up @@ -475,6 +505,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() {
flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
APSARA_TEST_EQUAL(optionalGoPipelineJson.toStyledString(), optionalGoPipeline.toStyledString());
ctx.SetIsFlushingThroughGoPipelineFlag(false);
SenderQueueManager::GetInstance()->Clear();
}

Expand Down
Loading

0 comments on commit 0890f3d

Please sign in to comment.