From 7c80c3b2851966c8caad0329b4966cdf058a0065 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 22 Nov 2024 13:38:10 +0800 Subject: [PATCH] add development doc for native plugin (#1907) --- core/plugin/flusher/sls/SLSResponse.cpp | 2 +- docs/cn/SUMMARY.md | 10 +- .../how-to-write-native-flusher-plugins.md | 209 ++++++++++++++++++ .../how-to-write-native-input-plugins.md | 98 ++++++++ 4 files changed, 314 insertions(+), 5 deletions(-) create mode 100644 docs/cn/developer-guide/plugin-development/how-to-write-native-flusher-plugins.md create mode 100644 docs/cn/developer-guide/plugin-development/how-to-write-native-input-plugins.md diff --git a/core/plugin/flusher/sls/SLSResponse.cpp b/core/plugin/flusher/sls/SLSResponse.cpp index b55896aa1a..d9fa405479 100644 --- a/core/plugin/flusher/sls/SLSResponse.cpp +++ b/core/plugin/flusher/sls/SLSResponse.cpp @@ -34,7 +34,7 @@ bool SLSResponse::Parse(const HttpResponse& response) { mStatusCode = response.GetStatusCode(); if (mStatusCode == 0) { - mErrorCode = sdk::LOG_REQUEST_TIMEOUT; + mErrorCode = sdk::LOGE_REQUEST_TIMEOUT; mErrorMsg = "Request timeout"; } else if (mStatusCode != 200) { try { diff --git a/docs/cn/SUMMARY.md b/docs/cn/SUMMARY.md index f830edbb95..9a99de227d 100644 --- a/docs/cn/SUMMARY.md +++ b/docs/cn/SUMMARY.md @@ -150,10 +150,12 @@ * [Checkpoint接口](developer-guide/plugin-development/checkpoint-api.md) * [Logger接口](developer-guide/plugin-development/logger-api.md) * [自监控指标接口](developer-guide/plugin-development/plugin-self-monitor-guide.md) - * [如何开发Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md) - * [如何开发Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md) - * [如何开发Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md) - * [如何开发Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md) + * [如何开发原生Input插件](developer-guide/plugin-development/how-to-write-native-input-plugins.md) + * [如何开发原生Flusher插件](developer-guide/plugin-development/how-to-write-native-flusher-plugins.md) + * [如何开发扩展Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md) + * [如何开发扩展Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md) + * [如何开发扩展Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md) + * [如何开发扩展Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md) * [如何生成插件文档](developer-guide/plugin-development/how-to-genernate-plugin-docs.md) * [插件文档规范](docs/cn/developer-guide/plugin-development/plugin-doc-templete.md) * [纯插件模式启动](developer-guide/plugin-development/pure-plugin-start.md) diff --git a/docs/cn/developer-guide/plugin-development/how-to-write-native-flusher-plugins.md b/docs/cn/developer-guide/plugin-development/how-to-write-native-flusher-plugins.md new file mode 100644 index 0000000000..031b1ed76c --- /dev/null +++ b/docs/cn/developer-guide/plugin-development/how-to-write-native-flusher-plugins.md @@ -0,0 +1,209 @@ +# 如何开发原生Flusher插件 + +## 接口定义 + +```c++ +class Flusher : public Plugin { +public: + // 用于初始化插件参数,同时根据参数初始化Flusher级的组件 + virtual bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) = 0; + virtual bool Start(); + virtual bool Stop(bool isPipelineRemoving); + // 用于将处理插件的输出经过聚合、序列化和压缩处理后,放入发送队列 + virtual void Send(PipelineEventGroup&& g) = 0; + // 用于将聚合组件内指定聚合队列内的数据进行强制发送 + virtual void Flush(size_t key) = 0; + // 用于将聚合组件内的所有数据进行强制发送 + virtual void FlushAll() = 0; +}; +``` + +对于使用Http协议发送数据的Flusher,进一步定义了HttpFlusher,接口如下: + +```c++ +class HttpFlusher : public Flusher { +public: + // 用于将待发送数据打包成http请求 + virtual bool BuildRequest(SenderQueueItem* item, std::unique_ptr& req, bool* keepItem) const = 0; + // 用于发送完成后进行记录和处理 + virtual void OnSendDone(const HttpResponse& response, SenderQueueItem* item) = 0; +}; +``` + +## Flusher级组件 + +### 聚合(必选) + +* 作用:将多个小的PipelineEventGroup根据tag异同合并成一个大的group,提升发送效率 + +* 参数: + +可以在flusher的参数中配置Batch字段,该字段的类型为map,其中允许包含的字段如下: + +| **名称** | **类型** | **默认值** | **说明** | +| --- | --- | --- | --- | +| MinCnt | uint | 每个Flusher自定义 | 每个聚合队列最少包含的event数量 | +| MinSizeBytes | uint | 每个Flusher自定义 | 每个聚合队列最小的尺寸 | +| TimeoutSecs | uint | 每个Flusher自定义 | 每个聚合队列在第一个event加入后,在被输出前最多等待的时间 | + +* 类接口: + +```c++ +template +class Batcher { +public: + bool Init(const Json::Value& config, + Flusher* flusher, + const DefaultFlushStrategyOptions& strategy, + bool enableGroupBatch = false); + void Add(PipelineEventGroup&& g, std::vector& res); + void FlushQueue(size_t key, BatchedEventsList& res); + void FlushAll(std::vector& res); +} +``` + +### 序列化(必选) + +* 作用:对聚合模块的输出进行序列化,分为2个层级: + + * event级:对每一个event单独进行序列化 + + * event group级:对多个event进行批量的序列化 + +* 类接口: + +```c++ +// T: PipelineEventPtr, BatchedEvents, BatchedEventsList +template +class Serializer { +private: + virtual bool Serialize(T&& p, std::string& res, std::string& errorMsg) = 0; +}; + +using EventSerializer = Serializer; +using EventGroupSerializer = Serializer; +``` + +### 压缩(可选) + +* 作用:对序列化后的结果进行压缩 + +* 类接口: + +```c++ +class Compressor { +private: + virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0; +}; +``` + +## 开发步骤 + +下面以开发一个HttpFlusher为例,说明整个开发步骤: + +1. 在plugin/flusher目录下新建一个Flusherxxx.h和Flusherxxx.cpp文件,用于派生HttpFlusher接口生成具体的插件类; + +2. 在Flusherxxx.h文件中定义新的输出插件类Flusherxxx,满足以下要求: + + a. 所有的可配置参数的权限为public,其余参数的权限均为private + + b. 新增一个聚合组件:`Batcher<> mBatcher;` + + c. 新增一个序列化组件:`std::unique_ptr mSerializer;`,其中T为`EventSerializer`和`EventGroupSerializer`中的一种 + + d. 如果需要压缩,则新增一个压缩组件:`std::unique_ptr mCompressor;` + +3. 在pipeline/serializer目录下新建一个xxxSerializer.h和xxxSerializer.cpp文件,用于派生`Serializer` 接口生成具体类; + +4. (可选)如果需要压缩组件,且现有压缩组件库中没有所需算法,则新增一个压缩组件: + + a. 在common/compression/CompressType.h文件中,扩展CompressType类用以标识新的压缩算法; + + b. 在common/compression目录下新建一个xxxCompressor.h和xxxCompressor.cpp文件,用于派生`Compressor`接口生成具体类; + + c. 在common/compression/CompressorFactory.cpp文件的各个函数中注册该压缩组件; + +5. 在Flusherxxx.cpp文件中实现插件类 + + a. `Init`函数: + + i. 根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。 + + ii. 调用相关函数完成聚合、序列化和压缩组件的初始化 + + b. `SerializeAndPush(BatchedEventsList&&)`函数: + + ```c++ + void Flusherxxx::SerializeAndPush(BatchedEventsList&& groupLists) { + // 1. 使用mSerializer->Serialize函数对入参序列化 + // 2. 如果需要压缩,则使用mCompressor->Compress函数对序列化结果进行压缩 + // 3. 构建发送队列元素,其中, + // a. data为待发送内容 + // b. 如果没用压缩组件,则rawSize=data.size();否则,rawSize为压缩前(序列化后)数据的长度 + // c. mLogstoreKey为发送队列的key + auto item = make_unique(std::move(data), + rawSize, + this, + mQueueKey); + Flusher::PushToQueue(std::move(item)); + } + ``` + + c. `SerializeAndPush(vector&&)`函数: + + ```c++ + void Flusherxxx::SerializeAndPush(vector&& groupLists) { + for (auto& groupList : groupLists) { + SerializeAndPush(std::move(groupList)); + } + } + ``` + + d. `Send`函数: + + ```c++ + void Flusherxxx::Send(PipelineEventGroup&& g) { + vector res; + mBatcher.Add(std::move(g), res); + SerializeAndPush(std::move(res)); + } + ``` + + e. `Flush`函数: + + ```c++ + void Flusherxxx::Flush(size_t key) { + BatchedEventsList res; + mBatcher.FlushQueue(key, res); + SerializeAndPush(std::move(res)); + } + ``` + + f. `FlushAll`函数: + + ```c++ + void Flusherxxx::FlushAll() { + vector res; + mBatcher.FlushAll(res); + SerializeAndPush(std::move(res)); + } + + ``` + + g. `BuildRequest`函数:将待发送数据包装成一个Http请求,如果请求构建失败,使用`keepItem`参数记录是否要保留数据供以后重试。 + + h. `OnSendDone`函数:根据返回的http response进行相应的记录和操作。 + +6. 在`PluginRegistry`类中注册该插件: + +7. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行: + + ```c++ + #include "plugin/flusher/Flusherxxx.h" + ``` + +8. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行: + + ```c++ + RegisterFlusherCreator(new StaticFlusherCreator()); + ``` diff --git a/docs/cn/developer-guide/plugin-development/how-to-write-native-input-plugins.md b/docs/cn/developer-guide/plugin-development/how-to-write-native-input-plugins.md new file mode 100644 index 0000000000..505cb40509 --- /dev/null +++ b/docs/cn/developer-guide/plugin-development/how-to-write-native-input-plugins.md @@ -0,0 +1,98 @@ +# 如何开发原生Input插件 + +## 工作模式 + +同一输入类型的所有插件实例共享同一个线程来获取数据,插件实例只负责保存插件配置。 + +## 接口定义 + +```c++ +class Input : public Plugin { +public: + // 初始化插件,入参为插件参数 + virtual bool Init(const Json::Value& config) = 0; + // 负责向管理类注册配置 + virtual bool Start() = 0; + // 负责向管理类注销配置 + virtual bool Stop(bool isPipelineRemoving) = 0; +}; +``` + +## 开发步骤 + +1. 在plugin/input目录下新建一个Inputxxx.h和Inputxxx.cpp文件,用于派生Input接口生成具体的插件类; + +2. 在Inputxxx.h文件中定义新的输入插件类Inputxxx,满足以下规范: + + a. 所有的可配置参数的权限为public,其余参数的权限均为private。 + +3. 在Inputxxx.cpp文件中实现`Init`函数,即根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。 + +4. 在根目录下新增一个目录,用于创建当前输入插件的管理类及其他辅助类,该管理类需要继承InputRunner接口: + +```c++ +class InputRunner { +public: + // 调用点:由插件的Start函数调用 + // 作用:初始化管理类,并至少启动一个线程用于采集数据 + // 注意:该函数必须是可重入的,因此需要在函数开头判断是否已经启动线程,如果是则直接退出 + virtual void Init() = 0; + // 调用点:进程退出时,或配置热加载结束后无注册插件时由框架调用 + // 作用:停止管理类,并进行扫尾工作,如资源回收、checkpoint记录等 + virtual void Stop() = 0; + // 调用点:每次配置热加载结束后由框架调用 + // 作用:判断是否有插件注册,若无,则框架将调用Stop函数对线程资源进行回收 + virtual bool HasRegisteredPlugin() const = 0; +} +``` + +管理类是输入插件线程资源的实际拥有者,其最基本的运行流程如下: + +- 依次访问每个注册的配置,根据配置情况抓取数据; + +- 根据数据类型将源数据转换为PipelineEvent子类中的一种,并将一批数据组装成PipelineEventGroup; + +- 将PipelineEventGroup发送到相应配置的处理队列中: + +```c++ +ProcessorRunner::GetInstance()->PushQueue(queueKey, inputIdx, std::move(group)); +``` + +其中, + +- queueKey是队列的key,可以从相应流水线的PipelineContext类的`GetProcessQueueKey()`方法来获取。 + +- inputIdx是当前数据所属输入插件在该流水线所有输入插件的位置(即配置中第几个,从0开始计数) + +- group是待发送的数据包 + +最后,为了支持插件向管理类注册,管理类还需要提供注册和注销函数供插件使用,从性能的角度考虑,**该注册和注销过程应当是独立的,即某个插件的注册和注销不应当影响整个线程的运转**。 + +5. 在Inputxxx.cpp文件中实现其余接口函数: + + ```c++ + bool Inputxxx::Start() { + // 1. 调用管理类的Start函数 + // 2. 将当前插件注册到管理类中 + } + + bool Inputxxx::Stop(bool isPipelineRemoving) { + // 将当前插件从管理类中注销 + } + ``` + +6. 在`PluginRegistry`类中注册该插件: + + a. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行: + + ```c++ + #include "plugin/input/Inputxxx.h" + ``` + + b. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行: + + ```c++ + RegisterInputCreator(new StaticInputCreator()); + ``` + + c. 在`PipelineManager`类的构造函数中注册该插件的管理类