diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 5ab054f2a8..5745a04107 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -80,8 +80,6 @@ static sls_logs::EndpointMode GetEndpointMode(EndpointMode mode) { static const string kAKErrorMsg = "can not get valid access key"; #endif -static const string kNoSubpathErrorMsg = "subpath not set"; - static const string kNoHostErrorMsg = "can not get available host"; static const string& GetSLSCompressTypeString(sls_logs::SlsCompressType compressType) { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index ab6fa03939..a6fdbdb058 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -297,7 +297,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // TelemetryType string telemetryType; if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { - // TelemetryType not set (for log scenarios) PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, @@ -334,7 +333,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } // Logstore - if (telemetryType.empty() || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { // log and metric if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), @@ -348,8 +347,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } } else if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES){ - // apm - mLogstore = DUMMY_LOG_STORE; } // Region @@ -1286,8 +1283,6 @@ unique_ptr FlusherSLS::CreatePostAPMBackendRequest(const string item->mType, item->mData, item->mRawSize, - item->mShardHashKey, - nullopt, mSubpath, query, header); diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index b9d3bd7b7b..8c92a466df 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -246,8 +246,6 @@ void PreparePostAPMBackendRequest(const string& accessKeyId, RawDataType dataType, const string& body, size_t rawSize, - const string& shardHashKey, - optional seqId, const string& path, string& query, map& header) { @@ -277,14 +275,6 @@ void PreparePostAPMBackendRequest(const string& accessKeyId, } map parameterList; - if (!shardHashKey.empty()) { - parameterList["key"] = shardHashKey; - if (seqId.has_value()) { - parameterList["seqid"] = to_string(seqId.value()); - } - } - query = GetQueryString(parameterList); - string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret); header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; } @@ -382,8 +372,6 @@ SLSResponse PostAPMBackendLogs(const string& accessKeyId, dataType, body, rawSize, - shardHashKey, - nullopt, // sync request does not support exactly-once subpath, query, header); diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 9210be252e..64ba54b2bc 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -102,8 +102,6 @@ void PreparePostAPMBackendRequest(const std::string& accessKeyId, RawDataType dataType, const std::string& body, size_t rawSize, - const std::string& shardHashKey, - std::optional seqId, const std::string& path, std::string& query, std::map& header); diff --git a/core/plugin/flusher/sls/SLSConstant.cpp b/core/plugin/flusher/sls/SLSConstant.cpp index 6b9da88fc9..113a9155ca 100644 --- a/core/plugin/flusher/sls/SLSConstant.cpp +++ b/core/plugin/flusher/sls/SLSConstant.cpp @@ -25,7 +25,6 @@ const string HEALTH = "/health"; const string APM_METRICS_URL = "/apm/metric/arms/v1/metric_log"; const string APM_TRACES_URL = "/apm/trace/arms/v1/trace_log"; const string APM_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; -const string DUMMY_LOG_STORE = "___dummy___"; const string LOGTAIL_USER_AGENT = "ali-log-logtail"; diff --git a/core/plugin/flusher/sls/SLSConstant.h b/core/plugin/flusher/sls/SLSConstant.h index 75a5d9e451..bc62ab682b 100644 --- a/core/plugin/flusher/sls/SLSConstant.h +++ b/core/plugin/flusher/sls/SLSConstant.h @@ -27,7 +27,6 @@ extern const std::string HEALTH; extern const std::string APM_METRICS_URL; extern const std::string APM_TRACES_URL; extern const std::string APM_AGENTINFOS_URL; -extern const std::string DUMMY_LOG_STORE; extern const std::string CONTENT_MD5; diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index ea2e1eb50d..b6456b5c99 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -64,9 +64,6 @@ class FlusherSLSUnittest : public testing::Test { void TestFlushAll(); void TestAddPackId(); void OnGoPipelineSend(); - void TestSendAPMMetrics(); - void TestSendAPMTraces(); - void TestSendAPMAgentInfos(); protected: static void SetUpTestCase() { @@ -443,6 +440,65 @@ void FlusherSLSUnittest::OnSuccessfulInit() { ctx.SetExactlyOnceFlag(false); SenderQueueManager::GetInstance()->Clear(); + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t ii = 0; ii < apmConfigStr.size(); ii ++) { + auto& cfg = apmConfigStr[ii]; + APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg)); + flusher.reset(new FlusherSLS()); + flusher->SetContext(ctx); + flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); + APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]); + APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]); + SenderQueueManager::GetInstance()->Clear(); + } + // go param ctx.SetIsFlushingThroughGoPipelineFlag(true); configStr = R"( @@ -873,6 +929,85 @@ void FlusherSLSUnittest::TestBuildRequest() { APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } + { + // APM backend + Json::Value configJsonAPM, optionalGoPipelineAPM; + string errorMsgAPM; + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t zz = 0; zz < apmConfigStr.size(); zz ++) { + std::string configStrAPM = apmConfigStr[zz]; + APSARA_TEST_TRUE(ParseJsonTable(configStrAPM, configJsonAPM, errorMsgAPM)); + FlusherSLS flusherAPM; + flusherAPM.SetContext(ctx); + flusherAPM.SetMetricsRecordRef(FlusherSLS::sName, "flusher_sls_for_apm"); + APSARA_TEST_TRUE(flusherAPM.Init(configJsonAPM, optionalGoPipeline)); + + // normal + SLSSenderQueueItem item("hello, world!", rawSize, &flusherAPM, flusherAPM.GetQueueKey(), flusherAPM.mLogstore, RawDataType::EVENT_GROUP); + APSARA_TEST_TRUE(flusherAPM.BuildRequest(&item, req, &keepItem, &errMsg)); + APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); + APSARA_TEST_EQUAL(apmSubpath[zz], req->mUrl); + APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); + APSARA_TEST_FALSE(req->mHeader[DATE].empty()); + APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); + APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); + APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); + APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); + APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); + APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); + APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); + APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); + APSARA_TEST_EQUAL(body, req->mBody); + APSARA_TEST_TRUE(req->mHTTPSFlag); + APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); + } + } { // shard hash SLSSenderQueueItem item("hello, world!", @@ -1692,299 +1827,6 @@ void FlusherSLSUnittest::OnGoPipelineSend() { } } - -std::string GenerateRandomString(size_t length) { - const std::string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - std::random_device rd; - std::mt19937 generator(rd()); - std::uniform_int_distribution<> distribution(0, chars.size() - 1); - std::string result; - for (size_t i = 0; i < length; ++i) { - result += chars[distribution(generator)]; - } - return result; -} -void FlusherSLSUnittest::TestSendAPMTraces() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_traces", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "trace" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - SLSSenderQueueItem item("hello, world!", 100, &flusher, flusher.GetQueueKey(), flusher.mLogstore); - bool keepItem = false; - std::string errMsg = ""; - unique_ptr req; - APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); - { - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration).count(); - auto nano = std::chrono::duration_cast(duration).count(); - std::vector> items; - // construct vector - // 1000 timeseries for app - std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9"}; - std::vector service_name - = {"test-service-1", "test-service-2", "test-service-3", "test-service-4", "test-service-5"}; - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("serviceName"), service_name[i]); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("trace")); - for (size_t j = 0; j < 25; j++) { - auto spanEvent = mTestEventGroup.AddSpanEvent(); - // spanEvent->SetScopeTag(); - spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); - spanEvent->SetTag(std::string("rpcType"), std::string("0")); - spanEvent->SetTag(std::string("callType"), std::string("http")); - spanEvent->SetTag(std::string("statusCode"), std::string("200")); - spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); - spanEvent->SetKind(SpanEvent::Kind::Server); - std::string trace_id = GenerateRandomString(32); - std::string span_id = GenerateRandomString(16); - spanEvent->SetSpanId(span_id); - spanEvent->SetTraceId(trace_id); - spanEvent->SetStartTimeNs(nano - 5e6); - spanEvent->SetEndTimeNs(nano); - spanEvent->SetTimestamp(seconds); - } - for (size_t j = 0; j < 25; j++) { - auto spanEvent = mTestEventGroup.AddSpanEvent(); - spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/" + std::to_string(j))); - spanEvent->SetTag(std::string("rpcType"), std::string("25")); - spanEvent->SetTag(std::string("callType"), std::string("http-client")); - spanEvent->SetTag(std::string("statusCode"), std::string("200")); - spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - spanEvent->SetName("/oneagent/qianlu/local/" + std::to_string(j)); - spanEvent->SetKind(SpanEvent::Kind::Client); - std::string trace_id = GenerateRandomString(32); - std::string span_id = GenerateRandomString(16); - spanEvent->SetSpanId(span_id); - spanEvent->SetTraceId(trace_id); - spanEvent->SetStartTimeNs(nano - 5e9); - spanEvent->SetEndTimeNs(nano); - spanEvent->SetTimestamp(seconds); - } - // TODO flush - flusher.Send(std::move(mTestEventGroup)); - } - } - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} -void FlusherSLSUnittest::TestSendAPMMetrics() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_metrics", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "metric" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - // generate apm metrics - { - const std::vector app_metric_names = { - "arms_rpc_requests_count", - "arms_rpc_requests_slow_count", - "arms_rpc_requests_error_count", - "arms_rpc_requests_seconds", - "arms_rpc_requests_by_status_count", - }; - const std::vector tcp_metrics_names = { - "arms_npm_tcp_rtt_avg", - "arms_npm_tcp_count_by_state", - "arms_npm_tcp_conn_stats_count", - "arms_npm_tcp_drop_count", - "arms_npm_tcp_retrans_total", - "arms_npm_recv_packets_total", - "arms_npm_sent_packets_total", - "arms_npm_recv_bytes_total", - "arms_npm_sent_bytes_total", - }; - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration).count(); - std::vector> items; - // construct vector - // 1000 timeseries for app - std::vector app_ids = {"eeeb8df999f59f569da84d27fa408a94", - "deddf8ef215107d8fd37540ac4e3291b", - "52abe1564d8ee3fea66e9302fc21d80d", - "87f79be5ab74d72b4a10b62c02dc7f34", - "1796627f8e0b7fbba042c145820311f9"}; - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.55"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0; j < app_metric_names.size(); j++) { - for (size_t z = 0; z < 10; z++) { - auto metricsEvent = mTestEventGroup.AddMetricEvent(); - metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - metricsEvent->SetTag(std::string("workloadKind"), std::string("faceless")); - metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local" + std::to_string(z))); - metricsEvent->SetTag(std::string("rpcType"), std::string("0")); - metricsEvent->SetTag(std::string("callType"), std::string("http")); - metricsEvent->SetTag(std::string("statusCode"), std::string("200")); - metricsEvent->SetTag(std::string("version"), std::string("HTTP1.1")); - metricsEvent->SetName(app_metric_names[j]); - metricsEvent->SetValue(UntypedSingleValue{10.0}); - metricsEvent->SetTimestamp(seconds); - } - } - APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); - } - // tcp_metrics - for (size_t i = 0; i < app_ids.size(); i++) { - std::shared_ptr mSourceBuffer = std::make_shared(); - ; - PipelineEventGroup mTestEventGroup(mSourceBuffer); - mTestEventGroup.SetTag(std::string("pid"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("appId"), std::string(app_ids[i])); - mTestEventGroup.SetTag(std::string("source_ip"), "10.54.0.44"); - mTestEventGroup.SetTag(std::string("source"), std::string("ebpf")); - mTestEventGroup.SetTag(std::string("appType"), std::string("EBPF")); - mTestEventGroup.SetTag(std::string("data_type"), std::string("metric")); - for (size_t j = 0; j < tcp_metrics_names.size(); j++) { - for (size_t z = 0; z < 20; z++) { - auto metricsEvent = mTestEventGroup.AddMetricEvent(); - metricsEvent->SetName(tcp_metrics_names[j]); - metricsEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); - metricsEvent->SetTag(std::string("workloadKind"), std::string("qianlu")); - metricsEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("host"), std::string("10.54.0.33")); - metricsEvent->SetTag(std::string("dest_ip"), std::string("10.54.0." + std::to_string(z))); - metricsEvent->SetTag(std::string("callType"), std::string("conn_stats")); - metricsEvent->SetValue(UntypedSingleValue{20.0}); - metricsEvent->SetTimestamp(seconds); - } - } - APSARA_TEST_TRUE(flusher.Send(std::move(mTestEventGroup))); - } - } - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} -void FlusherSLSUnittest::TestSendAPMAgentInfos() { - Json::Value configJson, optionalGoPipeline; - string configStr, errorMsg; - configStr = R"( - { - "Type": "flusher_sls", - "TelemetryType": "arms_agentinfo", - "Aliuid": "1108555361245511", - "Project": "proj-xtrace-505959c38776d9324945dbff709582-cn-hangzhou", - "Region": "cn-hangzhou", - "Endpoint": "pub-cn-hangzhou-staging.log.aliyuncs.com", - "Match": { - "Type": "tag", - "Key": "data_type", - "Value": "agent_info" - } - } - )"; - ParseJsonTable(configStr, configJson, errorMsg); - FlusherSLS flusher; - flusher.SetContext(ctx); - flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); - flusher.Init(configJson, optionalGoPipeline); - std::shared_ptr sourceBuffer = std::make_shared(); - PipelineEventGroup eventGroup(sourceBuffer); - eventGroup.SetTag(std::string("data_type"), std::string("agent_info")); - const std::string app_id_key = "appId"; - const std::string agentIdKey = "agentId"; - const std::string app_prefix = "app-"; - const std::string agent_version = "1.0.0-rc"; - const std::string vmVersion = "xxxx"; - const std::string startTimestamp = "1729479979167"; // ms - const std::string startTimestampKey = "startTimeStamp"; - const std::string appNameKey = "appName"; - const std::string appNamePrefix = "test-ebpf-app-"; - const std::string ipKey = "ip"; - const std::string ip_prefix = "30.221.146."; - const std::string agentVersionKey = "agentVersion"; - for (int i = 0; i < 50; i++) { - std::string app = app_prefix + std::to_string(i); - std::string ip = ip_prefix + std::to_string(i); - auto logEvent = eventGroup.AddLogEvent(); - logEvent->SetContent(app_id_key, app); - logEvent->SetContent(ipKey, ip); - logEvent->SetContent(agentIdKey, app); - logEvent->SetContent(appNameKey, appNamePrefix + std::to_string(i)); - logEvent->SetContent(startTimestampKey, startTimestamp); - logEvent->SetContent(agentVersionKey, "0.0.1"); - // auto now = std::chrono::steady_clock::now(); - logEvent->SetTimestamp( - std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) - .count()); - } - APSARA_TEST_TRUE(flusher.Send(std::move(eventGroup))); - flusher.FlushAll(); - vector res; - SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); - APSARA_TEST_EQUAL(1U, res.size()); -} - UNIT_TEST_CASE(FlusherSLSUnittest, OnSuccessfulInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnFailedInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnPipelineUpdate) @@ -1994,9 +1836,6 @@ UNIT_TEST_CASE(FlusherSLSUnittest, TestFlush) UNIT_TEST_CASE(FlusherSLSUnittest, TestFlushAll) UNIT_TEST_CASE(FlusherSLSUnittest, TestAddPackId) UNIT_TEST_CASE(FlusherSLSUnittest, OnGoPipelineSend) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMAgentInfos) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMMetrics) -UNIT_TEST_CASE(FlusherSLSUnittest, TestSendAPMTraces) } // namespace logtail