diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index ff94990e6d..fd2a0444fc 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -770,6 +770,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); bufferMeta.set_telemetrytype(flusher->mTelemetryType); + bufferMeta.set_subpath(flusher->GetSubpath()); #ifdef __ENTERPRISE__ bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode)); #endif @@ -866,30 +867,57 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe } else { dataType = RawDataType::EVENT_GROUP; } - if (bufferMeta.has_telemetrytype() && bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { - return PostMetricStoreLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - logData, - bufferMeta.rawsize()); - } else { - return PostLogStoreLogs(accessKeyId, - accessKeySecret, - type, - host, - httpsFlag, - bufferMeta.project(), - bufferMeta.logstore(), - GetSLSCompressTypeString(bufferMeta.compresstype()), - dataType, - logData, - bufferMeta.rawsize(), - bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); + + auto telemetryType + = bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + switch (telemetryType) { + case sls_logs::SLS_TELEMETRY_TYPE_LOGS: + return PostLogStoreLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); + case sls_logs::SLS_TELEMETRY_TYPE_METRICS: + return PostMetricStoreLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + logData, + bufferMeta.rawsize()); + case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: + case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: + return PostAPMBackendLogs(accessKeyId, + accessKeySecret, + type, + host, + httpsFlag, + bufferMeta.project(), + bufferMeta.logstore(), + GetSLSCompressTypeString(bufferMeta.compresstype()), + dataType, + logData, + bufferMeta.rawsize(), + bufferMeta.subpath()); + default: { + // should not happen + LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen")); + SLSResponse response; + response.mErrorCode = LOGE_REQUEST_ERROR; + response.mErrorMsg = "Unhandled telemetry type"; + return response; + } } } diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 676f5fbc9b..0044f4583a 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -294,16 +294,57 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } + // TelemetryType + string telemetryType; + if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } else if (telemetryType == "metrics") { + // TelemetryType set to metrics + mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS + : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + } else if (telemetryType == "arms_agentinfo") { + mSubpath = APM_AGENTINFOS_URL; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; + } else if (telemetryType == "arms_metrics") { + mSubpath = APM_METRICS_URL; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; + } else if (telemetryType == "arms_traces") { + mSubpath = APM_TRACES_URL; + mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; + } else if (!telemetryType.empty() && telemetryType != "logs") { + // TelemetryType invalid + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + "string param TelemetryType is not valid", + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + // Logstore - if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { - PARAM_ERROR_RETURN(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + // log and metric + if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } } // Region @@ -409,32 +450,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } #endif - // TelemetryType - string telemetryType; - if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } else if (telemetryType == "metrics") { - mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS - : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - } else if (!telemetryType.empty() && telemetryType != "logs") { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - "string param TelemetryType is not valid", - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } // Batch const char* key = "Batch"; @@ -465,25 +480,17 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline } // ShardHashKeys - if (!GetOptionalListParam(config, "ShardHashKeys", mShardHashKeys, errorMsg)) { - PARAM_WARNING_IGNORE(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } else if (!mShardHashKeys.empty() && mContext->IsExactlyOnceEnabled()) { - mShardHashKeys.clear(); - PARAM_WARNING_IGNORE(mContext->GetLogger(), - mContext->GetAlarm(), - "exactly once enabled when ShardHashKeys is not empty", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) { + if (!GetOptionalListParam(config, "ShardHashKeys", mShardHashKeys, errorMsg)) { + PARAM_WARNING_IGNORE(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } } DefaultFlushStrategyOptions strategy{ @@ -667,6 +674,11 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr case sls_logs::SLS_TELEMETRY_TYPE_METRICS: req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data); break; + case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: + case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: + req = CreatePostAPMBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath); + break; default: break; } @@ -1245,6 +1257,41 @@ unique_ptr FlusherSLS::CreatePostMetricStoreLogsRequest(const s 1); } +unique_ptr FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const { + string query; + map header; + PreparePostAPMBackendRequest(accessKeyId, + accessKeySecret, + type, + item->mCurrentHost, + item->mRealIpFlag, + mProject, + item->mLogstore, + CompressTypeToString(mCompressor->GetCompressType()), + item->mType, + item->mData, + item->mRawSize, + mSubpath, + query, + header); + bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); + return make_unique(HTTP_POST, + httpsFlag, + item->mCurrentHost, + httpsFlag ? 443 : 80, + subPath, + "", + header, + item->mData, + item, + INT32_FLAG(default_http_request_timeout_sec), + 1); +} + sls_logs::SlsCompressType ConvertCompressType(CompressType type) { sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE; switch (type) { diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index a228e2567d..b26919e213 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -79,6 +79,8 @@ class FlusherSLS : public HttpFlusher { // for use of Go pipeline and shennong bool Send(std::string&& data, const std::string& shardHashKey, const std::string& logstore = ""); + std::string GetSubpath() const { return mSubpath; } + std::string mProject; std::string mLogstore; std::string mRegion; @@ -130,6 +132,13 @@ class FlusherSLS : public HttpFlusher { const std::string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item) const; + std::unique_ptr CreatePostAPMBackendRequest(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + SLSSenderQueueItem* item, + const std::string& subPath) const; + + std::string mSubpath; Batcher mBatcher; std::unique_ptr mGroupSerializer; diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 9989d1991e..c46e3ce732 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -236,6 +236,50 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId, header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; } +void PreparePostAPMBackendRequest(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool isHostIp, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const string& path, + string& query, + map& header) { + if (isHostIp) { + header[HOST] = project + "." + host; + } else { + header[HOST] = host; + } + header[USER_AGENT] = SLSClientManager::GetInstance()->GetUserAgent(); + header[DATE] = GetDateString(); + header[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; + header[CONTENT_LENGTH] = to_string(body.size()); + header[CONTENT_MD5] = CalcMD5(body); + header[X_LOG_APIVERSION] = LOG_API_VERSION; + header[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1; + if (!compressType.empty()) { + header[X_LOG_COMPRESSTYPE] = compressType; + } + if (dataType == RawDataType::EVENT_GROUP) { + header[X_LOG_BODYRAWSIZE] = to_string(rawSize); + } else { + header[X_LOG_BODYRAWSIZE] = to_string(body.size()); + header[X_LOG_MODE] = LOG_MODE_BATCH_GROUP; + } + if (type == SLSClientManager::AuthType::ANONYMOUS) { + header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER; + } + + map parameterList; + string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret); + header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; +} + SLSResponse PostLogStoreLogs(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, @@ -303,6 +347,41 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId, return ParseHttpResponse(response); } +SLSResponse PostAPMBackendLogs(const string& accessKeyId, + const string& accessKeySecret, + SLSClientManager::AuthType type, + const string& host, + bool httpsFlag, + const string& project, + const string& logstore, + const string& compressType, + RawDataType dataType, + const string& body, + size_t rawSize, + const std::string& subpath) { + string query; + map header; + PreparePostAPMBackendRequest(accessKeyId, + accessKeySecret, + type, + host, + false, // sync request always uses vip + project, + logstore, + compressType, + dataType, + body, + rawSize, + subpath, + query, + header); + HttpResponse response; + SendHttpRequest( + make_unique(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body), + response); + return ParseHttpResponse(response); +} + SLSResponse PutWebTracking(const string& host, bool httpsFlag, const string& logstore, diff --git a/core/plugin/flusher/sls/SLSClientManager.h b/core/plugin/flusher/sls/SLSClientManager.h index 42384c8a1d..a242399167 100644 --- a/core/plugin/flusher/sls/SLSClientManager.h +++ b/core/plugin/flusher/sls/SLSClientManager.h @@ -91,6 +91,20 @@ void PreparePostMetricStoreLogsRequest(const std::string& accessKeyId, size_t rawSize, std::string& path, std::map& header); +void PreparePostAPMBackendRequest(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool isHostIp, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& path, + std::string& query, + std::map& header); SLSResponse PostLogStoreLogs(const std::string& accessKeyId, const std::string& accessKeySecret, SLSClientManager::AuthType type, @@ -113,6 +127,18 @@ SLSResponse PostMetricStoreLogs(const std::string& accessKeyId, const std::string& compressType, const std::string& body, size_t rawSize); +SLSResponse PostAPMBackendLogs(const std::string& accessKeyId, + const std::string& accessKeySecret, + SLSClientManager::AuthType type, + const std::string& host, + bool httpsFlag, + const std::string& project, + const std::string& logstore, + const std::string& compressType, + RawDataType dataType, + const std::string& body, + size_t rawSize, + const std::string& subpath); SLSResponse PutWebTracking(const std::string& host, bool httpsFlag, const std::string& logstore, diff --git a/core/plugin/flusher/sls/SLSConstant.cpp b/core/plugin/flusher/sls/SLSConstant.cpp index d81dbf284e..113a9155ca 100644 --- a/core/plugin/flusher/sls/SLSConstant.cpp +++ b/core/plugin/flusher/sls/SLSConstant.cpp @@ -22,6 +22,10 @@ const string LOGSTORES = "/logstores"; const string METRICSTORES = "/prometheus"; const string HEALTH = "/health"; +const string APM_METRICS_URL = "/apm/metric/arms/v1/metric_log"; +const string APM_TRACES_URL = "/apm/trace/arms/v1/trace_log"; +const string APM_AGENTINFOS_URL = "/apm/meta/arms/v1/meta_log/AgentInfo"; + const string LOGTAIL_USER_AGENT = "ali-log-logtail"; const string CONTENT_MD5 = "Content-MD5"; diff --git a/core/plugin/flusher/sls/SLSConstant.h b/core/plugin/flusher/sls/SLSConstant.h index 5874d5f2ec..bc62ab682b 100644 --- a/core/plugin/flusher/sls/SLSConstant.h +++ b/core/plugin/flusher/sls/SLSConstant.h @@ -24,6 +24,10 @@ extern const std::string LOGSTORES; extern const std::string METRICSTORES; extern const std::string HEALTH; +extern const std::string APM_METRICS_URL; +extern const std::string APM_TRACES_URL; +extern const std::string APM_AGENTINFOS_URL; + extern const std::string CONTENT_MD5; extern const std::string LOGTAIL_USER_AGENT; diff --git a/core/protobuf/sls/logtail_buffer_meta.proto b/core/protobuf/sls/logtail_buffer_meta.proto index 131e4099d7..93c9917f3f 100644 --- a/core/protobuf/sls/logtail_buffer_meta.proto +++ b/core/protobuf/sls/logtail_buffer_meta.proto @@ -37,4 +37,5 @@ message LogtailBufferMeta optional SlsTelemetryType telemetrytype = 9; optional EndpointMode endpointmode = 10; optional string endpoint = 11; + optional string subpath = 12; } diff --git a/core/protobuf/sls/sls_logs.proto b/core/protobuf/sls/sls_logs.proto index 9f3aad5856..5535e6209e 100644 --- a/core/protobuf/sls/sls_logs.proto +++ b/core/protobuf/sls/sls_logs.proto @@ -27,6 +27,9 @@ enum SlsTelemetryType { SLS_TELEMETRY_TYPE_LOGS = 0; SLS_TELEMETRY_TYPE_METRICS = 1; + SLS_TELEMETRY_TYPE_APM_METRICS = 2; + SLS_TELEMETRY_TYPE_APM_TRACES = 3; + SLS_TELEMETRY_TYPE_APM_AGENTINFOS = 4; } message Log diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 073b859441..1369561fb3 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include "json/json.h" @@ -157,7 +158,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789", - "TelemetryType": "metrics", + "TelemetryType": "logs", "ShardHashKeys": [ "__source__" ] @@ -179,7 +180,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL("", flusher->mAliuid); #endif APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint); - APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size()); APSARA_TEST_EQUAL("__source__", flusher->mShardHashKeys[0]); SenderQueueManager::GetInstance()->Clear(); @@ -457,6 +458,67 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); SenderQueueManager::GetInstance()->Clear(); + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", + R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", + R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t ii = 0; ii < apmConfigStr.size(); ii++) { + auto& cfg = apmConfigStr[ii]; + APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg)); + flusher.reset(new FlusherSLS()); + flusher->SetContext(ctx); + flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); + APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]); + APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]); + SenderQueueManager::GetInstance()->Clear(); + } + // go param ctx.SetIsFlushingThroughGoPipelineFlag(true); configStr = R"( @@ -887,6 +949,92 @@ void FlusherSLSUnittest::TestBuildRequest() { APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } + { + // APM backend + Json::Value configJsonAPM, optionalGoPipelineAPM; + string errorMsgAPM; + // apm + std::vector apmConfigStr = {R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + } + )", + R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + } + )", + R"( + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + )"}; + std::vector apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; + std::vector apmTelemetryTypes = { + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, + sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, + }; + for (size_t zz = 0; zz < apmConfigStr.size(); zz++) { + std::string configStrAPM = apmConfigStr[zz]; + APSARA_TEST_TRUE(ParseJsonTable(configStrAPM, configJsonAPM, errorMsgAPM)); + FlusherSLS flusherAPM; + flusherAPM.SetContext(ctx); + flusherAPM.SetMetricsRecordRef(FlusherSLS::sName, "flusher_sls_for_apm"); + APSARA_TEST_TRUE(flusherAPM.Init(configJsonAPM, optionalGoPipeline)); + + // normal + SLSSenderQueueItem item("hello, world!", + rawSize, + &flusherAPM, + flusherAPM.GetQueueKey(), + flusherAPM.mLogstore, + RawDataType::EVENT_GROUP); + APSARA_TEST_TRUE(flusherAPM.BuildRequest(&item, req, &keepItem, &errMsg)); + APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); + APSARA_TEST_EQUAL(apmSubpath[zz], req->mUrl); + APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); + APSARA_TEST_FALSE(req->mHeader[DATE].empty()); + APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); + APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); + APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); + APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); + APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); + APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); + APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); + APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); + APSARA_TEST_EQUAL(body, req->mBody); + APSARA_TEST_TRUE(req->mHTTPSFlag); + APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); + } + } { // shard hash SLSSenderQueueItem item("hello, world!", diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 20cc535975..fc17768739 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -51,6 +51,7 @@ class PipelineUnittest : public ::testing::Test { void TestFlushBatch() const; void TestInProcessingCount() const; void TestWaitAllItemsInProcessFinished() const; + void TestMultiFlusherAndRouter() const; protected: static void SetUpTestCase() { @@ -2945,6 +2946,74 @@ void PipelineUnittest::TestWaitAllItemsInProcessFinished() const { APSARA_TEST_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); } + +void PipelineUnittest::TestMultiFlusherAndRouter() const { + unique_ptr configJson; + string configStr, errorMsg; + unique_ptr config; + unique_ptr pipeline; + // new pipeline + configStr = R"( + { + "global": { + "ProcessPriority": 1 + }, + "inputs": [ + { + "Type": "input_file", + "FilePaths": [ + "/home/test.log" + ] + } + ], + "flushers": [ + { + "Type": "flusher_sls", + "TelemetryType": "arms_traces", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms_metrics", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms_agentinfo", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + ] + } + )"; + configJson.reset(new Json::Value()); + APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); + config.reset(new PipelineConfig(configName, std::move(configJson))); + APSARA_TEST_TRUE(config->Parse()); + pipeline.reset(new Pipeline()); + APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); +} + UNIT_TEST_CASE(PipelineUnittest, OnSuccessfulInit) UNIT_TEST_CASE(PipelineUnittest, OnFailedInit) UNIT_TEST_CASE(PipelineUnittest, TestProcessQueue) @@ -2956,6 +3025,8 @@ UNIT_TEST_CASE(PipelineUnittest, TestSend) UNIT_TEST_CASE(PipelineUnittest, TestFlushBatch) UNIT_TEST_CASE(PipelineUnittest, TestInProcessingCount) UNIT_TEST_CASE(PipelineUnittest, TestWaitAllItemsInProcessFinished) +UNIT_TEST_CASE(PipelineUnittest, TestMultiFlusherAndRouter) + } // namespace logtail