Skip to content

Commit

Permalink
add development doc for native plugin (#1907)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 authored Nov 22, 2024
1 parent 427b11b commit 7c80c3b
Showing 4 changed files with 314 additions and 5 deletions.
2 changes: 1 addition & 1 deletion core/plugin/flusher/sls/SLSResponse.cpp
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ bool SLSResponse::Parse(const HttpResponse& response) {
mStatusCode = response.GetStatusCode();

if (mStatusCode == 0) {
mErrorCode = sdk::LOG_REQUEST_TIMEOUT;
mErrorCode = sdk::LOGE_REQUEST_TIMEOUT;
mErrorMsg = "Request timeout";
} else if (mStatusCode != 200) {
try {
10 changes: 6 additions & 4 deletions docs/cn/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -150,10 +150,12 @@
* [Checkpoint接口](developer-guide/plugin-development/checkpoint-api.md)
* [Logger接口](developer-guide/plugin-development/logger-api.md)
* [自监控指标接口](developer-guide/plugin-development/plugin-self-monitor-guide.md)
* [如何开发Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md)
* [如何开发Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md)
* [如何开发Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md)
* [如何开发Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md)
* [如何开发原生Input插件](developer-guide/plugin-development/how-to-write-native-input-plugins.md)
* [如何开发原生Flusher插件](developer-guide/plugin-development/how-to-write-native-flusher-plugins.md)
* [如何开发扩展Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md)
* [如何开发扩展Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md)
* [如何开发扩展Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md)
* [如何开发扩展Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md)
* [如何生成插件文档](developer-guide/plugin-development/how-to-genernate-plugin-docs.md)
* [插件文档规范](docs/cn/developer-guide/plugin-development/plugin-doc-templete.md)
* [纯插件模式启动](developer-guide/plugin-development/pure-plugin-start.md)
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# 如何开发原生Flusher插件

## 接口定义

```c++
class Flusher : public Plugin {
public:
// 用于初始化插件参数,同时根据参数初始化Flusher级的组件
virtual bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) = 0;
virtual bool Start();
virtual bool Stop(bool isPipelineRemoving);
// 用于将处理插件的输出经过聚合、序列化和压缩处理后,放入发送队列
virtual void Send(PipelineEventGroup&& g) = 0;
// 用于将聚合组件内指定聚合队列内的数据进行强制发送
virtual void Flush(size_t key) = 0;
// 用于将聚合组件内的所有数据进行强制发送
virtual void FlushAll() = 0;
};
```
对于使用Http协议发送数据的Flusher,进一步定义了HttpFlusher,接口如下:
```c++
class HttpFlusher : public Flusher {
public:
// 用于将待发送数据打包成http请求
virtual bool BuildRequest(SenderQueueItem* item, std::unique_ptr<HttpSinkRequest>& req, bool* keepItem) const = 0;
// 用于发送完成后进行记录和处理
virtual void OnSendDone(const HttpResponse& response, SenderQueueItem* item) = 0;
};
```

## Flusher级组件

### 聚合(必选)

* 作用:将多个小的PipelineEventGroup根据tag异同合并成一个大的group,提升发送效率

* 参数:

可以在flusher的参数中配置Batch字段,该字段的类型为map,其中允许包含的字段如下:

| **名称** | **类型** | **默认值** | **说明** |
| --- | --- | --- | --- |
| MinCnt | uint | 每个Flusher自定义 | 每个聚合队列最少包含的event数量 |
| MinSizeBytes | uint | 每个Flusher自定义 | 每个聚合队列最小的尺寸 |
| TimeoutSecs | uint | 每个Flusher自定义 | 每个聚合队列在第一个event加入后,在被输出前最多等待的时间 |

* 类接口:

```c++
template <typename T = EventBatchStatus>
class Batcher {
public:
bool Init(const Json::Value& config,
Flusher* flusher,
const DefaultFlushStrategyOptions& strategy,
bool enableGroupBatch = false);
void Add(PipelineEventGroup&& g, std::vector<BatchedEventsList>& res);
void FlushQueue(size_t key, BatchedEventsList& res);
void FlushAll(std::vector<BatchedEventsList>& res);
}
```
### 序列化(必选)
* 作用:对聚合模块的输出进行序列化,分为2个层级:
* event级:对每一个event单独进行序列化
* event group级:对多个event进行批量的序列化
* 类接口:
```c++
// T: PipelineEventPtr, BatchedEvents, BatchedEventsList
template <typename T>
class Serializer {
private:
virtual bool Serialize(T&& p, std::string& res, std::string& errorMsg) = 0;
};
using EventSerializer = Serializer<PipelineEventPtr>;
using EventGroupSerializer = Serializer<BatchedEvents>;
```

### 压缩(可选)

* 作用:对序列化后的结果进行压缩

* 类接口:

```c++
class Compressor {
private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
};
```
## 开发步骤
下面以开发一个HttpFlusher为例,说明整个开发步骤:
1. 在plugin/flusher目录下新建一个Flusherxxx.h和Flusherxxx.cpp文件,用于派生HttpFlusher接口生成具体的插件类;
2. 在Flusherxxx.h文件中定义新的输出插件类Flusherxxx,满足以下要求:
a. 所有的可配置参数的权限为public,其余参数的权限均为private
b. 新增一个聚合组件:`Batcher<> mBatcher;`
c. 新增一个序列化组件:`std::unique_ptr<T> mSerializer;`,其中T为`EventSerializer`和`EventGroupSerializer`中的一种
d. 如果需要压缩,则新增一个压缩组件:`std::unique_ptr<Compressor> mCompressor;`
3. 在pipeline/serializer目录下新建一个xxxSerializer.h和xxxSerializer.cpp文件,用于派生`Serializer` 接口生成具体类;
4. (可选)如果需要压缩组件,且现有压缩组件库中没有所需算法,则新增一个压缩组件:
a. 在common/compression/CompressType.h文件中,扩展CompressType类用以标识新的压缩算法;
b. 在common/compression目录下新建一个xxxCompressor.h和xxxCompressor.cpp文件,用于派生`Compressor`接口生成具体类;
c. 在common/compression/CompressorFactory.cpp文件的各个函数中注册该压缩组件;
5. 在Flusherxxx.cpp文件中实现插件类
a. `Init`函数:
i. 根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。
ii. 调用相关函数完成聚合、序列化和压缩组件的初始化
b. `SerializeAndPush(BatchedEventsList&&)`函数:
```c++
void Flusherxxx::SerializeAndPush(BatchedEventsList&& groupLists) {
// 1. 使用mSerializer->Serialize函数对入参序列化
// 2. 如果需要压缩,则使用mCompressor->Compress函数对序列化结果进行压缩
// 3. 构建发送队列元素,其中,
// a. data为待发送内容
// b. 如果没用压缩组件,则rawSize=data.size();否则,rawSize为压缩前(序列化后)数据的长度
// c. mLogstoreKey为发送队列的key
auto item = make_unique<SenderQueueItem>(std::move(data),
rawSize,
this,
mQueueKey);
Flusher::PushToQueue(std::move(item));
}
```
c. `SerializeAndPush(vector<BatchedEventsList>&&)`函数:
```c++
void Flusherxxx::SerializeAndPush(vector<BatchedEventsList>&& groupLists) {
for (auto& groupList : groupLists) {
SerializeAndPush(std::move(groupList));
}
}
```
d. `Send`函数:
```c++
void Flusherxxx::Send(PipelineEventGroup&& g) {
vector<BatchedEventsList> res;
mBatcher.Add(std::move(g), res);
SerializeAndPush(std::move(res));
}
```
e. `Flush`函数:
```c++
void Flusherxxx::Flush(size_t key) {
BatchedEventsList res;
mBatcher.FlushQueue(key, res);
SerializeAndPush(std::move(res));
}
```
f. `FlushAll`函数:
```c++
void Flusherxxx::FlushAll() {
vector<BatchedEventsList> res;
mBatcher.FlushAll(res);
SerializeAndPush(std::move(res));
}
```
g. `BuildRequest`函数:将待发送数据包装成一个Http请求,如果请求构建失败,使用`keepItem`参数记录是否要保留数据供以后重试。
h. `OnSendDone`函数:根据返回的http response进行相应的记录和操作。
6. 在`PluginRegistry`类中注册该插件:
7. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行:
```c++
#include "plugin/flusher/Flusherxxx.h"
```
8. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行:
```c++
RegisterFlusherCreator(new StaticFlusherCreator<Flusherxxx>());
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# 如何开发原生Input插件

## 工作模式

同一输入类型的所有插件实例共享同一个线程来获取数据,插件实例只负责保存插件配置。

## 接口定义

```c++
class Input : public Plugin {
public:
// 初始化插件,入参为插件参数
virtual bool Init(const Json::Value& config) = 0;
// 负责向管理类注册配置
virtual bool Start() = 0;
// 负责向管理类注销配置
virtual bool Stop(bool isPipelineRemoving) = 0;
};
```
## 开发步骤
1. 在plugin/input目录下新建一个Inputxxx.h和Inputxxx.cpp文件,用于派生Input接口生成具体的插件类;
2. 在Inputxxx.h文件中定义新的输入插件类Inputxxx,满足以下规范:
a. 所有的可配置参数的权限为public,其余参数的权限均为private。
3. 在Inputxxx.cpp文件中实现`Init`函数,即根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。
4. 在根目录下新增一个目录,用于创建当前输入插件的管理类及其他辅助类,该管理类需要继承InputRunner接口:
```c++
class InputRunner {
public:
// 调用点:由插件的Start函数调用
// 作用:初始化管理类,并至少启动一个线程用于采集数据
// 注意:该函数必须是可重入的,因此需要在函数开头判断是否已经启动线程,如果是则直接退出
virtual void Init() = 0;
// 调用点:进程退出时,或配置热加载结束后无注册插件时由框架调用
// 作用:停止管理类,并进行扫尾工作,如资源回收、checkpoint记录等
virtual void Stop() = 0;
// 调用点:每次配置热加载结束后由框架调用
// 作用:判断是否有插件注册,若无,则框架将调用Stop函数对线程资源进行回收
virtual bool HasRegisteredPlugin() const = 0;
}
```

管理类是输入插件线程资源的实际拥有者,其最基本的运行流程如下:

- 依次访问每个注册的配置,根据配置情况抓取数据;

- 根据数据类型将源数据转换为PipelineEvent子类中的一种,并将一批数据组装成PipelineEventGroup;

- 将PipelineEventGroup发送到相应配置的处理队列中:

```c++
ProcessorRunner::GetInstance()->PushQueue(queueKey, inputIdx, std::move(group));
```
其中,
- queueKey是队列的key,可以从相应流水线的PipelineContext类的`GetProcessQueueKey()`方法来获取。
- inputIdx是当前数据所属输入插件在该流水线所有输入插件的位置(即配置中第几个,从0开始计数)
- group是待发送的数据包
最后,为了支持插件向管理类注册,管理类还需要提供注册和注销函数供插件使用,从性能的角度考虑,**该注册和注销过程应当是独立的,即某个插件的注册和注销不应当影响整个线程的运转**。
5. 在Inputxxx.cpp文件中实现其余接口函数:
```c++
bool Inputxxx::Start() {
// 1. 调用管理类的Start函数
// 2. 将当前插件注册到管理类中
}
bool Inputxxx::Stop(bool isPipelineRemoving) {
// 将当前插件从管理类中注销
}
```
6. 在`PluginRegistry`类中注册该插件:
a. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行:
```c++
#include "plugin/input/Inputxxx.h"
```
b. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行:
```c++
RegisterInputCreator(new StaticInputCreator<Inputxxx>());
```
c. 在`PipelineManager`类的构造函数中注册该插件的管理类

0 comments on commit 7c80c3b

Please sign in to comment.