forked from peanutzhen/peanutcache
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peanutcache.go
133 lines (117 loc) · 3.13 KB
/
peanutcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2021 Peanutzhen. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package peanutcache
import (
"fmt"
"geecache/peanutcache/singlefilght"
"log"
"sync"
)
// peanutcache 模块提供比cache模块更高一层抽象的能力
// 换句话说,实现了填充缓存/命名划分缓存的能力
var (
mu sync.RWMutex // 管理读写groups并发控制
groups = make(map[string]*Group)
)
// Retriever 要求对象实现从数据源获取数据的能力
type Retriever interface {
retrieve(string) ([]byte, error)
}
type RetrieverFunc func(key string) ([]byte, error)
// RetrieverFunc 通过实现retrieve方法,使得任意匿名函数func
// 通过被RetrieverFunc(func)类型强制转换后,实现了 Retriever 接口的能力
func (f RetrieverFunc) retrieve(key string) ([]byte, error) {
return f(key)
}
// Group 提供命名管理缓存/填充缓存的能力
type Group struct {
name string
cache *cache
retriever Retriever
server Picker
flight *singlefilght.Flight
}
// NewGroup 创建一个新的缓存空间
func NewGroup(name string, maxBytes int64, retriever Retriever) *Group {
if retriever == nil {
panic("Group retriever must be existed!")
}
g := &Group{
name: name,
cache: newCache(maxBytes),
retriever: retriever,
flight: &singlefilght.Flight{},
}
mu.Lock()
groups[name] = g
mu.Unlock()
return g
}
// RegisterSvr 为 Group 注册 Server
func (g *Group) RegisterSvr(p Picker) {
if g.server != nil {
panic("group had been registered server")
}
g.server = p
}
// GetGroup 获取对应命名空间的缓存
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
func DestroyGroup(name string) {
g := GetGroup(name)
if g != nil {
svr := g.server.(*server)
svr.Stop()
delete(groups, name)
log.Printf("Destroy cache [%s %s]", name, svr.addr)
}
}
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key required")
}
if value, ok := g.cache.get(key); ok {
log.Println("cache hit")
return value, nil
}
// cache missing, get it another way
return g.load(key)
}
func (g *Group) load(key string) (ByteView, error) {
view, err := g.flight.Fly(key, func() (interface{}, error) {
if g.server != nil {
if fetcher, ok := g.server.Pick(key); ok {
log.Printf("pick peer to get *%s*\n", key)
bytes, err := fetcher.Fetch(g.name, key)
if err == nil {
return ByteView{b: cloneBytes(bytes)}, nil
}
log.Printf("fail to get *%s* from peer, %s.\n", key, err.Error())
}
}
return g.getLocally(key)
})
if err == nil {
return view.(ByteView), err
}
return ByteView{}, err
}
// getLocally 本地向Retriever取回数据并填充缓存
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.retriever.retrieve(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}
// populateCache 提供填充缓存的能力
func (g *Group) populateCache(key string, value ByteView) {
g.cache.add(key, value)
}