Skip to content
maggie edited this page Sep 15, 2020 · 1 revision

EventSubscribe模块重构

  • 模块介绍
  • 重构任务
  • 重构设计
  • 接口说明

1. 模块介绍

主要功能:

  • 订阅合约事件。
  • 接收事件推送。
  • 断连时,更改订阅状态。
  • 重连时,重新注册订阅

主要处理的消息类型:

CLIENT_REGISTER_EVENT_LOG(0x15); // type for event log filter register request and response
EVENT_LOG_PUSH(0x1002); // type for event log push

2. 重构任务

  • 适配新的Network模块。
  • 新增unsubscribeEvent功能
  • 优雅关闭,代码整理,垃圾清理

3. 重构设计

重构点1. 适配新的网络模块

使用方法:

// 初始化网络模块
Channel ch = Channel.build("./config.yaml");
//初始化事件订阅模块
EventSubscribe ES = EventSubscribe.build(ch,"Group1");
//订阅事件
ES.subscribeEvent();

重构点2. 新增取消订阅功能

一个订阅(EventFileter)的状态图:

新增流程:

  1. EventFilter在“待请求订阅”状态时:接收到取消订阅的指令时,删除该EventFilter
  2. EventFilter在“待响应”/“推送中“状态时:接收到取消订阅的指令时,向节点发送“取消订阅”的消息
  3. EventFilter在“待响应”/“推送中“状态时:接收到节点发来的”取消订阅成功“消息,删除该EventFilter

新增消息种类:EVENT_UNSUBSCRIBE

SDK和节点需要新支持一种消息“取消订阅”

// EventSubscribe模块处理的消息
CLIENT_REGISTER_EVENT_LOG(0x15), // 订阅事件
EVENT_LOG_PUSH(0x1002); // 事件推送
EVENT_UNSUBSCRIBE(0x1003) // 取消订阅


// AMOP模块处理的消息 略
// Client模块处理的消息类型 略
// Network模块或Channel模块处理的消息类型 略

请求格式:

  • SDK向节点发送取消订阅的请求
{
  "groupID": "1",
  "filterID":"bb31e4ec086c48e18f21cb994e2e5967"
}
  • 节点回复

    {
      "filterID": "bb31e4ec086c48e18f21cb994e2e5967",
      "result": 2
    }

    新增状态“2”->"成功取消订阅"

public enum EventFilterPushStatus {
  SUCCESS(0),
  PUSH_COMPLETED(1),
  INVALID_PARAMS(-41000),
  INVALID_REQUEST(-41001),
  GROUP_NOT_EXIST(-41002),
  INVALID_RANGE(-41003),
  INVALID_RESPONSE(-41004),
  REQUEST_TIMEOUT(-41005),
  OTHER_ERROR(-41006),
  SUCCESS_UNSUBSCRIBE(2), // 新增“成功取消订阅”
}

SDK取消订阅实现代码(概要)

interface EventSubscribe{
  ...
	unsubscribeEvent(EventUserParams params);
}

class EventSubscribeImp implements EventSubscribe, MsgHandler{
  Channel ch;
  EventSubManager manager;
  ...
  public void unsubscribeEvent(EventUserParams params){
    EventFilter filter = manager.getFilterByEventUserParams(params);
    if(filter.getStatus() == ){ //待请求订阅状态时,直接删除该EventFilter即可
      manager.removeEventSub(filter.getFilterId())
      return;
    }else{ //其它情况,发送“取消订阅”消息
      // 创建“取消订阅”消息
      Message msg = new Message();
      msg.setSeq(newSeq());
      msg.setType((short) MessageType.EVENT_UNSUBSCRIBE.getType());
      msg.setData(filter.generateSubScribeParams().toJsonString());
      msg.setTopic("");
      
      //发送消息
      ChannelHandlerContext ctx = filter.getCtx();
    	ctx.writeAndFlush(msg)
    }
  }
  public void onMessage(ChannelHandlerContext ctx, Message msg){
    ...
  }
}

重构点3. 优雅关闭、代码优化

关闭模块时取消所有订阅

class EventSubscribeImp implements EventSubscribe, MsgHandler{
 EventSubManager manager;
 ...
 void stop(){
   // 取消正在生效的Event订阅
   List<EventFilter> filters = manager.getWorkingEventFilter();
   for (filter:filters){
     unsubscribeEvent(filter);
   }
   // 删除所有Event订阅
   manager.clearAllEventFilter();
 }
}

代码优化

如:以下代码,避免直接New一个Thread,改为使用线程池。

public void start() {

        if (running) {
            return;
        }
        running = true;

        new Thread(
          new Runnable() {
            @Override
            public void run() {
              ...
            }
          }
          })
          .start();
    }
}

4. 接口说明

主要接口:

interface EventSubscribe {

  /* 新建一个EventSubScribe对象
  *  输入:Channel ,初始化好的网络。
  *  输入:groupId ,事件所属的群组。
  *  返回:实现了EventSubScribe接口的实例
  */
  static EventSubscribe build(Channel ch, String groupId);
  
  /* 新增订阅
  *  输入:EventUserParams, 事件订阅的参数,包括Topic,Address等
  *  输入:EventCallback ,事件推送的回调函数。
  */
  void subscribeEvent(EventUserParams params, EventCallback callback);
  
  /* 取消订阅
  *  输入:EventUserParams, 取消事件订阅的参数,包括FilterID,GroupID等
  */
  void unsubscribeEvent(EventUserParams);
  
  /*启动模块
  * 重连时,可调用此函数,重新注册所有订阅
  */
  start();
  
  /*关闭模块
  * 删除所有订阅
  */
  stop();
}
Clone this wiki locally