Skip to content

Commit

Permalink
disable group batch when flusher_sls.telemetrytype equals metrics (#2029
Browse files Browse the repository at this point in the history
)
  • Loading branch information
henryzhx8 committed Jan 13, 2025
1 parent 0890f3d commit 969f80c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
12 changes: 8 additions & 4 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,13 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};
if (!mBatcher.Init(
itr ? *itr : Json::Value(), this, strategy, !mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty())) {
// when either exactly once is enabled or ShardHashKeys is not empty, we don't enable group batch
if (!mBatcher.Init(itr ? *itr : Json::Value(),
this,
strategy,
!mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty()
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) {
// when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't
// enable group batch
return false;
}

Expand Down Expand Up @@ -901,7 +905,7 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
}
}
#ifdef __ENTERPRISE__
bool hasNetworkError = (sendResult == SEND_NETWORK_ERROR || sendResult == SEND_SERVER_ERROR);
bool hasNetworkError = sendResult == SEND_NETWORK_ERROR;
EnterpriseSLSClientManager::GetInstance()->UpdateHostStatus(
mProject, mCandidateHostsInfo->GetMode(), data->mCurrentHost, !hasNetworkError);
mCandidateHostsInfo->SelectBestHost();
Expand Down
18 changes: 18 additions & 0 deletions core/unittest/flusher/FlusherSLSUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,24 @@ void FlusherSLSUnittest::OnSuccessfulInit() {
ctx.SetExactlyOnceFlag(false);
SenderQueueManager::GetInstance()->Clear();

configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "metrics"
}
)";
APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
flusher.reset(new FlusherSLS());
flusher->SetContext(ctx);
flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
SenderQueueManager::GetInstance()->Clear();

// go param
ctx.SetIsFlushingThroughGoPipelineFlag(true);
configStr = R"(
Expand Down

0 comments on commit 969f80c

Please sign in to comment.