From 8565ab59dc05d945e57d7c75121d8b4b5456cdef Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Wed, 8 Jan 2025 18:00:04 +0800 Subject: [PATCH 01/12] support subpath --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 26 ++++++++ core/plugin/flusher/sls/FlusherSLS.cpp | 54 +++++++++++++++ core/plugin/flusher/sls/FlusherSLS.h | 6 ++ core/plugin/flusher/sls/SLSClientManager.cpp | 69 ++++++++++++++++++++ core/plugin/flusher/sls/SLSClientManager.h | 23 +++++++ core/protobuf/sls/logtail_buffer_meta.proto | 1 + core/protobuf/sls/sls_logs.proto | 3 + 7 files changed, 182 insertions(+) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index ff94990e6d..d68ebe049c 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -80,6 +80,8 @@ static sls_logs::EndpointMode GetEndpointMode(EndpointMode mode) { static const string kAKErrorMsg = "can not get valid access key"; #endif +static const string kNoSubpathErrorMsg = "subpath not set"; + static const string kNoHostErrorMsg = "can not get available host"; static const string& GetSLSCompressTypeString(sls_logs::SlsCompressType compressType) { @@ -770,6 +772,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); bufferMeta.set_telemetrytype(flusher->mTelemetryType); + bufferMeta.set_subpath(flusher->mSubpath); #ifdef __ENTERPRISE__ bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode)); #endif @@ -877,6 +880,29 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe GetSLSCompressTypeString(bufferMeta.compresstype()), logData, bufferMeta.rawsize()); + } else if (bufferMeta.has_telemetrytype() && + (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || + bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES || + bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS)) { + if (!bufferMeta.has_subpath() || bufferMeta.subpath().empty()) { + LOG_ERROR(sLogger, ("don't set supath, telemetry type", static_cast(bufferMeta.telemetrytype()))); + SLSResponse response; + response.mErrorCode = LOGE_PARAMETER_INVALID; + response.mErrorMsg = kNoSubpathErrorMsg; + return response; + } + LOG_DEBUG(sLogger, ("subpath", bufferMeta.subpath()) ("telemetry type", static_cast(bufferMeta.telemetrytype()))); + return PostARMSBackendLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + logData, + bufferMeta.rawsize(), + bufferMeta.subpath()); } else { return PostLogStoreLogs(accessKeyId, accessKeySecret, diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 676f5fbc9b..285ca2daf3 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -424,6 +424,21 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } else if (telemetryType == "metrics") { mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + }if (telemetryType == "arms_agentinfo") { + mSubpath = "/apm/meta/arms/v1/meta_log/AgentInfo"; + mLogstore = "__arms_default_agentinfo__"; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; + LOG_DEBUG(sLogger, ("successfully set agentinfo subpath", mSubpath) ("logstore", mLogstore)); + } else if (telemetryType == "arms_metrics") { + mSubpath = "/apm/metric/arms/v1/metric_log"; + mLogstore = "__arms_default_metric__"; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; + LOG_DEBUG(sLogger, ("successfully set metric subpath", mSubpath) ("logstore", mLogstore)); + } else if (telemetryType == "arms_traces") { + mSubpath = "/apm/trace/arms/v1/trace_log"; + mLogstore = "__arms_default_trace__"; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; + LOG_DEBUG(sLogger, ("successfully set trace subpath", mSubpath) ("logstore", mLogstore)); } else if (!telemetryType.empty() && telemetryType != "logs") { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), @@ -667,6 +682,11 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr case sls_logs::SLS_TELEMETRY_TYPE_METRICS: req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data); break; + case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: + req = CreatePostArmsBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath); + break; default: break; } @@ -1245,6 +1265,40 @@ unique_ptr FlusherSLS::CreatePostMetricStoreLogsRequest(const s 1); } +unique_ptr FlusherSLS::CreatePostArmsBackendRequest(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const { + + // string path; + map header; + PreparePostAPMBackendRequest(accessKeyId, + accessKeySecret, + type, + item->mCurrentHost, + item->mRealIpFlag, + mProject, + item->mLogstore, + CompressTypeToString(mCompressor->GetCompressType()), + item->mData, + item->mRawSize, + subPath, + header); + bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); + return make_unique(HTTP_POST, + httpsFlag, + item->mCurrentHost, + httpsFlag ? 443 : 80, + subPath, + "", + header, + item->mData, + item, + INT32_FLAG(default_http_request_timeout_sec), + 1); +} + sls_logs::SlsCompressType ConvertCompressType(CompressType type) { sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE; switch (type) { diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index a228e2567d..68bf54573e 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -81,6 +81,7 @@ class FlusherSLS : public HttpFlusher { std::string mProject; std::string mLogstore; + std::string mSubpath; std::string mRegion; std::string mAliuid; #ifdef __ENTERPRISE__ @@ -130,6 +131,11 @@ class FlusherSLS : public HttpFlusher { const std::string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item) const; + std::unique_ptr CreatePostArmsBackendRequest(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const; Batcher mBatcher; std::unique_ptr mGroupSerializer; diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 9989d1991e..de486a5190 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -236,6 +236,45 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId, header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; } +void PreparePostAPMBackendRequest(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool isHostIp, + const string& project, + const string& logstore, + const string& compressType, + const string& body, + size_t rawSize, + const string& path, + map& header) { + + LOG_DEBUG(sLogger, ("entering, subpath", path) ("project", project)); + if (isHostIp) { + header[HOST] = project + "." + host; + } else { + header[HOST] = host; + } + header[USER_AGENT] = SLSClientManager::GetInstance()->GetUserAgent(); + header[DATE] = GetDateString(); + header[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; + header[CONTENT_LENGTH] = to_string(body.size()); + header[CONTENT_MD5] = CalcMD5(body); + header[X_LOG_APIVERSION] = LOG_API_VERSION; + header[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1; + if (!compressType.empty()) { + header[X_LOG_COMPRESSTYPE] = compressType; + } + header[X_LOG_BODYRAWSIZE] = to_string(rawSize); + if (type == SLSClientManager::AuthType::ANONYMOUS) { + header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER; + } + + map parameterList; + string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret); + header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; +} + SLSResponse PostLogStoreLogs(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, @@ -303,6 +342,36 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId, return ParseHttpResponse(response); } +SLSResponse PostARMSBackendLogs(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool httpsFlag, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + const std::string& body, + size_t rawSize, + const std::string& subpath) { + map header; + PreparePostAPMBackendRequest(accessKeyId, + accessKeySecret, + type, + host, + false, // sync request always uses vip + project, + logstore, + compressType, + body, + rawSize, + subpath, + header); + HttpResponse response; + SendHttpRequest(make_unique(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body), + response); + return ParseHttpResponse(response); +} + SLSResponse PutWebTracking(const string& host, bool httpsFlag, const string& logstore, diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 42384c8a1d..209af576a5 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -91,6 +91,18 @@ void PreparePostMetricStoreLogsRequest(const std::string& accessKeyId, size_t rawSize, std::string& path, std::map& header); +void PreparePostAPMBackendRequest(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool isHostIp, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + const std::string& body, + size_t rawSize, + const std::string& path, + std::map& header); SLSResponse PostLogStoreLogs(const std::string& accessKeyId, const std::string& accessKeySecret, SLSClientManager::AuthType type, @@ -113,6 +125,17 @@ SLSResponse PostMetricStoreLogs(const std::string& accessKeyId, const std::string& compressType, const std::string& body, size_t rawSize); +SLSResponse PostARMSBackendLogs(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool httpsFlag, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + const std::string& body, + size_t rawSize, + const std::string& subpath); SLSResponse PutWebTracking(const std::string& host, bool httpsFlag, const std::string& logstore, diff --git a/core/protobuf/sls/logtail_buffer_meta.proto b/core/protobuf/sls/logtail_buffer_meta.proto index 131e4099d7..93c9917f3f 100644 --- a/core/protobuf/sls/logtail_buffer_meta.proto +++ b/core/protobuf/sls/logtail_buffer_meta.proto @@ -37,4 +37,5 @@ message LogtailBufferMeta optional SlsTelemetryType telemetrytype = 9; optional EndpointMode endpointmode = 10; optional string endpoint = 11; + optional string subpath = 12; } diff --git a/core/protobuf/sls/sls_logs.proto b/core/protobuf/sls/sls_logs.proto index 9f3aad5856..5535e6209e 100644 --- a/core/protobuf/sls/sls_logs.proto +++ b/core/protobuf/sls/sls_logs.proto @@ -27,6 +27,9 @@ enum SlsTelemetryType { SLS_TELEMETRY_TYPE_LOGS = 0; SLS_TELEMETRY_TYPE_METRICS = 1; + SLS_TELEMETRY_TYPE_APM_METRICS = 2; + SLS_TELEMETRY_TYPE_APM_TRACES = 3; + SLS_TELEMETRY_TYPE_APM_AGENTINFOS = 4; } message Log From 9a4a271a76f0d87a237ef9d5e235c073b0dfb33b Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Wed, 8 Jan 2025 18:11:13 +0800 Subject: [PATCH 02/12] update --- core/plugin/flusher/sls/FlusherSLS.cpp | 15 ++--- core/plugin/flusher/sls/SLSConstant.cpp | 5 ++ core/plugin/flusher/sls/SLSConstant.h | 5 ++ core/unittest/pipeline/PipelineUnittest.cpp | 74 +++++++++++++++++++++ 4 files changed, 91 insertions(+), 8 deletions(-) diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 285ca2daf3..703a1fdc7c 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -424,19 +424,19 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } else if (telemetryType == "metrics") { mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - }if (telemetryType == "arms_agentinfo") { - mSubpath = "/apm/meta/arms/v1/meta_log/AgentInfo"; - mLogstore = "__arms_default_agentinfo__"; + } else if (telemetryType == "arms_agentinfo") { + mSubpath = ARMS_AGENTINFOS_URL; + mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; LOG_DEBUG(sLogger, ("successfully set agentinfo subpath", mSubpath) ("logstore", mLogstore)); } else if (telemetryType == "arms_metrics") { - mSubpath = "/apm/metric/arms/v1/metric_log"; - mLogstore = "__arms_default_metric__"; + mSubpath = ARMS_METRICS_URL; + mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; LOG_DEBUG(sLogger, ("successfully set metric subpath", mSubpath) ("logstore", mLogstore)); } else if (telemetryType == "arms_traces") { - mSubpath = "/apm/trace/arms/v1/trace_log"; - mLogstore = "__arms_default_trace__"; + mSubpath = ARMS_TRACES_URL; + mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; LOG_DEBUG(sLogger, ("successfully set trace subpath", mSubpath) ("logstore", mLogstore)); } else if (!telemetryType.empty() && telemetryType != "logs") { @@ -1271,7 +1271,6 @@ unique_ptr FlusherSLS::CreatePostArmsBackendRequest(const strin SLSSenderQueueItem* item, const std::string& subPath) const { - // string path; map header; PreparePostAPMBackendRequest(accessKeyId, accessKeySecret, diff --git a/core/plugin/flusher/sls/SLSConstant.cpp b/core/plugin/flusher/sls/SLSConstant.cpp index d81dbf284e..b8f111b59f 100644 --- a/core/plugin/flusher/sls/SLSConstant.cpp +++ b/core/plugin/flusher/sls/SLSConstant.cpp @@ -22,6 +22,11 @@ const string LOGSTORES = "/logstores"; const string METRICSTORES = "/prometheus"; const string HEALTH = "/health"; +const string ARMS_METRICS_URL = "/apm/metric/arms/v1/metric_log"; +const string ARMS_TRACES_URL = "/apm/trace/arms/v1/trace_log"; +const string ARMS_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; +const string DUMMY_LOG_STORE = "___dummy___"; + const string LOGTAIL_USER_AGENT = "ali-log-logtail"; const string CONTENT_MD5 = "Content-MD5"; diff --git a/core/plugin/flusher/sls/SLSConstant.h b/core/plugin/flusher/sls/SLSConstant.h index 5874d5f2ec..a929e1941b 100644 --- a/core/plugin/flusher/sls/SLSConstant.h +++ b/core/plugin/flusher/sls/SLSConstant.h @@ -24,6 +24,11 @@ extern const std::string LOGSTORES; extern const std::string METRICSTORES; extern const std::string HEALTH; +extern const std::string ARMS_METRICS_URL; +extern const std::string ARMS_TRACES_URL; +extern const std::string ARMS_AGENTINFOS_URL; +extern const std::string DUMMY_LOG_STORE; + extern const std::string CONTENT_MD5; extern const std::string LOGTAIL_USER_AGENT; diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 20cc535975..4868a60f34 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -51,6 +51,7 @@ class PipelineUnittest : public ::testing::Test { void TestFlushBatch() const; void TestInProcessingCount() const; void TestWaitAllItemsInProcessFinished() const; + void TestMultiFlusherAndRouter() const; protected: static void SetUpTestCase() { @@ -2945,6 +2946,77 @@ void PipelineUnittest::TestWaitAllItemsInProcessFinished() const { APSARA_TEST_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); } + +void PipelineUnittest::TestMultiFlusherAndRouter() const { + unique_ptr configJson; + string configStr, errorMsg; + unique_ptr config; + unique_ptr pipeline; + // new pipeline + configStr = R"( + { + "global": { + "ProcessPriority": 1 + }, + "inputs": [ + { + "Type": "input_file", + "FilePaths": [ + "/home/test.log" + ] + } + ], + "flushers": [ + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Logstore": "dummy", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Logstore": "dummy", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Logstore": "dummy", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + ] + } + )"; + configJson.reset(new Json::Value()); + APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); + config.reset(new PipelineConfig(configName, std::move(configJson))); + APSARA_TEST_TRUE(config->Parse()); + pipeline.reset(new Pipeline()); + APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); +} + UNIT_TEST_CASE(PipelineUnittest, OnSuccessfulInit) UNIT_TEST_CASE(PipelineUnittest, OnFailedInit) UNIT_TEST_CASE(PipelineUnittest, TestProcessQueue) @@ -2956,6 +3028,8 @@ UNIT_TEST_CASE(PipelineUnittest, TestSend) UNIT_TEST_CASE(PipelineUnittest, TestFlushBatch) UNIT_TEST_CASE(PipelineUnittest, TestInProcessingCount) UNIT_TEST_CASE(PipelineUnittest, TestWaitAllItemsInProcessFinished) +UNIT_TEST_CASE(PipelineUnittest, TestMultiFlusherAndRouter) + } // namespace logtail From d8673c49f9f90f0918fa577534630ad24c216695 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Thu, 9 Jan 2025 09:42:16 +0800 Subject: [PATCH 03/12] fix --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 26 ++-- core/plugin/flusher/sls/FlusherSLS.cpp | 132 ++++++++++--------- core/plugin/flusher/sls/FlusherSLS.h | 5 +- core/plugin/flusher/sls/SLSClientManager.cpp | 93 ++++++++----- core/plugin/flusher/sls/SLSClientManager.h | 50 +++---- core/plugin/flusher/sls/SLSConstant.cpp | 6 +- core/plugin/flusher/sls/SLSConstant.h | 6 +- core/unittest/pipeline/PipelineUnittest.cpp | 3 - 8 files changed, 181 insertions(+), 140 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index d68ebe049c..5f422b26ff 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -772,7 +772,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); bufferMeta.set_telemetrytype(flusher->mTelemetryType); - bufferMeta.set_subpath(flusher->mSubpath); + bufferMeta.set_subpath(flusher->GetSubpath()); #ifdef __ENTERPRISE__ bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode)); #endif @@ -892,17 +892,19 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe return response; } LOG_DEBUG(sLogger, ("subpath", bufferMeta.subpath()) ("telemetry type", static_cast(bufferMeta.telemetrytype()))); - return PostARMSBackendLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - logData, - bufferMeta.rawsize(), - bufferMeta.subpath()); + return PostAPMBackendLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", + bufferMeta.subpath()); } else { return PostLogStoreLogs(accessKeyId, accessKeySecret, diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 703a1fdc7c..6e05d72235 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -294,16 +294,57 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } + // TelemetryType + string telemetryType; + if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } else if (telemetryType == "metrics") { + mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS + : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + } else if (telemetryType == "arms_agentinfo") { + mSubpath = APM_AGENTINFOS_URL; + mLogstore = DUMMY_LOG_STORE; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; + } else if (telemetryType == "arms_metrics") { + mSubpath = APM_METRICS_URL; + mLogstore = DUMMY_LOG_STORE; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; + } else if (telemetryType == "arms_traces") { + mSubpath = APM_TRACES_URL; + mLogstore = DUMMY_LOG_STORE; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; + } else if (!telemetryType.empty() && telemetryType != "logs") { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + "string param TelemetryType is not valid", + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + // Logstore - if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { - PARAM_ERROR_RETURN(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS) { + if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } } // Region @@ -409,47 +450,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } #endif - // TelemetryType - string telemetryType; - if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } else if (telemetryType == "metrics") { - mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS - : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - } else if (telemetryType == "arms_agentinfo") { - mSubpath = ARMS_AGENTINFOS_URL; - mLogstore = DUMMY_LOG_STORE; - mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; - LOG_DEBUG(sLogger, ("successfully set agentinfo subpath", mSubpath) ("logstore", mLogstore)); - } else if (telemetryType == "arms_metrics") { - mSubpath = ARMS_METRICS_URL; - mLogstore = DUMMY_LOG_STORE; - mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; - LOG_DEBUG(sLogger, ("successfully set metric subpath", mSubpath) ("logstore", mLogstore)); - } else if (telemetryType == "arms_traces") { - mSubpath = ARMS_TRACES_URL; - mLogstore = DUMMY_LOG_STORE; - mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; - LOG_DEBUG(sLogger, ("successfully set trace subpath", mSubpath) ("logstore", mLogstore)); - } else if (!telemetryType.empty() && telemetryType != "logs") { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - "string param TelemetryType is not valid", - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } + // Batch const char* key = "Batch"; @@ -1271,19 +1272,28 @@ unique_ptr FlusherSLS::CreatePostArmsBackendRequest(const strin SLSSenderQueueItem* item, const std::string& subPath) const { + optional seqId; + if (item->mExactlyOnceCheckpoint) { + seqId = item->mExactlyOnceCheckpoint->data.sequence_id(); + } + string query; map header; PreparePostAPMBackendRequest(accessKeyId, - accessKeySecret, - type, - item->mCurrentHost, - item->mRealIpFlag, - mProject, - item->mLogstore, - CompressTypeToString(mCompressor->GetCompressType()), - item->mData, - item->mRawSize, - subPath, - header); + accessKeySecret, + type, + item->mCurrentHost, + item->mRealIpFlag, + mProject, + item->mLogstore, + CompressTypeToString(mCompressor->GetCompressType()), + item->mType, + item->mData, + item->mRawSize, + item->mShardHashKey, + seqId, + mSubpath, + query, + header); bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); return make_unique(HTTP_POST, httpsFlag, diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 68bf54573e..72e73d16c7 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -79,9 +79,10 @@ class FlusherSLS : public HttpFlusher { // for use of Go pipeline and shennong bool Send(std::string&& data, const std::string& shardHashKey, const std::string& logstore = ""); + std::string GetSubpath() const { return mSubpath; } + std::string mProject; std::string mLogstore; - std::string mSubpath; std::string mRegion; std::string mAliuid; #ifdef __ENTERPRISE__ @@ -137,6 +138,8 @@ class FlusherSLS : public HttpFlusher { SLSSenderQueueItem* item, const std::string& subPath) const; + std::string mSubpath; + Batcher mBatcher; std::unique_ptr mGroupSerializer; std::unique_ptr>> mGroupListSerializer; diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index de486a5190..dc2290aef9 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -237,19 +237,22 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId, } void PreparePostAPMBackendRequest(const string& accessKeyId, - const string& accessKeySecret, - SLSClientManager::AuthType type, - const string& host, - bool isHostIp, - const string& project, - const string& logstore, - const string& compressType, - const string& body, - size_t rawSize, - const string& path, - map& header) { + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool isHostIp, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const string& shardHashKey, + optional seqId, + const string& path, + string& query, + map& header) { - LOG_DEBUG(sLogger, ("entering, subpath", path) ("project", project)); if (isHostIp) { header[HOST] = project + "." + host; } else { @@ -265,12 +268,25 @@ void PreparePostAPMBackendRequest(const string& accessKeyId, if (!compressType.empty()) { header[X_LOG_COMPRESSTYPE] = compressType; } - header[X_LOG_BODYRAWSIZE] = to_string(rawSize); + if (dataType == RawDataType::EVENT_GROUP) { + header[X_LOG_BODYRAWSIZE] = to_string(rawSize); + } else { + header[X_LOG_BODYRAWSIZE] = to_string(body.size()); + header[X_LOG_MODE] = LOG_MODE_BATCH_GROUP; + } if (type == SLSClientManager::AuthType::ANONYMOUS) { header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER; } map parameterList; + if (!shardHashKey.empty()) { + parameterList["key"] = shardHashKey; + if (seqId.has_value()) { + parameterList["seqid"] = to_string(seqId.value()); + } + } + query = GetQueryString(parameterList); + string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret); header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; } @@ -342,30 +358,37 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId, return ParseHttpResponse(response); } -SLSResponse PostARMSBackendLogs(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - const std::string& host, - bool httpsFlag, - const std::string& project, - const std::string& logstore, - const std::string& compressType, - const std::string& body, - size_t rawSize, - const std::string& subpath) { +SLSResponse PostAPMBackendLogs(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool httpsFlag, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const string& shardHashKey, + const std::string& subpath) { + string query; map header; PreparePostAPMBackendRequest(accessKeyId, - accessKeySecret, - type, - host, - false, // sync request always uses vip - project, - logstore, - compressType, - body, - rawSize, - subpath, - header); + accessKeySecret, + type, + host, + false, // sync request always uses vip + project, + logstore, + compressType, + dataType, + body, + rawSize, + shardHashKey, + nullopt, // sync request does not support exactly-once + subpath, + query, + header); HttpResponse response; SendHttpRequest(make_unique(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body), response); diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 209af576a5..52d46b3fba 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -92,17 +92,21 @@ void PreparePostMetricStoreLogsRequest(const std::string& accessKeyId, std::string& path, std::map& header); void PreparePostAPMBackendRequest(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - const std::string& host, - bool isHostIp, - const std::string& project, - const std::string& logstore, - const std::string& compressType, - const std::string& body, - size_t rawSize, - const std::string& path, - std::map& header); + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool isHostIp, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& shardHashKey, + std::optional seqId, + const std::string& path, + std::string& query, + std::map& header); SLSResponse PostLogStoreLogs(const std::string& accessKeyId, const std::string& accessKeySecret, SLSClientManager::AuthType type, @@ -125,17 +129,19 @@ SLSResponse PostMetricStoreLogs(const std::string& accessKeyId, const std::string& compressType, const std::string& body, size_t rawSize); -SLSResponse PostARMSBackendLogs(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - const std::string& host, - bool httpsFlag, - const std::string& project, - const std::string& logstore, - const std::string& compressType, - const std::string& body, - size_t rawSize, - const std::string& subpath); +SLSResponse PostAPMBackendLogs(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool httpsFlag, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& shardHashKey, + const std::string& subpath); SLSResponse PutWebTracking(const std::string& host, bool httpsFlag, const std::string& logstore, diff --git a/core/plugin/flusher/sls/SLSConstant.cpp b/core/plugin/flusher/sls/SLSConstant.cpp index b8f111b59f..6b9da88fc9 100644 --- a/core/plugin/flusher/sls/SLSConstant.cpp +++ b/core/plugin/flusher/sls/SLSConstant.cpp @@ -22,9 +22,9 @@ const string LOGSTORES = "/logstores"; const string METRICSTORES = "/prometheus"; const string HEALTH = "/health"; -const string ARMS_METRICS_URL = "/apm/metric/arms/v1/metric_log"; -const string ARMS_TRACES_URL = "/apm/trace/arms/v1/trace_log"; -const string ARMS_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; +const string APM_METRICS_URL = "/apm/metric/arms/v1/metric_log"; +const string APM_TRACES_URL = "/apm/trace/arms/v1/trace_log"; +const string APM_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; const string DUMMY_LOG_STORE = "___dummy___"; const string LOGTAIL_USER_AGENT = "ali-log-logtail"; diff --git a/core/plugin/flusher/sls/SLSConstant.h b/core/plugin/flusher/sls/SLSConstant.h index a929e1941b..75a5d9e451 100644 --- a/core/plugin/flusher/sls/SLSConstant.h +++ b/core/plugin/flusher/sls/SLSConstant.h @@ -24,9 +24,9 @@ extern const std::string LOGSTORES; extern const std::string METRICSTORES; extern const std::string HEALTH; -extern const std::string ARMS_METRICS_URL; -extern const std::string ARMS_TRACES_URL; -extern const std::string ARMS_AGENTINFOS_URL; +extern const std::string APM_METRICS_URL; +extern const std::string APM_TRACES_URL; +extern const std::string APM_AGENTINFOS_URL; extern const std::string DUMMY_LOG_STORE; extern const std::string CONTENT_MD5; diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 4868a60f34..fc17768739 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2971,7 +2971,6 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { "Type": "flusher_sls", "TelemetryType": "arms_traces", "Project": "test_project", - "Logstore": "dummy", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { @@ -2984,7 +2983,6 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { "Type": "flusher_sls", "TelemetryType": "arms_metrics", "Project": "test_project", - "Logstore": "dummy", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { @@ -2997,7 +2995,6 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { "Type": "flusher_sls", "TelemetryType": "arms_agentinfo", "Project": "test_project", - "Logstore": "dummy", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { From f8c89d025cdeb74c37f2caf84bfe5c1834fc4e28 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Fri, 10 Jan 2025 11:44:02 +0800 Subject: [PATCH 04/12] update --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 69 +++++++++----------- core/plugin/flusher/sls/FlusherSLS.cpp | 23 +++---- core/plugin/flusher/sls/FlusherSLS.h | 10 +-- 3 files changed, 49 insertions(+), 53 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 5f422b26ff..d746ea723f 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -869,43 +869,9 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe } else { dataType = RawDataType::EVENT_GROUP; } - if (bufferMeta.has_telemetrytype() && bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { - return PostMetricStoreLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - logData, - bufferMeta.rawsize()); - } else if (bufferMeta.has_telemetrytype() && - (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || - bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES || - bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS)) { - if (!bufferMeta.has_subpath() || bufferMeta.subpath().empty()) { - LOG_ERROR(sLogger, ("don't set supath, telemetry type", static_cast(bufferMeta.telemetrytype()))); - SLSResponse response; - response.mErrorCode = LOGE_PARAMETER_INVALID; - response.mErrorMsg = kNoSubpathErrorMsg; - return response; - } - LOG_DEBUG(sLogger, ("subpath", bufferMeta.subpath()) ("telemetry type", static_cast(bufferMeta.telemetrytype()))); - return PostAPMBackendLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - dataType, - logData, - bufferMeta.rawsize(), - bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", - bufferMeta.subpath()); - } else { + + if (!bufferMeta.has_telemetrytype() || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_LOGS) { + // process logs return PostLogStoreLogs(accessKeyId, accessKeySecret, type, @@ -918,6 +884,35 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe logData, bufferMeta.rawsize(), bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); + } else if (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + // process apm + return PostMetricStoreLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + logData, + bufferMeta.rawsize()); + } else if (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS + || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES + || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS) { + // process apm + return PostAPMBackendLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", + bufferMeta.subpath()); } } diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 6e05d72235..db2b75d14a 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -297,6 +297,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // TelemetryType string telemetryType; if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { + // TelemetryType not set (for log scenarios) PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, @@ -307,21 +308,20 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetLogstoreName(), mContext->GetRegion()); } else if (telemetryType == "metrics") { + // TelemetryType set to metrics mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS : sls_logs::SLS_TELEMETRY_TYPE_LOGS; } else if (telemetryType == "arms_agentinfo") { mSubpath = APM_AGENTINFOS_URL; - mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; } else if (telemetryType == "arms_metrics") { mSubpath = APM_METRICS_URL; - mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; } else if (telemetryType == "arms_traces") { mSubpath = APM_TRACES_URL; - mLogstore = DUMMY_LOG_STORE; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; } else if (!telemetryType.empty() && telemetryType != "logs") { + // TelemetryType invalid PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), "string param TelemetryType is not valid", @@ -334,7 +334,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } // Logstore - if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS) { + if (telemetryType.empty() || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + // log and metric if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), mContext->GetAlarm(), @@ -345,6 +346,10 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetLogstoreName(), mContext->GetRegion()); } + } else if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS || + mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES){ + // apm + mLogstore = DUMMY_LOG_STORE; } // Region @@ -686,7 +691,7 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: - req = CreatePostArmsBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath); + req = CreatePostAPMBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath); break; default: break; @@ -1266,16 +1271,12 @@ unique_ptr FlusherSLS::CreatePostMetricStoreLogsRequest(const s 1); } -unique_ptr FlusherSLS::CreatePostArmsBackendRequest(const string& accessKeyId, +unique_ptr FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item, const std::string& subPath) const { - optional seqId; - if (item->mExactlyOnceCheckpoint) { - seqId = item->mExactlyOnceCheckpoint->data.sequence_id(); - } string query; map header; PreparePostAPMBackendRequest(accessKeyId, @@ -1290,7 +1291,7 @@ unique_ptr FlusherSLS::CreatePostArmsBackendRequest(const strin item->mData, item->mRawSize, item->mShardHashKey, - seqId, + nullopt, mSubpath, query, header); diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 72e73d16c7..b26919e213 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -132,11 +132,11 @@ class FlusherSLS : public HttpFlusher { const std::string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item) const; - std::unique_ptr CreatePostArmsBackendRequest(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - SLSSenderQueueItem* item, - const std::string& subPath) const; + std::unique_ptr CreatePostAPMBackendRequest(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const; std::string mSubpath; From d669238337a134a697951a5a1949420ba2a2f84a Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Fri, 10 Jan 2025 12:15:54 +0800 Subject: [PATCH 05/12] update --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 23 +++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index d746ea723f..038f71cd95 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -870,8 +870,10 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe dataType = RawDataType::EVENT_GROUP; } - if (!bufferMeta.has_telemetrytype() || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_LOGS) { - // process logs + auto telemetryType = bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + switch (telemetryType) + { + case sls_logs::SLS_TELEMETRY_TYPE_LOGS: return PostLogStoreLogs(accessKeyId, accessKeySecret, type, @@ -884,8 +886,7 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe logData, bufferMeta.rawsize(), bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); - } else if (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { - // process apm + case sls_logs::SLS_TELEMETRY_TYPE_METRICS: return PostMetricStoreLogs(accessKeyId, accessKeySecret, type, @@ -896,10 +897,9 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe GetSLSCompressTypeString(bufferMeta.compresstype()), logData, bufferMeta.rawsize()); - } else if (bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS - || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES - || bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS) { - // process apm + case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: + case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: return PostAPMBackendLogs(accessKeyId, accessKeySecret, type, @@ -913,6 +913,13 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe bufferMeta.rawsize(), bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", bufferMeta.subpath()); + default: + // should not happen + LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen")); + SLSResponse response; + response.mErrorCode = LOGE_REQUEST_ERROR; + response.mErrorMsg = "Unhandled telemetry type"; + return response; } } From 8e2c90070ad9938e50d054bf0e0468cc742c9bbc Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Fri, 10 Jan 2025 12:17:47 +0800 Subject: [PATCH 06/12] update --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 101 ++++++++++--------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 038f71cd95..352a83edb6 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -870,56 +870,57 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe dataType = RawDataType::EVENT_GROUP; } - auto telemetryType = bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - switch (telemetryType) - { - case sls_logs::SLS_TELEMETRY_TYPE_LOGS: - return PostLogStoreLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - dataType, - logData, - bufferMeta.rawsize(), - bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); - case sls_logs::SLS_TELEMETRY_TYPE_METRICS: - return PostMetricStoreLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - logData, - bufferMeta.rawsize()); - case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: - case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: - case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: - return PostAPMBackendLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - dataType, - logData, - bufferMeta.rawsize(), - bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", - bufferMeta.subpath()); - default: - // should not happen - LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen")); - SLSResponse response; - response.mErrorCode = LOGE_REQUEST_ERROR; - response.mErrorMsg = "Unhandled telemetry type"; - return response; + auto telemetryType + = bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + switch (telemetryType) { + case sls_logs::SLS_TELEMETRY_TYPE_LOGS: + return PostLogStoreLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); + case sls_logs::SLS_TELEMETRY_TYPE_METRICS: + return PostMetricStoreLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + logData, + bufferMeta.rawsize()); + case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: + case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: + return PostAPMBackendLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", + bufferMeta.subpath()); + default: { + // should not happen + LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen")); + SLSResponse response; + response.mErrorCode = LOGE_REQUEST_ERROR; + response.mErrorMsg = "Unhandled telemetry type"; + return response; + } } } From 1dd89df20074c28eb35bfb379634631ba00d97c4 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Fri, 10 Jan 2025 13:51:58 +0800 Subject: [PATCH 07/12] clang format && add unittest --- core/plugin/flusher/sls/SLSClientManager.cpp | 90 +++--- core/plugin/flusher/sls/SLSClientManager.h | 54 ++-- core/unittest/flusher/FlusherSLSUnittest.cpp | 304 +++++++++++++++++++ 3 files changed, 376 insertions(+), 72 deletions(-) diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index dc2290aef9..0c2138fe4e 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -237,22 +237,21 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId, } void PreparePostAPMBackendRequest(const string& accessKeyId, - const string& accessKeySecret, - SLSClientManager::AuthType type, - const string& host, - bool isHostIp, - const string& project, - const string& logstore, - const string& compressType, - RawDataType dataType, - const string& body, - size_t rawSize, - const string& shardHashKey, - optional seqId, - const string& path, - string& query, - map& header) { - + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool isHostIp, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const string& shardHashKey, + optional seqId, + const string& path, + string& query, + map& header) { if (isHostIp) { header[HOST] = project + "." + host; } else { @@ -359,39 +358,40 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId, } SLSResponse PostAPMBackendLogs(const string& accessKeyId, - const string& accessKeySecret, - SLSClientManager::AuthType type, - const string& host, - bool httpsFlag, - const string& project, - const string& logstore, - const string& compressType, - RawDataType dataType, - const string& body, - size_t rawSize, - const string& shardHashKey, - const std::string& subpath) { + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool httpsFlag, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const string& shardHashKey, + const std::string& subpath) { string query; map header; PreparePostAPMBackendRequest(accessKeyId, - accessKeySecret, - type, - host, - false, // sync request always uses vip - project, - logstore, - compressType, - dataType, - body, - rawSize, - shardHashKey, - nullopt, // sync request does not support exactly-once - subpath, - query, - header); + accessKeySecret, + type, + host, + false, // sync request always uses vip + project, + logstore, + compressType, + dataType, + body, + rawSize, + shardHashKey, + nullopt, // sync request does not support exactly-once + subpath, + query, + header); HttpResponse response; - SendHttpRequest(make_unique(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body), - response); + SendHttpRequest( + make_unique(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body), + response); return ParseHttpResponse(response); } diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 52d46b3fba..9210be252e 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -92,21 +92,21 @@ void PreparePostMetricStoreLogsRequest(const std::string& accessKeyId, std::string& path, std::map& header); void PreparePostAPMBackendRequest(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - const std::string& host, - bool isHostIp, - const std::string& project, - const std::string& logstore, - const std::string& compressType, - RawDataType dataType, - const std::string& body, - size_t rawSize, - const std::string& shardHashKey, - std::optional seqId, - const std::string& path, - std::string& query, - std::map& header); + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool isHostIp, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& shardHashKey, + std::optional seqId, + const std::string& path, + std::string& query, + std::map& header); SLSResponse PostLogStoreLogs(const std::string& accessKeyId, const std::string& accessKeySecret, SLSClientManager::AuthType type, @@ -130,18 +130,18 @@ SLSResponse PostMetricStoreLogs(const std::string& accessKeyId, const std::string& body, size_t rawSize); SLSResponse PostAPMBackendLogs(const std::string& accessKeyId, - const std::string& accessKeySecret, - SLSClientManager::AuthType type, - const std::string& host, - bool httpsFlag, - const std::string& project, - const std::string& logstore, - const std::string& compressType, - RawDataType dataType, - const std::string& body, - size_t rawSize, - const std::string& shardHashKey, - const std::string& subpath); + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool httpsFlag, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& shardHashKey, + const std::string& subpath); SLSResponse PutWebTracking(const std::string& host, bool httpsFlag, const std::string& logstore, diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 073b859441..26e5eebb02 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -14,6 +14,7 @@ #include #include +#include #include "json/json.h" @@ -63,6 +64,9 @@ class FlusherSLSUnittest : public testing::Test { void TestFlushAll(); void TestAddPackId(); void OnGoPipelineSend(); + void TestSendAPMMetrics(); + void TestSendAPMTraces(); + void TestSendAPMAgentInfos(); protected: static void SetUpTestCase() { @@ -1706,6 +1710,303 @@ void FlusherSLSUnittest::OnGoPipelineSend() { } } + +std::string GenerateRandomString(size_t length) { + const std::string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_int_distribution<> distribution(0, chars.size() - 1); + std::string result; + for (size_t i = 0; i < length; ++i) { + result += chars[distribution(generator)]; + } + return result; +} +void FlusherSLSUnittest::TestSendAPMTraces() { + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + configStr = R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Aliuid": "1108555361245511", + "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", + "Region": "cn-hangzhou", + "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )"; + ParseJsonTable(configStr, configJson, errorMsg); + FlusherSLS flusher; + flusher.SetContext(ctx); + flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); + flusher.Init(configJson, optionalGoPipeline); + SLSSenderQueueItem item("hello, world!", 100, &flusher, flusher.GetQueueKey(), flusher.mLogstore); + bool keepItem = false; + std::string errMsg = ""; + unique_ptr req; + APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); + { + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration).count(); + auto nano = std::chrono::duration_cast(duration).count(); + std::vector> items; + // construct vector + // 1000 timeseries for app + std::vector app_ids = { + "eeeb8df999f59f569da84d27fa408a94", + "deddf8ef215107d8fd37540ac4e3291b", + "52abe1564d8ee3fea66e9302fc21d80d", + "87f79be5ab74d72b4a10b62c02dc7f34", + "1796627f8e0b7fbba042c145820311f9" + }; + std::vector service_name = { + "test-service-1", + "test-service-2", + "test-service-3", + "test-service-4", + "test-service-5" + }; + for (size_t i = 0; i < app_ids.size(); i ++) { + std::shared_ptr mSourceBuffer = std::make_shared();; + PipelineEventGroup mTestEventGroup(mSourceBuffer); + mTestEventGroup.SetTag(std::string("serviceName"), service_name[i]); + mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); + mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); + mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); + mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); + mTestEventGroup.SetTag(std::string("data_type"), std::string("trace")); + for (size_t j = 0 ; j < 25; j ++) { + auto spanEvent = mTestEventGroup.AddSpanEvent(); + // spanEvent->SetScopeTag(); + spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); + spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); + spanEvent->SetTag(std::string("rpcType"), std::string("0")); + spanEvent->SetTag(std::string("callType"), std::string("http")); + spanEvent->SetTag(std::string("statusCode"), std::string("200")); + spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); + spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); + spanEvent->SetKind(SpanEvent::Kind::Server); + std::string trace_id = GenerateRandomString(32); + std::string span_id = GenerateRandomString(16); + spanEvent->SetSpanId(span_id); + spanEvent->SetTraceId(trace_id); + spanEvent->SetStartTimeNs(nano - 5e6); + spanEvent->SetEndTimeNs(nano); + spanEvent->SetTimestamp(seconds); + } + for (size_t j = 0 ; j < 25; j ++) { + auto spanEvent = mTestEventGroup.AddSpanEvent(); + spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); + spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); + spanEvent->SetTag(std::string("rpcType"), std::string("25")); + spanEvent->SetTag(std::string("callType"), std::string("http-client")); + spanEvent->SetTag(std::string("statusCode"), std::string("200")); + spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); + spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); + spanEvent->SetKind(SpanEvent::Kind::Client); + std::string trace_id = GenerateRandomString(32); + std::string span_id = GenerateRandomString(16); + spanEvent->SetSpanId(span_id); + spanEvent->SetTraceId(trace_id); + spanEvent->SetStartTimeNs(nano - 5e9); + spanEvent->SetEndTimeNs(nano); + spanEvent->SetTimestamp(seconds); + } + // TODO flush + flusher.Send(std::move(mTestEventGroup)); + } + } + flusher.FlushAll(); + vector res; + SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); + APSARA_TEST_EQUAL(1U, res.size()); +} +void FlusherSLSUnittest::TestSendAPMMetrics() { + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + configStr = R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Aliuid": "1108555361245511", + "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", + "Region": "cn-hangzhou", + "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )"; + ParseJsonTable(configStr, configJson, errorMsg); + FlusherSLS flusher; + flusher.SetContext(ctx); + flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); + flusher.Init(configJson, optionalGoPipeline); + // generate apm metrics + { + const std::vector app_metric_names = { + "arms_rpc_requests_count", + "arms_rpc_requests_slow_count", + "arms_rpc_requests_error_count", + "arms_rpc_requests_seconds", + "arms_rpc_requests_by_status_count", + }; + const std::vector tcp_metrics_names = { + "arms_npm_tcp_rtt_avg", + "arms_npm_tcp_count_by_state", + "arms_npm_tcp_conn_stats_count", + "arms_npm_tcp_drop_count", + "arms_npm_tcp_retrans_total", + "arms_npm_recv_packets_total", + "arms_npm_sent_packets_total", + "arms_npm_recv_bytes_total", + "arms_npm_sent_bytes_total", + }; + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration).count(); + std::vector> items; + // construct vector + // 1000 timeseries for app + std::vector app_ids = { + "eeeb8df999f59f569da84d27fa408a94", + "deddf8ef215107d8fd37540ac4e3291b", + "52abe1564d8ee3fea66e9302fc21d80d", + "87f79be5ab74d72b4a10b62c02dc7f34", + "1796627f8e0b7fbba042c145820311f9" + }; + for (size_t i = 0; i < app_ids.size(); i ++) { + std::shared_ptr mSourceBuffer = std::make_shared();; + PipelineEventGroup mTestEventGroup(mSourceBuffer); + mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); + mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); + mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); + mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); + mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); + mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); + for (size_t j = 0 ; j < app_metric_names.size(); j ++) { + for (size_t z = 0; z < 10; z ++ ) { + auto metricsEvent = mTestEventGroup.AddMetricEvent(); + metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + metricsEvent->SetTag(std::string("workloadKind"), std::string("faceless")); + metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + metricsEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local" + std::to_string(z))); + metricsEvent->SetTag(std::string("rpcType"), std::string("0")); + metricsEvent->SetTag(std::string("callType"), std::string("http")); + metricsEvent->SetTag(std::string("statusCode"), std::string("200")); + metricsEvent->SetTag(std::string("version"), std::string("HTTP1.1")); + metricsEvent->SetName(app_metric_names[j]); + metricsEvent->SetValue(UntypedSingleValue{10.0}); + metricsEvent->SetTimestamp(seconds); + } + } + APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); + } + // tcp_metrics + for (size_t i = 0; i < app_ids.size(); i ++) { + std::shared_ptr mSourceBuffer = std::make_shared();; + PipelineEventGroup mTestEventGroup(mSourceBuffer); + mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); + mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); + mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.44"); + mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); + mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); + mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); + for (size_t j = 0 ; j < tcp_metrics_names.size(); j ++) { + for (size_t z = 0; z < 20; z ++ ) { + auto metricsEvent = mTestEventGroup.AddMetricEvent(); + metricsEvent->SetName(tcp_metrics_names[j]); + metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + metricsEvent->SetTag(std::string("workloadKind"), std::string("qianlu")); + metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + metricsEvent->SetTag(std::string("dest_ip"), std::string("10.54.0." + std::to_string(z))); + metricsEvent->SetTag(std::string("callType"), std::string("conn_stats")); + metricsEvent->SetValue(UntypedSingleValue{20.0}); + metricsEvent->SetTimestamp(seconds); + } + } + APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); + } + } + flusher.FlushAll(); + vector res; + SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); + APSARA_TEST_EQUAL(1U, res.size()); +} +void FlusherSLSUnittest::TestSendAPMAgentInfos() { + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + configStr = R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Aliuid": "1108555361245511", + "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", + "Region": "cn-hangzhou", + "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"; + ParseJsonTable(configStr, configJson, errorMsg); + FlusherSLS flusher; + flusher.SetContext(ctx); + flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); + flusher.Init(configJson, optionalGoPipeline); + std::shared_ptr sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + eventGroup.SetTag(std::string("data_type"), std::string("agent_info")); + const std::string app_id_key = "appId"; + const std::string agentIdKey = "agentId"; + const std::string app_prefix = "app-"; + const std::string agent_version = "1.0.0-rc"; + const std::string vmVersion = "xxxx"; + const std::string startTimestamp = "1729479979167"; // ms + const std::string startTimestampKey = "startTimeStamp"; + const std::string appNameKey = "appName"; + const std::string appNamePrefix = "test-ebpf-app-"; + const std::string ipKey = "ip"; + const std::string ip_prefix = "30.221.146."; + const std::string agentVersionKey = "agentVersion"; + for (int i = 0; i < 50; i ++) { + std::string app = app_prefix + std::to_string(i); + std::string ip = ip_prefix + std::to_string(i); + auto logEvent = eventGroup.AddLogEvent(); + logEvent->SetContent(app_id_key, app); + logEvent->SetContent(ipKey, ip); + logEvent->SetContent(agentIdKey, app); + logEvent->SetContent(appNameKey, appNamePrefix + std::to_string(i)); + logEvent->SetContent(startTimestampKey, startTimestamp); + logEvent->SetContent(agentVersionKey, "0.0.1"); + // auto now = std::chrono::steady_clock::now(); + logEvent->SetTimestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()); + } + APSARA_TEST_TRUE(flusher.Send(std::move(eventGroup))); + flusher.FlushAll(); + vector res; + SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); + APSARA_TEST_EQUAL(1U, res.size()); +} + UNIT_TEST_CASE(FlusherSLSUnittest, OnSuccessfulInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnFailedInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnPipelineUpdate) @@ -1715,6 +2016,9 @@ UNIT_TEST_CASE(FlusherSLSUnittest, TestFlush) UNIT_TEST_CASE(FlusherSLSUnittest, TestFlushAll) UNIT_TEST_CASE(FlusherSLSUnittest, TestAddPackId) UNIT_TEST_CASE(FlusherSLSUnittest, OnGoPipelineSend) +UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMAgentInfos) +UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMMetrics) +UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMTraces) } // namespace logtail From b43d118f826190d3dcd2e877f15134d68182e3cb Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Fri, 10 Jan 2025 14:01:07 +0800 Subject: [PATCH 08/12] clang format --- core/unittest/flusher/FlusherSLSUnittest.cpp | 100 +++++++++---------- 1 file changed, 48 insertions(+), 52 deletions(-) diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 26e5eebb02..b3bc22c99c 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include -#include #include +#include #include "json/json.h" @@ -1714,7 +1714,7 @@ void FlusherSLSUnittest::OnGoPipelineSend() { std::string GenerateRandomString(size_t length) { const std::string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; std::random_device rd; - std::mt19937 generator(rd()); + std::mt19937 generator(rd()); std::uniform_int_distribution<> distribution(0, chars.size() - 1); std::string result; for (size_t i = 0; i < length; ++i) { @@ -1758,22 +1758,16 @@ void FlusherSLSUnittest::TestSendAPMTraces() { std::vector> items; // construct vector // 1000 timeseries for app - std::vector app_ids = { - "eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9" - }; - std::vector service_name = { - "test-service-1", - "test-service-2", - "test-service-3", - "test-service-4", - "test-service-5" - }; - for (size_t i = 0; i < app_ids.size(); i ++) { - std::shared_ptr mSourceBuffer = std::make_shared();; + std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", + "deddf8ef215107d8fd37540ac4e3291b", + "52abe1564d8ee3fea66e9302fc21d80d", + "87f79be5ab74d72b4a10b62c02dc7f34", + "1796627f8e0b7fbba042c145820311f9"}; + std::vector service_name + = {"test-service-1", "test-service-2", "test-service-3", "test-service-4", "test-service-5"}; + for (size_t i = 0; i < app_ids.size(); i++) { + std::shared_ptr mSourceBuffer = std::make_shared(); + ; PipelineEventGroup mTestEventGroup(mSourceBuffer); mTestEventGroup.SetTag(std::string("serviceName"), service_name[i]); mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); @@ -1781,7 +1775,7 @@ void FlusherSLSUnittest::TestSendAPMTraces() { mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); mTestEventGroup.SetTag(std::string("data_type"), std::string("trace")); - for (size_t j = 0 ; j < 25; j ++) { + for (size_t j = 0; j < 25; j++) { auto spanEvent = mTestEventGroup.AddSpanEvent(); // spanEvent->SetScopeTag(); spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); @@ -1803,7 +1797,7 @@ void FlusherSLSUnittest::TestSendAPMTraces() { spanEvent->SetEndTimeNs(nano); spanEvent->SetTimestamp(seconds); } - for (size_t j = 0 ; j < 25; j ++) { + for (size_t j = 0; j < 25; j++) { auto spanEvent = mTestEventGroup.AddSpanEvent(); spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); @@ -1859,22 +1853,22 @@ void FlusherSLSUnittest::TestSendAPMMetrics() { // generate apm metrics { const std::vector app_metric_names = { - "arms_rpc_requests_count", - "arms_rpc_requests_slow_count", - "arms_rpc_requests_error_count", - "arms_rpc_requests_seconds", - "arms_rpc_requests_by_status_count", - }; + "arms_rpc_requests_count", + "arms_rpc_requests_slow_count", + "arms_rpc_requests_error_count", + "arms_rpc_requests_seconds", + "arms_rpc_requests_by_status_count", + }; const std::vector tcp_metrics_names = { - "arms_npm_tcp_rtt_avg", - "arms_npm_tcp_count_by_state", - "arms_npm_tcp_conn_stats_count", - "arms_npm_tcp_drop_count", - "arms_npm_tcp_retrans_total", - "arms_npm_recv_packets_total", - "arms_npm_sent_packets_total", - "arms_npm_recv_bytes_total", - "arms_npm_sent_bytes_total", + "arms_npm_tcp_rtt_avg", + "arms_npm_tcp_count_by_state", + "arms_npm_tcp_conn_stats_count", + "arms_npm_tcp_drop_count", + "arms_npm_tcp_retrans_total", + "arms_npm_recv_packets_total", + "arms_npm_sent_packets_total", + "arms_npm_recv_bytes_total", + "arms_npm_sent_bytes_total", }; auto now = std::chrono::system_clock::now(); auto duration = now.time_since_epoch(); @@ -1882,15 +1876,14 @@ void FlusherSLSUnittest::TestSendAPMMetrics() { std::vector> items; // construct vector // 1000 timeseries for app - std::vector app_ids = { - "eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9" - }; - for (size_t i = 0; i < app_ids.size(); i ++) { - std::shared_ptr mSourceBuffer = std::make_shared();; + std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", + "deddf8ef215107d8fd37540ac4e3291b", + "52abe1564d8ee3fea66e9302fc21d80d", + "87f79be5ab74d72b4a10b62c02dc7f34", + "1796627f8e0b7fbba042c145820311f9"}; + for (size_t i = 0; i < app_ids.size(); i++) { + std::shared_ptr mSourceBuffer = std::make_shared(); + ; PipelineEventGroup mTestEventGroup(mSourceBuffer); mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); @@ -1898,8 +1891,8 @@ void FlusherSLSUnittest::TestSendAPMMetrics() { mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0 ; j < app_metric_names.size(); j ++) { - for (size_t z = 0; z < 10; z ++ ) { + for (size_t j = 0; j < app_metric_names.size(); j++) { + for (size_t z = 0; z < 10; z++) { auto metricsEvent = mTestEventGroup.AddMetricEvent(); metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); metricsEvent->SetTag(std::string("workloadKind"), std::string("faceless")); @@ -1918,8 +1911,9 @@ void FlusherSLSUnittest::TestSendAPMMetrics() { APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); } // tcp_metrics - for (size_t i = 0; i < app_ids.size(); i ++) { - std::shared_ptr mSourceBuffer = std::make_shared();; + for (size_t i = 0; i < app_ids.size(); i++) { + std::shared_ptr mSourceBuffer = std::make_shared(); + ; PipelineEventGroup mTestEventGroup(mSourceBuffer); mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); @@ -1927,8 +1921,8 @@ void FlusherSLSUnittest::TestSendAPMMetrics() { mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0 ; j < tcp_metrics_names.size(); j ++) { - for (size_t z = 0; z < 20; z ++ ) { + for (size_t j = 0; j < tcp_metrics_names.size(); j++) { + for (size_t z = 0; z < 20; z++) { auto metricsEvent = mTestEventGroup.AddMetricEvent(); metricsEvent->SetName(tcp_metrics_names[j]); metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); @@ -1987,7 +1981,7 @@ void FlusherSLSUnittest::TestSendAPMAgentInfos() { const std::string ipKey = "ip"; const std::string ip_prefix = "30.221.146."; const std::string agentVersionKey = "agentVersion"; - for (int i = 0; i < 50; i ++) { + for (int i = 0; i < 50; i++) { std::string app = app_prefix + std::to_string(i); std::string ip = ip_prefix + std::to_string(i); auto logEvent = eventGroup.AddLogEvent(); @@ -1998,7 +1992,9 @@ void FlusherSLSUnittest::TestSendAPMAgentInfos() { logEvent->SetContent(startTimestampKey, startTimestamp); logEvent->SetContent(agentVersionKey, "0.0.1"); // auto now = std::chrono::steady_clock::now(); - logEvent->SetTimestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()); + logEvent->SetTimestamp( + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count()); } APSARA_TEST_TRUE(flusher.Send(std::move(eventGroup))); flusher.FlushAll(); From 110701f4761f834efb5c4d576b6f76d6fef142af Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Tue, 14 Jan 2025 14:42:05 +0800 Subject: [PATCH 09/12] update --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 2 - core/plugin/flusher/sls/FlusherSLS.cpp | 7 +- core/plugin/flusher/sls/SLSClientManager.cpp | 12 - core/plugin/flusher/sls/SLSClientManager.h | 2 - core/plugin/flusher/sls/SLSConstant.cpp | 1 - core/plugin/flusher/sls/SLSConstant.h | 1 - core/unittest/flusher/FlusherSLSUnittest.cpp | 437 ++++++------------- 7 files changed, 139 insertions(+), 323 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 352a83edb6..56c6739c11 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -80,8 +80,6 @@ static sls_logs::EndpointMode GetEndpointMode(EndpointMode mode) { static const string kAKErrorMsg = "can not get valid access key"; #endif -static const string kNoSubpathErrorMsg = "subpath not set"; - static const string kNoHostErrorMsg = "can not get available host"; static const string& GetSLSCompressTypeString(sls_logs::SlsCompressType compressType) { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index db2b75d14a..c4edcd602e 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -297,7 +297,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // TelemetryType string telemetryType; if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { - // TelemetryType not set (for log scenarios) PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, @@ -334,7 +333,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } // Logstore - if (telemetryType.empty() || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { // log and metric if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), @@ -348,8 +347,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } } else if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES){ - // apm - mLogstore = DUMMY_LOG_STORE; } // Region @@ -1290,8 +1287,6 @@ unique_ptr FlusherSLS::CreatePostAPMBackendRequest(const string item->mType, item->mData, item->mRawSize, - item->mShardHashKey, - nullopt, mSubpath, query, header); diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 0c2138fe4e..4bd836ce68 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -247,8 +247,6 @@ void PreparePostAPMBackendRequest(const string& accessKeyId, RawDataType dataType, const string& body, size_t rawSize, - const string& shardHashKey, - optional seqId, const string& path, string& query, map& header) { @@ -278,14 +276,6 @@ void PreparePostAPMBackendRequest(const string& accessKeyId, } map parameterList; - if (!shardHashKey.empty()) { - parameterList["key"] = shardHashKey; - if (seqId.has_value()) { - parameterList["seqid"] = to_string(seqId.value()); - } - } - query = GetQueryString(parameterList); - string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret); header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; } @@ -383,8 +373,6 @@ SLSResponse PostAPMBackendLogs(const string& accessKeyId, dataType, body, rawSize, - shardHashKey, - nullopt, // sync request does not support exactly-once subpath, query, header); diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 9210be252e..64ba54b2bc 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -102,8 +102,6 @@ void PreparePostAPMBackendRequest(const std::string& accessKeyId, RawDataType dataType, const std::string& body, size_t rawSize, - const std::string& shardHashKey, - std::optional seqId, const std::string& path, std::string& query, std::map& header); diff --git a/core/plugin/flusher/sls/SLSConstant.cpp b/core/plugin/flusher/sls/SLSConstant.cpp index 6b9da88fc9..113a9155ca 100644 --- a/core/plugin/flusher/sls/SLSConstant.cpp +++ b/core/plugin/flusher/sls/SLSConstant.cpp @@ -25,7 +25,6 @@ const string HEALTH = "/health"; const string APM_METRICS_URL = "/apm/metric/arms/v1/metric_log"; const string APM_TRACES_URL = "/apm/trace/arms/v1/trace_log"; const string APM_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; -const string DUMMY_LOG_STORE = "___dummy___"; const string LOGTAIL_USER_AGENT = "ali-log-logtail"; diff --git a/core/plugin/flusher/sls/SLSConstant.h b/core/plugin/flusher/sls/SLSConstant.h index 75a5d9e451..bc62ab682b 100644 --- a/core/plugin/flusher/sls/SLSConstant.h +++ b/core/plugin/flusher/sls/SLSConstant.h @@ -27,7 +27,6 @@ extern const std::string HEALTH; extern const std::string APM_METRICS_URL; extern const std::string APM_TRACES_URL; extern const std::string APM_AGENTINFOS_URL; -extern const std::string DUMMY_LOG_STORE; extern const std::string CONTENT_MD5; diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index b3bc22c99c..154e6efcaa 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -64,9 +64,6 @@ class FlusherSLSUnittest : public testing::Test { void TestFlushAll(); void TestAddPackId(); void OnGoPipelineSend(); - void TestSendAPMMetrics(); - void TestSendAPMTraces(); - void TestSendAPMAgentInfos(); protected: static void SetUpTestCase() { @@ -460,6 +457,65 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); SenderQueueManager::GetInstance()->Clear(); + + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t ii = 0; ii < apmConfigStr.size(); ii ++) { + auto& cfg = apmConfigStr[ii]; + APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg)); + flusher.reset(new FlusherSLS()); + flusher->SetContext(ctx); + flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); + APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]); + APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]); + SenderQueueManager::GetInstance()->Clear(); + } // go param ctx.SetIsFlushingThroughGoPipelineFlag(true); @@ -891,6 +947,85 @@ void FlusherSLSUnittest::TestBuildRequest() { APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } + { + // APM backend + Json::Value configJsonAPM, optionalGoPipelineAPM; + string errorMsgAPM; + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t zz = 0; zz < apmConfigStr.size(); zz ++) { + std::string configStrAPM = apmConfigStr[zz]; + APSARA_TEST_TRUE(ParseJsonTable(configStrAPM, configJsonAPM, errorMsgAPM)); + FlusherSLS flusherAPM; + flusherAPM.SetContext(ctx); + flusherAPM.SetMetricsRecordRef(FlusherSLS::sName, "flusher_sls_for_apm"); + APSARA_TEST_TRUE(flusherAPM.Init(configJsonAPM, optionalGoPipeline)); + + // normal + SLSSenderQueueItem item("hello, world!", rawSize, &flusherAPM, flusherAPM.GetQueueKey(), flusherAPM.mLogstore, RawDataType::EVENT_GROUP); + APSARA_TEST_TRUE(flusherAPM.BuildRequest(&item, req, &keepItem, &errMsg)); + APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); + APSARA_TEST_EQUAL(apmSubpath[zz], req->mUrl); + APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); + APSARA_TEST_FALSE(req->mHeader[DATE].empty()); + APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); + APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); + APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); + APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); + APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); + APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); + APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); + APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); + APSARA_TEST_EQUAL(body, req->mBody); + APSARA_TEST_TRUE(req->mHTTPSFlag); + APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); + } + } { // shard hash SLSSenderQueueItem item("hello, world!", @@ -1710,299 +1845,6 @@ void FlusherSLSUnittest::OnGoPipelineSend() { } } - -std::string GenerateRandomString(size_t length) { - const std::string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - std::random_device rd; - std::mt19937 generator(rd()); - std::uniform_int_distribution<> distribution(0, chars.size() - 1); - std::string result; - for (size_t i = 0; i < length; ++i) { - result += chars[distribution(generator)]; - } - return result; -} -void FlusherSLSUnittest::TestSendAPMTraces() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_traces", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "trace" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - SLSSenderQueueItem item("hello, world!", 100, &flusher, flusher.GetQueueKey(), flusher.mLogstore); - bool keepItem = false; - std::string errMsg = ""; - unique_ptr req; - APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); - { - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration).count(); - auto nano = std::chrono::duration_cast(duration).count(); - std::vector> items; - // construct vector - // 1000 timeseries for app - std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9"}; - std::vector service_name - = {"test-service-1", "test-service-2", "test-service-3", "test-service-4", "test-service-5"}; - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("serviceName"), service_name[i]); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("trace")); - for (size_t j = 0; j < 25; j++) { - auto spanEvent = mTestEventGroup.AddSpanEvent(); - // spanEvent->SetScopeTag(); - spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); - spanEvent->SetTag(std::string("rpcType"), std::string("0")); - spanEvent->SetTag(std::string("callType"), std::string("http")); - spanEvent->SetTag(std::string("statusCode"), std::string("200")); - spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); - spanEvent->SetKind(SpanEvent::Kind::Server); - std::string trace_id = GenerateRandomString(32); - std::string span_id = GenerateRandomString(16); - spanEvent->SetSpanId(span_id); - spanEvent->SetTraceId(trace_id); - spanEvent->SetStartTimeNs(nano - 5e6); - spanEvent->SetEndTimeNs(nano); - spanEvent->SetTimestamp(seconds); - } - for (size_t j = 0; j < 25; j++) { - auto spanEvent = mTestEventGroup.AddSpanEvent(); - spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); - spanEvent->SetTag(std::string("rpcType"), std::string("25")); - spanEvent->SetTag(std::string("callType"), std::string("http-client")); - spanEvent->SetTag(std::string("statusCode"), std::string("200")); - spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); - spanEvent->SetKind(SpanEvent::Kind::Client); - std::string trace_id = GenerateRandomString(32); - std::string span_id = GenerateRandomString(16); - spanEvent->SetSpanId(span_id); - spanEvent->SetTraceId(trace_id); - spanEvent->SetStartTimeNs(nano - 5e9); - spanEvent->SetEndTimeNs(nano); - spanEvent->SetTimestamp(seconds); - } - // TODO flush - flusher.Send(std::move(mTestEventGroup)); - } - } - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} -void FlusherSLSUnittest::TestSendAPMMetrics() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_metrics", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "metric" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - // generate apm metrics - { - const std::vector app_metric_names = { - "arms_rpc_requests_count", - "arms_rpc_requests_slow_count", - "arms_rpc_requests_error_count", - "arms_rpc_requests_seconds", - "arms_rpc_requests_by_status_count", - }; - const std::vector tcp_metrics_names = { - "arms_npm_tcp_rtt_avg", - "arms_npm_tcp_count_by_state", - "arms_npm_tcp_conn_stats_count", - "arms_npm_tcp_drop_count", - "arms_npm_tcp_retrans_total", - "arms_npm_recv_packets_total", - "arms_npm_sent_packets_total", - "arms_npm_recv_bytes_total", - "arms_npm_sent_bytes_total", - }; - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration).count(); - std::vector> items; - // construct vector - // 1000 timeseries for app - std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9"}; - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0; j < app_metric_names.size(); j++) { - for (size_t z = 0; z < 10; z++) { - auto metricsEvent = mTestEventGroup.AddMetricEvent(); - metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - metricsEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local" + std::to_string(z))); - metricsEvent->SetTag(std::string("rpcType"), std::string("0")); - metricsEvent->SetTag(std::string("callType"), std::string("http")); - metricsEvent->SetTag(std::string("statusCode"), std::string("200")); - metricsEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - metricsEvent->SetName(app_metric_names[j]); - metricsEvent->SetValue(UntypedSingleValue{10.0}); - metricsEvent->SetTimestamp(seconds); - } - } - APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); - } - // tcp_metrics - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.44"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0; j < tcp_metrics_names.size(); j++) { - for (size_t z = 0; z < 20; z++) { - auto metricsEvent = mTestEventGroup.AddMetricEvent(); - metricsEvent->SetName(tcp_metrics_names[j]); - metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - metricsEvent->SetTag(std::string("workloadKind"), std::string("qianlu")); - metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("dest_ip"), std::string("10.54.0." + std::to_string(z))); - metricsEvent->SetTag(std::string("callType"), std::string("conn_stats")); - metricsEvent->SetValue(UntypedSingleValue{20.0}); - metricsEvent->SetTimestamp(seconds); - } - } - APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); - } - } - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} -void FlusherSLSUnittest::TestSendAPMAgentInfos() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_agentinfo", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "agent_info" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - std::shared_ptr sourceBuffer = std::make_shared(); - PipelineEventGroup eventGroup(sourceBuffer); - eventGroup.SetTag(std::string("data_type"), std::string("agent_info")); - const std::string app_id_key = "appId"; - const std::string agentIdKey = "agentId"; - const std::string app_prefix = "app-"; - const std::string agent_version = "1.0.0-rc"; - const std::string vmVersion = "xxxx"; - const std::string startTimestamp = "1729479979167"; // ms - const std::string startTimestampKey = "startTimeStamp"; - const std::string appNameKey = "appName"; - const std::string appNamePrefix = "test-ebpf-app-"; - const std::string ipKey = "ip"; - const std::string ip_prefix = "30.221.146."; - const std::string agentVersionKey = "agentVersion"; - for (int i = 0; i < 50; i++) { - std::string app = app_prefix + std::to_string(i); - std::string ip = ip_prefix + std::to_string(i); - auto logEvent = eventGroup.AddLogEvent(); - logEvent->SetContent(app_id_key, app); - logEvent->SetContent(ipKey, ip); - logEvent->SetContent(agentIdKey, app); - logEvent->SetContent(appNameKey, appNamePrefix + std::to_string(i)); - logEvent->SetContent(startTimestampKey, startTimestamp); - logEvent->SetContent(agentVersionKey, "0.0.1"); - // auto now = std::chrono::steady_clock::now(); - logEvent->SetTimestamp( - std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) - .count()); - } - APSARA_TEST_TRUE(flusher.Send(std::move(eventGroup))); - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} - UNIT_TEST_CASE(FlusherSLSUnittest, OnSuccessfulInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnFailedInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnPipelineUpdate) @@ -2012,9 +1854,6 @@ UNIT_TEST_CASE(FlusherSLSUnittest, TestFlush) UNIT_TEST_CASE(FlusherSLSUnittest, TestFlushAll) UNIT_TEST_CASE(FlusherSLSUnittest, TestAddPackId) UNIT_TEST_CASE(FlusherSLSUnittest, OnGoPipelineSend) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMAgentInfos) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMMetrics) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMTraces) } // namespace logtail From ede9029bded05ae8126d5f9d137ce0aa683320de Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Tue, 14 Jan 2025 14:55:04 +0800 Subject: [PATCH 10/12] resolve conflicts --- core/unittest/flusher/FlusherSLSUnittest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 154e6efcaa..edc5011ad1 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -457,7 +457,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); SenderQueueManager::GetInstance()->Clear(); - + // apm std::vector apmConfigStr = {R"( { From 1b3fb9b5b54397697ba04367c078d5e30661d1b6 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Tue, 14 Jan 2025 15:00:49 +0800 Subject: [PATCH 11/12] clang format --- core/unittest/flusher/FlusherSLSUnittest.cpp | 23 ++++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index edc5011ad1..4f2321301c 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -472,7 +472,8 @@ void FlusherSLSUnittest::OnSuccessfulInit() { "Value": "trace" } } - )", R"( + )", + R"( { "Type": "flusher_sls", "TelemetryType": "arms_metrics", @@ -485,7 +486,8 @@ void FlusherSLSUnittest::OnSuccessfulInit() { "Value": "metric" } } - )", R"( + )", + R"( { "Type": "flusher_sls", "TelemetryType": "arms_agentinfo", @@ -505,7 +507,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, }; - for (size_t ii = 0; ii < apmConfigStr.size(); ii ++) { + for (size_t ii = 0; ii < apmConfigStr.size(); ii++) { auto& cfg = apmConfigStr[ii]; APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg)); flusher.reset(new FlusherSLS()); @@ -965,7 +967,8 @@ void FlusherSLSUnittest::TestBuildRequest() { "Value": "trace" } } - )", R"( + )", + R"( { "Type": "flusher_sls", "TelemetryType": "arms_metrics", @@ -978,7 +981,8 @@ void FlusherSLSUnittest::TestBuildRequest() { "Value": "metric" } } - )", R"( + )", + R"( { "Type": "flusher_sls", "TelemetryType": "arms_agentinfo", @@ -998,7 +1002,7 @@ void FlusherSLSUnittest::TestBuildRequest() { sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, }; - for (size_t zz = 0; zz < apmConfigStr.size(); zz ++) { + for (size_t zz = 0; zz < apmConfigStr.size(); zz++) { std::string configStrAPM = apmConfigStr[zz]; APSARA_TEST_TRUE(ParseJsonTable(configStrAPM, configJsonAPM, errorMsgAPM)); FlusherSLS flusherAPM; @@ -1007,7 +1011,12 @@ void FlusherSLSUnittest::TestBuildRequest() { APSARA_TEST_TRUE(flusherAPM.Init(configJsonAPM, optionalGoPipeline)); // normal - SLSSenderQueueItem item("hello, world!", rawSize, &flusherAPM, flusherAPM.GetQueueKey(), flusherAPM.mLogstore, RawDataType::EVENT_GROUP); + SLSSenderQueueItem item("hello, world!", + rawSize, + &flusherAPM, + flusherAPM.GetQueueKey(), + flusherAPM.mLogstore, + RawDataType::EVENT_GROUP); APSARA_TEST_TRUE(flusherAPM.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); APSARA_TEST_EQUAL(apmSubpath[zz], req->mUrl); From 004e19c0e2654dd633b0bdbdde5bb42929bcb507 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Tue, 14 Jan 2025 17:06:12 +0800 Subject: [PATCH 12/12] update --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 1 - core/plugin/flusher/sls/FlusherSLS.cpp | 82 +++++++++----------- core/plugin/flusher/sls/SLSClientManager.cpp | 1 - core/plugin/flusher/sls/SLSClientManager.h | 1 - core/unittest/flusher/FlusherSLSUnittest.cpp | 4 +- 5 files changed, 37 insertions(+), 52 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 56c6739c11..fd2a0444fc 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -909,7 +909,6 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe dataType, logData, bufferMeta.rawsize(), - bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "", bufferMeta.subpath()); default: { // should not happen diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index c4edcd602e..0044f4583a 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -337,16 +337,14 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // log and metric if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); } - } else if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS || - mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES){ } // Region @@ -452,7 +450,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } #endif - // Batch const char* key = "Batch"; @@ -483,25 +480,17 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } // ShardHashKeys - if (!GetOptionalListParam(config, "ShardHashKeys", mShardHashKeys, errorMsg)) { - PARAM_WARNING_IGNORE(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } else if (!mShardHashKeys.empty() && mContext->IsExactlyOnceEnabled()) { - mShardHashKeys.clear(); - PARAM_WARNING_IGNORE(mContext->GetLogger(), - mContext->GetAlarm(), - "exactly once enabled when ShardHashKeys is not empty", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) { + if (!GetOptionalListParam(config, "ShardHashKeys", mShardHashKeys, errorMsg)) { + PARAM_WARNING_IGNORE(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } } DefaultFlushStrategyOptions strategy{ @@ -1269,27 +1258,26 @@ unique_ptr FlusherSLS::CreatePostMetricStoreLogsRequest(const s } unique_ptr FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId, - const string& accessKeySecret, - SLSClientManager::AuthType type, - SLSSenderQueueItem* item, - const std::string& subPath) const { - + const string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const { string query; map header; PreparePostAPMBackendRequest(accessKeyId, - accessKeySecret, - type, - item->mCurrentHost, - item->mRealIpFlag, - mProject, - item->mLogstore, - CompressTypeToString(mCompressor->GetCompressType()), - item->mType, - item->mData, - item->mRawSize, - mSubpath, - query, - header); + accessKeySecret, + type, + item->mCurrentHost, + item->mRealIpFlag, + mProject, + item->mLogstore, + CompressTypeToString(mCompressor->GetCompressType()), + item->mType, + item->mData, + item->mRawSize, + mSubpath, + query, + header); bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); return make_unique(HTTP_POST, httpsFlag, diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 4bd836ce68..c46e3ce732 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -358,7 +358,6 @@ SLSResponse PostAPMBackendLogs(const string& accessKeyId, RawDataType dataType, const string& body, size_t rawSize, - const string& shardHashKey, const std::string& subpath) { string query; map header; diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 64ba54b2bc..a242399167 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -138,7 +138,6 @@ SLSResponse PostAPMBackendLogs(const std::string& accessKeyId, RawDataType dataType, const std::string& body, size_t rawSize, - const std::string& shardHashKey, const std::string& subpath); SLSResponse PutWebTracking(const std::string& host, bool httpsFlag, diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 4f2321301c..1369561fb3 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -158,7 +158,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789", - "TelemetryType": "metrics", + "TelemetryType": "logs", "ShardHashKeys": [ "__source__" ] @@ -180,7 +180,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL("", flusher->mAliuid); #endif APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint); - APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size()); APSARA_TEST_EQUAL("__source__", flusher->mShardHashKeys[0]); SenderQueueManager::GetInstance()->Clear();