Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

简析redux技术栈(二):认识saga的buffer和chanel #9

Open
flytam opened this issue Aug 18, 2019 · 0 comments
Open

简析redux技术栈(二):认识saga的buffer和chanel #9

flytam opened this issue Aug 18, 2019 · 0 comments
Labels
react New feature or request

Comments

@flytam
Copy link
Owner

flytam commented Aug 18, 2019


我们知道redux-saga 也是通过中间件的形式与 redux 本身连接起来。例如下面使用了redux-saga的react项目需要以下这样的 初始化

function configureStore(initialState) {
  // 运行返回一个redux middleware
  const sagaMiddleware = createSagaMiddleware();
  return {
    ...createStore(
      reducer,
      initialState,
      applyMiddleware(middleware1, middleware2, sagaMiddleware)
    ),
    runSaga: sagaMiddleware.run
  };
}

所以分析 redux-saga 的第一步,就从 redux-saga 的中间件开始。我们平时写代码在 react 中与 saga 进行交互,都是dispatch一个action到与我们的 saga 逻辑进行交互。翻看createSagaMiddleware源码,可以很清晰的看到,这就是使用了中间件后,我们每次dispatch一个 action 后,在 saga 中间件内会往channelput这个action,进而触发我们 saga 里面的逻辑。就实现了 react 组件和 saga 的交互了。那么这个stdChannel是什么呢

// 省略一些多余部分
function sagaMiddlewareFactory({ channel = stdChannel() } = {}) {
  //...
  function sagaMiddleware({ getState, dispatch }) {
    return next => action => {
      const result = next(action);
      // 实现了react和saga的交互
      channel.put(action);
      return result;
    };
  }
  //...
  return sagaMiddleware;
}

在了解 saga 的运行机制之前,先学习 redux-saga 源码内部的两个比较常用的数据结构bufferchanel

buffer

buffer 是一个固定长度类似队列的数据结构,它有四种类型(下面介绍),对外暴露了几个函数,如下

  • put 用来缓存 action
  • take 取出一个 action
  • isEmpty 判断 buffer 是否为空
  • flush 取出缓存的内的所有 action

我们知道如果我们直接使用数组的 push/unshift(pop/shift)函数实现队列的话,当我们出队列的时候时间复杂度是o(n)。而这里的 buffer 实现是比较巧妙的。数据存储是使用定长数组。通过pushIndexpopIndex标识位来记录出入队列的位置,它们的初始值都是 0,出队列的时候直接把popIndex位置空,然后值+1。入队列则是pushIndex+1。这样,无论take还是put,时间复杂度都是o(1)

pushIndex达到了 buffer 的长度的时候,buffer 的处理会根据 buffer 类型不同进行处理

1、ON_OVERFLOW_THROW:超出限制直接报错

2、ON_OVERFLOW_SLIDE:类似于环状队列,达到长度限制后,从索引 0 继续存储。

3、ON_OVERFLOW_EXPAND:达到限制后,长度自动变大 2 倍。

4、ON_OVERFLOW_DROP:达到限制后,后续的都丢弃

chanel

chanel 的实现是类似发布/订阅的设计模式。chanel.take(taker)存入一个 taker 函数,chanel.put(action)时,取出 cb 函数执行,action 是用来消费 taker 的

  • 普通 chanel(单播)

特点:当put一个 action 时,如果没有taker的时候,会将这个 action 存起来,存 action 是用了上面提到的buffer这个数据结构。等到有 taker 的时候可以马上调用 action。

一个简化版的单播 chanel 实现如下

class Chanel {
  constructor() {
    // 存action
    this.buffers = [];
    // 存taker
    this.takers = [];
    this.isClosed = false;
  }
  take(cb) {
    if (this.isClosed) {
      return;
    }
    if (this.buffers.length > 0) {
      cb(this.buffers.shift());
    } else {
      this.takers.push(cb);
    }
  }
  put(action) {
    if (this.takers.length === 0) {
      this.buffers.push(action);
    } else {
      this.takers.shift()(action);
    }
  }
  close() {
    if (this.isClosed) {
      return;
    }
  }
}

eventChanel 是在普通 Chanel 基础上实现,是用来用于订阅外部的事件源。chanel的一些使用参考可以看文档

简化的 eventChanel 实现如下,其实给订阅函数传进一个函数,调用这个函数可以往 Chanel 内 put 东西。

class eventChanel extends Chanel {
  constructor(subscribe) {
    super();
    this.unscribe = subscribe(action => {
      super.put(action);
    });
  }

  close() {
    this.unscribe();
    this.isClosed = true;
  }
}
  • 多播(multiCast) chanel

从上面的中间件源码可以看到,redux-saga 默认情况下的ChanelstdChannelstdChannel就是基于多播 chanel (multiCastChanel)实现,只不过添加了redux-saga本身的调度系统。multiCastChanel和 nodejs 的eventEmiter是非常类似的,multiCastChaneltake类似于eventEmiteronce,multiCastChanelput类似于eventEmiteremit

通俗的理解,saga 内 multiCastChanel 和 Chanel 最大的区别是,multiCastChanel 不能存 action,只能存 taker,能根据 action 的 type 判断是否执行 taker;chanel 可以缓存 action 和 taker,接收到 action 马上触发 taker,不会判断 type,类似于两个人对话的样子(单播)

一个简化版的 multiCastChanel 实现如下

class Chanel {
  constructor() {
    this.isClosed = false;
    this.takers = [];
  }
  put(action) {
    if (this.isClosed) {
      return;
    }
    const takers = this.takers;
    for (let i = 0, len = takers.length; i < len; i++) {
      if (!takers[i].MATCH || action.type === takers[i].MATCH) {
        takers[i](action);
        takers.splice(takers.indexOf(takers[i]), 1);
      }
    }
  }
  take(cb, match) {
    cb["MATCH"] = match;
    this.takers.push(cb);
  }
  close() {
    this.isClosed = true;
  }
}

源码中的 stdChanel 实现

export function stdChannel() {
  const chan = multicastChannel();
  const { put } = chan;
  chan.put = input => {
    // saga的action,不进入调度状态
    if (input[SAGA_ACTION]) {
      put(input);
      return;
    }
    asap(() => {
      put(input);
    });
  };
  return chan;
}

上面代码中的multicastChannel和我们的简化版 chanel 原理是一样的。我们可以看到,stdChanel是对multicastChannelput方法进行了重写。只是对于非 saga 内置action使用asap(() => { put(input); });进行调用,这个asap方法其实是 saga 内部调度系统的一个执行函数,它的作用是如果当前 saga 是空闲状态,则执行我们的回调;如果是挂起状态则将回调存进任务队列中。后面会专门介绍 saga 的调度系统。

@flytam flytam added the Go label Nov 16, 2019
@flytam flytam added react New feature or request and removed Go labels Jun 9, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
react New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant