Cacheme

An async cache framework with multiple cache sources, organized around structured nodes.

  • Organize caches with structured Nodes; each Node can be assigned its own storage backend and cache policy
  • Multiple serializers: pickle/json/msgpack, with compressed variants
  • Fully type-annotated APIs
  • Efficient TinyLFU-based cache policy, written in Rust
  • Avoids the thundering herd problem via asyncio Event, improving hit ratio and reducing pressure on data/cache sources under high concurrency
  • Per-Node cache statistics API


Requirements

Python 3.7+

Installation

pip install cacheme

Each storage backend is supported through its own driver; install the extras you need:

pip install cacheme[redis]
pip install cacheme[aiomysql]
pip install cacheme[motor]
pip install cacheme[asyncpg]

Define a Node

The Node is the core of Cacheme. A Node definition includes the cache key, how to load data from the source on a miss, and storage-related configuration. An example makes this concrete:

import cacheme
from dataclasses import dataclass
from typing import Dict

from cacheme.serializer import MsgPackSerializer

@dataclass
class UserInfoNode(cacheme.Node):
    user_id: int

    def key(self) -> str:
        return f"user:{self.user_id}:info"

    async def load(self) -> Dict:
        user = get_user_from_db(self.user_id)
        return serialize(user)

    class Meta(cacheme.Node.Meta):
        version = "v1"
        caches = [cacheme.Cache(storage="my-redis", ttl=None)]
        serializer = MsgPackSerializer()

The example above defines UserInfoNode, which caches UserInfo data. The cache key is generated by the key method, and the dataclass decorator generates the __init__ method automatically, so Cacheme APIs are called with node instances instead of hand-written key strings. The load method defines how to fetch data from the source on a cache miss. The Meta class configures the cache: its version (automatically appended to the key), its storages (here the storage registered as my-redis), and the serializer used when writing/reading values.

Cacheme can also create Nodes dynamically, which pairs well with the decorator for quickly caching existing functions. Defining a dedicated Node class is still the recommended approach.

@cacheme.Memoize(cacheme.build_node("TestNodeDynamic", "v1", [cacheme.Cache(storage="local", ttl=None)]))
async def fn(a: int) -> int:
    return 1


@fn.to_node
def _(a: int) -> cacheme.DynamicNode:
    return cacheme.DynamicNode(key=f"bar:{a}")

DynamicNode is used here because there is no predefined Node class; it supports only a single key parameter.
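
A minimal usage sketch, assuming a storage named "local" has been registered (see the next section): calling the memoized function fills the cache under the key returned by the mapping function.

# the first call runs fn and caches the result under key "bar:1";
# later calls with the same argument are served from the cache
result = await fn(1)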

Register a Storage

Cacheme decouples Nodes from Storages: a Node carries business meaning (for example, a user-info node), while a Storage describes where cached data lives. One Node can chain multiple storages, and one storage can back multiple Nodes.

import cacheme

await cacheme.register_storage("my-redis", cacheme.Storage(url="redis://localhost:6379"))
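
The storage name is arbitrary; it only has to match the storage value used in a node's Meta.caches. For instance, the "local" storage referenced by the dynamic-node example above could be registered like this (a sketch; the url and size values are illustrative):

await cacheme.register_storage("local", cacheme.Storage(url="local://tlfu", size=10000))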

Cacheme API

get: get data by node

user = await cacheme.get(UserInfoNode(user_id=1))

get_all: get data for multiple nodes; all nodes must be of the same type

users = await cacheme.get_all([UserInfoNode(user_id=1), UserInfoNode(user_id=2)])

invalidate: remove a node's cached data

await cacheme.invalidate(UserInfoNode(user_id=1))

refresh: reload a node's data from the source

await cacheme.refresh(UserInfoNode(user_id=1))

Memoize: decorator for memoizing an existing function

Decorate your function with the cacheme.Memoize decorator and a cache node. Cacheme will load data using the decorated function and ignore the load method. Because your function may take a variable number of args/kwargs, one more step is needed to map them to a node: the decorated mapping function must have the same signature as the memoized function and return a cache node.

@cacheme.Memoize(UserInfoNode)
async def get_user_info(user_id: int) -> Dict:
    return {}

# function name is not important, so just use _ here
@get_user_info.to_node
def _(user_id: int) -> UserInfoNode:
    return UserInfoNode(user_id=user_id)
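
After decoration the function is called as usual; on a miss Cacheme runs the function body and caches the returned value:

# served from cache when present, otherwise loaded via get_user_info
user = await get_user_info(user_id=1)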

nodes: list all nodes

nodes = cacheme.nodes()

stats: get metrics for a node

metrics = cacheme.stats(UserInfoNode)

metrics.request_count() # total request count
metrics.hit_count() # total hit count
metrics.hit_rate() # hit_count/request_count
metrics.miss_count() # request_count - hit_count
metrics.miss_rate() # miss_count/request_count
metrics.load_success_count() # total load success count
metrics.load_failure_count() # total load failure count
metrics.load_failure_rate() # load_failure_count/load_count
metrics.load_count() # total load count
metrics.total_load_time() # total load time in nanoseconds
metrics.average_load_time() # total_load_time/load_count
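
A quick sketch using only the calls listed above (the print formatting is illustrative):

metrics = cacheme.stats(UserInfoNode)
if metrics.request_count() > 0:
    print(f"hit rate: {metrics.hit_rate():.2%}")
    # load times are reported in nanoseconds
    print(f"avg load: {metrics.average_load_time() / 1e6:.2f} ms")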

set_prefix: set the global key prefix

cacheme.set_prefix("mycache")

Cache Node

Key

The actual key stored in the storage layer has the form {prefix}:{key()}:{Meta.version}.
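
For example, with the global prefix "mycache" and the UserInfoNode defined earlier (version "v1"), the key written to storage is:

cacheme.set_prefix("mycache")

node = UserInfoNode(user_id=1)
# {prefix}:{key()}:{Meta.version} -> "mycache:user:1:info:v1"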

Meta Class

  • version[str]: node version string, appended to the cache key automatically.
  • caches[List[Cache]]: storages used by this node. Multiple storages are read from left to right, and on update the cache is written to each of them in the same order. A Cache takes two parameters: storage[str] and ttl[Optional[timedelta]]. storage is the name passed to register_storage, and ttl is the TTL of this node's data in that storage.
  • serializer[Optional[Serializer]]: serializer used to dump/load data. Ignored by local caches, which store values directly in a dict. See Serializers.
  • doorkeeper[Optional[DoorKeeper]]: See DoorKeeper.

The following example uses a two-level cache: local first, then redis.

import cacheme
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict

from cacheme.serializer import MsgPackSerializer

@dataclass
class UserInfoNode(cacheme.Node):
    user_id: int

    def key(self) -> str:
        return f"user:{self.user_id}:info"

    async def load(self) -> Dict:
        user = get_user_from_db(self.user_id)
        return serialize(user)

    class Meta(cacheme.Node.Meta):
        version = "v1"
        caches = [
            cacheme.Cache(storage="local", ttl=timedelta(seconds=30)),
            cacheme.Cache(storage="my-redis", ttl=timedelta(days=10))
        ]
        serializer = MsgPackSerializer()

Serializers

Cacheme provides the following built-in serializers.

  • PickleSerializer: based on Python pickle; supports most types.
  • JSONSerializer: based on pydantic_encoder + json; supports Python primitives, dataclasses and pydantic models. See pydantic types.
  • MsgPackSerializer: based on pydantic_encoder + msgpack; supports Python primitives, dataclasses and pydantic models. See pydantic types.

Each of these three serializers also has a compressed variant, which compresses data with zlib level 3 before writing to storage (see the sketch after the list):

  • CompressedPickleSerializer
  • CompressedJSONSerializer
  • CompressedMsgPackSerializer

DoorKeeper

The concept comes from the TinyLFU paper.

The Doorkeeper is a regular Bloom filter placed in front of the cache. Upon item arrival, we first check if the item is contained in the Doorkeeper. If it is not contained in the Doorkeeper (as is expected with first timers and tail items), the item is inserted to the Doorkeeper and otherwise, it is inserted to the cache.

The first time a given item is requested, its data is not cached; only the Bloom filter is updated. Only when the same item is requested a second time is the data written to the cache. The upside is that one-hit items are filtered out and never enter the cache, saving space; the downside is that every item is loaded from the source at least twice. The Bloom filter resets automatically once the configured size is reached.

import cacheme
from dataclasses import dataclass

from cacheme import BloomFilter

@dataclass
class UserInfoNode(cacheme.Node):

    class Meta(cacheme.Node.Meta):
        # size 100000, false positive probability 0.01
        doorkeeper = BloomFilter(100000, 0.01)

Cache Storage

Local Storage

Local storage keeps data in a Python dict and supports two eviction policies: lru and tlfu. When the cache reaches the configured size, entries are evicted according to the policy.

# lru policy
Storage(url="local://lru", size=10000)

# tinylfu policy
Storage(url="local://tlfu", size=10000)

Parameters:

  • url: local://{policy}. Two policies are currently supported: lru and tlfu.

  • size: maximum size of the storage; the policy is used to evict keys once the cache is full.

Redis Storage

Storage(url="redis://localhost:6379")

# cluster
Storage(url="redis://localhost:6379", cluster=True)

Parameters:

  • url: redis connection url.
  • cluster: bool, whether to connect to a Redis cluster; default False.
  • pool_size: connection pool size, default 100 (see the sketch below).
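
All parameters other than url are optional. A sketch with a non-default pool size:

await cacheme.register_storage(
    "my-redis", cacheme.Storage(url="redis://localhost:6379", pool_size=200)
)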

MongoDB Storage

Create the index before using this storage. See mongo.js.

Storage(url="mongodb://test:password@localhost:27017",database="test",collection="cache")

Parameters:

  • url: mongodb connection url.
  • database: mongodb database name.
  • collection: mongodb collection name.
  • pool_size: connection pool size, default 50.

SQLite Storage

Create the table and index before using this storage. See sqlite.sql.

Storage(url="sqlite:///test", table="cache")

Parameters:

  • url: sqlite connection url.
  • table: cache table name.
  • pool_size: connection pool size, default 50.

PostgreSQL Storage

Create the table and index before using this storage. See postgresql.sql.

Storage(url="postgresql://username:[email protected]:5432/test", table="cache")

Parameters:

  • url: postgres connection url.
  • table: cache table name.
  • pool_size: connection pool size, default 50.

MySQL Storage

Create the table and index before using this storage. See mysql.sql.

Storage("mysql://username:password@localhost:3306/test", table="cache")

Parameters:

  • url: mysql connection url.
  • table: cache table name.
  • pool_size: connection pool size, default 50.

Benchmarks

  • Local storage hit ratios (hit_count/request_count): see the hit ratios source code.

  • Throughput benchmark of different storages: see benchmark.