0%

Ristretto 源码阅读笔记

Ristretto 是一个基于 Go 语言的、可并发访问的、可设置内存存储上限的高性能进程内缓存库。Ristretto 的含义为“超浓缩咖啡”,该命名是因为该项目的目标是与知名 JAVA 缓存库 Caffeine 竞争。

相比其他 Go 语言的进程内缓存项目,Ristretto 的设计与实现上具有以下几个显著特点:

  • 基于近似 LFU 算法实现缓存逐出;
  • 支持多种 key 与 value 类型,能够在进程内缓存较为复杂的结构体;
  • 写入门限(DoorKeeper)设计,过滤写入键值对,提高缓存命中率;
  • Key Cost 设计,能够更好地描述一个键值对对缓存的负载量;
  • 利用sync.Pool实现类似 thread_local 缓存,减少并发竞争。

Ristretto 软件框架

Ristretto 的实现并不复杂,从功能上可以将 Ristretto 划分为几个模块:存储模块、操作缓冲模块、缓存控制模块、缓存计数模块,几个模块分别由不同的数据结构来负责。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Cache struct {
// 存储模块
store store
// 缓存控制模块
policy policy
// 读写操作缓冲
getBuf *ringBuffer
setBuf chan *Item
// 缓存计数
Metrics *Metrics

...
}

操作缓冲模块可以分为读操作缓冲与写操作缓冲,两者会采取不同的策略来减少对缓存控制模块的访问。缓存控制模块具有 tinyLFU 和 sampledLFU,分别负责缓存的准入和逐出。缓存计数模块则是用于读取 Ristretto 的缓存命中率等日志信息。

store

存储模块使用了 Go 语言中非常经典的分片哈希表设计,目前的实践已经证明这是在 Go 中综合并发性能最好的并发哈希表设计,其他的一些同类型项目也都采用了类似的设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 缓存 metadata
type storeItem struct {
key uint64 // 键
conflict uint64 // 用于解决哈希冲突
value interface{} // 值
expiration time.Time // 过期时间,0 代表无
}
// 缓存大表
type shardedMap struct {
shards []*lockedMap // 分片表
expiryMap *expirationMap // 过期时间表
}
// 缓存分片表
type lockedMap struct {
sync.RWMutex
data map[uint64]storeItem
em *expirationMap
}

在 Ristretto 中,哈希表的分片和存储都使用了同一个哈希值,为了解决哈希冲突的问题,每一个键值对都带有使用另一个哈希算法计算出的 conflict 值,避免冲突的策略是不允许覆盖。当键值对时为数字类型时,哈希值会使用转化后的 uint64 类型;当键值对为string[]byte类型时会使用 Go runtime 中的runtime.memhash函数来计算哈希值。由于使用了汇编代码来实现哈希值计算,该哈希算法相比其他常见类型的哈希算法计算速度更快。

getBuf

getBuf用于控制“用户读取缓存”这一行为传递到 tinyLFU 的速度和方式,具象化来说就是记录下每一次读取操作需要访问的键值对,在 Ristretto 中的实现中使用了键的哈希值。相比于 LRU 和近似 LRU 算法,实现一个 LFU 或近似 LFU 算法需要记录更多的数据。而进程内缓存这一应用场景,通常都要求较高的并发读写性能,会有较多的线程访问。一个进程内共享的缓存意味着记录 LFU 相关信息的数据结构同样会被高并发地访问,因此有必要降低 LFU 数据结构的竞争来提高缓存的整体性能。

降低并发度,常用的方法就是队列和分片这两种模式,Ristretto 中选择的是队列方式。其核心思想是将 stream 转化为 batch stream,具体做法是在每一个协程栈上使用一个thread local 数据结构来缓存该协程读取的键值对,当协程本地的数据聚合到一定规模时再批量写入缓存控制模块中。由于 Go 中并不存在thread_local关键字,Ristretto 利用了sync.Pool的特性来实现了一个近似的 thread local。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (b *ringBuffer) Push(item uint64) {
// 从 Pool 中取出一个对象,将数据放入后归还
stripe := b.pool.Get().(*ringStripe)
stripe.Push(item)
// 对象归还时,内部信息并不会清楚
b.pool.Put(stripe)
}

func (s *ringStripe) Push(item uint64) {
// 将读取的键值对放入切片中
s.data = append(s.data, item)
// 切片满了之后,通过 channel 无阻塞地发送
// 若 channel 已满,丢弃消息
if len(s.data) >= s.capa {
if s.cons.Push(s.data) {
s.data = make([]uint64, 0, s.capa)
} else {
s.data = s.data[:0]
}
}
}

当一个协程需要读取键值对时,会首先从sync.Pool中取出一个ringStripe对象,这里获取对象会优先考虑从 P 的本地栈来获取。ringStripe对象内具有一个切片,用来暂时存储该键值对,只有当切片存满之后,才会非阻塞地通过 channel 发送给缓存控制模块。在官方博客中提到,该 channel 的长度设置得比较小,并且当 channel 满时会选择丢弃ringStripe中的数据(使用 select实现)。由于缓存控制模块中收到的是 batch stream,因此在获取锁的过程中能够连续处理更多信息,降低了需要获取锁的次数,因此能够大大降低此处的并发冲突。另一点则是利用了sync.Pool的 thread local 特性,避免在聚合的过程中需要访问一个进程内全局可见的数据结构,这也是常用的优化手段。最后,经过聚合后的读取信息将会被defaultPolicy接收,用于调控缓存的准入门限。

sync.Pool发生 GC、channel 满时,都会导致读取缓存的信息丢失,但是这对 LFU 算法的影响是比较小的。因为在理想情况下,用户在某一时刻对缓存的读取是均匀的,丢弃少部分读取信息相当于是一个抽样。这种处理有助于限制分析用户读取信息时的 CPU 利用率,虽然牺牲了一部分 LFU 的准确性,但是能使得软件整体性能提升。

这里的设计综合了 thread local 降低并发以及批处理降低并发的思想,之所以能够采取这种设计是因为缓存控制模块是允许有延迟的,只需要保持最终一致性即可。

setbuf

setbuf用于调节用户的写入流,不同于getbufsetbuf的设计出发点是提升缓存的写入性能,其核心思想通过保证最终一致性来减少写入链的长度,与数据库中的 WAL 思想有些类似。

Ristretto 的设计中对 UPDATE 操作和 WRITE、DELETE 操作进行了分流,UPDATE 的写入链比其余两个更短。可以使用以下伪代码来表示三个操作的写入链。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Operation(key,value any) {

if isUpdateOrWrite {
CreateObjectToInsert()
}

if isUpdate {
return Update()
}
select {
case setBuf <- op:
return true
default:
return false
}
}

Update 操作由于不涉及到键的变更,因此与缓存控制模块中的写入门限无关,可以直接在缓存表中更新。而 Write 操作因为涉及到了键的变更,需要额外检查是否满足缓存的写入门限,而检查是否满足写入门限又涉及到了并发访问的问题。考虑到写入操作还涉及到键值对的过期,具有较高的时限性,这里再使用 batch 优化就不太合适了。在比较典型的进程缓存场景下,其实写入的并发量是不高的,因此这里只简单地使用了 channel,channel 内部的锁较为特殊比sync.Mutex速度更快。

Delete 操作的设计与 Write 操作的设计是密切相关的。Write 操作由于 channel 缓冲的存在被串行执行,这是一个慢速通道,如果某一时刻中积压的 Write 操作过多,可能会导致用户同时发起的 Delete 操作先于 Write 操作完成。即并发中的快慢路径问题。删除操作对于进程内缓存是及其重要的,如果操作发生乱序,可能会导致用户读取到过期数据,这是不能容忍的——缓存中允许少数据但不能容许多数据,否则缓存就失去了存在的意义。因此在 Ristretto 的设计中, Delete 操作同样也使用 channel 来执行。

使用 channel 还有另外一点考虑。如果进程中某一时刻突然有较多的写入,缓存的压力将会骤增,Ristretto 中使用了非阻塞的 channel 来丢弃过多的写入操作,这将会有助于缓解缓存压力。要知道,并不是每一次缓存写入都是极其重要的,但是我们可以确认如果一次缓存及其重要,那么它将会在未来一段时间内再次被写入。因此,丢弃一部分写操作有助于让缓存的写入性能变得更加平滑。

经 channel 发送出的数据会被一个后台 goroutine 处理,该 goroutine 负责检查该操作能否最终应用到缓存中,若被缓存拒绝,则操作会被丢弃并且不会通知用户。该 goroutine 的核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 所有被延迟的操作都会在这里被处理
for {
select {
case i := <-c.setBuf:
// 如果没有指定 cost,根据指定策略计算 cost
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
i.Cost = c.cost(i.Value)
}
if !c.ignoreInternalCost {
i.Cost += itemSize
}
// 根据不同的操作类型采取不同操作
switch i.flag {
case itemNew:
// 根据策略判断能否写入
victims, added := c.policy.Add(i.Key, i.Cost)
if added {
c.store.Set(i)
c.Metrics.add(keyAdd, i.Key, 1)
trackAdmission(i.Key)
} else {
c.onReject(i)
}
// 驱逐失效的缓存
for _, victim := range victims {
victim.Conflict, victim.Value = c.store.Del(victim.Key, 0)
onEvict(victim)
}

case itemUpdate:
c.policy.Update(i.Key, i.Cost)

case itemDelete:
c.policy.Del(i.Key) // Deals with metrics updates.
_, val := c.store.Del(i.Key, i.Conflict)
c.onExit(val)
}
// 清理和退出逻辑
case <-c.cleanupTicker.C:
c.store.Cleanup(c.policy, onEvict)
case <-c.stop:
return
}
}

该代码片段的逻辑非常简单,不过多描述。

写入与逐出策略

当 Ristretto 并没有达到缓存使用的内存上限时,所有的键值对都可以自由地写入缓存中;当缓存使用的内存达到上限时,则会根据写入门限和逐出策略来决定哪些键值对需要保留,哪些键值对需要被逐出。defaultPolicy结构体中包括了使用的写入门限实现与逐出策略实现,该结构体使用互斥锁进行保护,其中admit字段实现了写入控制策略,evict字段实现了逐出策略。

1
2
3
4
5
6
7
8
9
type defaultPolicy struct {
sync.Mutex
admit *tinyLFU // 写入策略实现
evict *sampledLFU // 逐出策略实现
itemsCh chan []uint64
stop chan struct{}
isClosed bool
metrics *Metrics
}

当进程内缓存达到上限时,一个 key 能否写入取决于自身准入值和 Cost 以及缓存中存储的其余键的准入值和 Cost。一个键的准入值描述了它在当前的采样周期内被读取操作所需求的程度高低,在不同的采样周期内,一个键的准入值是不同的。这种定义方式不仅有助于更好地描述缓存使用者在最近一段时间内的需求,也有助于降低计算准入值所需要的数据量。在这种定义方式下,准入值是与时间相关的,每次获得准入值都需要计算,因此对 LFU 算法的效率有一定要求。

为了更好地衡量一个键值对带来的影响,Ristretto 还引入了 Cost 来描述键值对的体量。一个键值对的 Cost 越大,就意味着它需要逐出缓存中更多的键来满足空间需求。而 Ristretto 在一轮比较中只会逐出一个键值对。这代表了 Cost 较大的键值对在写入时会经历更多轮的比较,这意味着更多的键值对会被抽样和比较,如果 newKey 的准入值并不够高,那么它很有可能会在多轮比较中被淘汰。这种逐出算法有利于筛选出准入值和 Cost 都较为适中的键值对,保证缓存在有限的空间内保留更多更有价值的键值对。

当缓存已写满时,Ristretto 会使用 tinyLFU 算法计算出需要新写入的 newKey 的准入值,然后在缓存已有的键值对中随机抽样一组,并选出其中准入值最小的键 minKey。如果 minKey 的准入值大于 newKey,那么会拒绝写入 newKey;否则会将 minKey 从缓存中逐出,更新缓存中的剩余空间并判断是否有足够空间写入。如果空间仍不足够,会重复上述过程,直至 newKey 被拒绝或被接受。该过程在 Ristretto 中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 函数会尝试将键插入缓存中,如果判别过程中发生了键的逐出,被逐出的键会被记录在返回值evicted中
// 返回值accepted代表键是否允许写入
func (p *defaultPolicy) Add(key uint64, cost int64) (evicted []*Item,accepted bool) {
p.Lock()
defer p.Unlock()
// Cannot add an item bigger than entire cache.
if cost > p.evict.getMaxCost() {
return nil, false
}
// Double Check,由于写操作的延迟,本应该是更新操作
if has := p.evict.updateIfHas(key, cost); has {
return nil, false
}
// 计算剩余空间
room := p.evict.roomLeft(cost)
// 还有剩余空间就直接插入
if room >= 0 {
p.evict.add(key, cost)
p.metrics.add(costAdd, key, uint64(cost))
return nil, true
}
// 计算将要插入键的准入值
incHits := p.admit.Estimate(key)
sample := make([]*policyPair, 0, lfuSample)
victims := make([]*Item, 0)
// 重复逐出键直至有足够空间
for ; room < 0; room = p.evict.roomLeft(cost) {
// 随机采样几个键,并计算其中的最小准入值
sample = p.evict.fillSample(sample)
minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
for i, pair := range sample {
if hits := p.admit.Estimate(pair.key); hits < minHits {
minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
}
}
// 如果最小准入值大于要插入键的准入值,拒绝进入缓存
if incHits < minHits {
p.metrics.add(rejectSets, key, 1)
return victims, false
}
// 逐出准入值最小的键
p.evict.del(minKey)
sample[minId] = sample[len(sample)-1]
sample = sample[:len(sample)-1]
victims = append(victims, &Item{
Key: minKey,
Conflict: 0,
Cost: minCost,
})
}
p.evict.add(key, cost)
p.metrics.add(costAdd, key, uint64(cost))
return victims, true
}

当尝试插入一个 newKey 时,可能会出现 newKey 并没有写入,但是仍有一部分键被逐出的情况。

tinyLFU 准入门限

写入策略主要实现在tinyLFU结构体中,该结构体及其算法实现了 tinyLFU 算法,它包含四个字段。其中,freq 字段是一个 Count-Min sketch(cmSketch is a Count-Min sketch implementation with 4-bit counters, heavily based on Damian Gryski’s CM4),door 字段是一个布隆过滤器。

1
2
3
4
5
6
type tinyLFU struct {
freq *cmSketch // Count-Min sketch
door *z.Bloom // 布隆过滤器
incrs int64 // 当前记录的键值对数量
resetAt int64 // 重置门限
}

在 tinyLFU 算法中,一个 Key 的准入门限是由 freq 和 door 两者共同决定的,先简要介绍一下 Count-Min sketch 算法。

Count-Min sketch 是一个类似于 HyperLogLog 的计数算法,在数据量较大时通过牺牲一定的准确度来提升运算效率。但 HyperLogLog 算法目的是估算数据量的多少,而 Count-Min sketch 则为了估算一个数出现的频率。该算法内部包含了 n 个哈希算法和一个 n*m 大小的数组,当一个 key 需要被记录时,会使用不同的哈希算法计算出哈希值并对 m 取模后存入到数组对应位置。当需要统计时,使用同样的方法得到一个 n 大小的数组,被统计数的出现频率被视为该数组中的最小值。

Count-Min sketch 算法的准确率与数据规模是密切相关的,该算法需要设置一个阈值以初始化,当数据规模超出阈值后,估算频率的准确度会有较大的偏离。在 tinyLFU 中会只保留一定规模的键值对,当键值对超出规模时会被重置。这种设计不仅保证了 Count-Min sketch 算法的准确度维持在一个相对合理的范围内,还能保证只统计过去一段时间内的读取操作,实践证明不仅取得了较好的缓存命中率,还有助于限制需要记录的数据规模。

当需要评估一个键的准入值时,tinyLFU 主要考虑了两个维度的问题:该键被需要的频率、是否有相似的键值被需要。该键被需要的频率,即该键被读取的次数是需要考虑的主要因素,tinyLFU 中是使用Count-Min sketch 算法实现的。而另外一个因素,是否有相似的键值被需要,则体现了该键在未来一段时间内被读取的可能性。缓存中的键值对在大多数时间内都是具有关联性的,因为这些键很有可能是通过某些算法来实现的。一个键在过去一段时间内曾被读取就意味着一个与之相似的键很有可能在未来的一段时间内被读取,但这种因果关系并不是强因果关系,因此这一因素被作为次要因素来考虑。tinyLFU 中使用布隆过滤器来描述键的相似性问题,在布隆过滤器中相似的键更有可能被写入到相同的哈希槽中。

1
2
3
4
5
6
7
8
9
func (p *tinyLFU) Estimate(key uint64) int64 {
// 评估键在过去被读取的频率
hits := p.freq.Estimate(key)
// 评估是否有相似键被读取
if p.door.Has(key) {
hits++
}
return hits
}

另一方面,布隆过滤器还会限制什么样的键能够进入到 Count-Min sketch。

1
2
3
4
5
6
7
8
9
10
11
func (p *tinyLFU) Increment(key uint64) {
// 过滤布隆过滤器中不存在的key
if added := p.door.AddIfNotHas(key); !added {
p.freq.Increment(key)
}
p.incrs++
// 超过上限,重置
if p.incrs >= p.resetAt {
p.reset()
}
}

根据官方的说法,这是为了防止长尾键污染缓存。

Before we place a new key in TinyLFU, Ristretto uses a bloom filter to first check if the key has been seen before. Only if the key is already present in the bloom filter, is it inserted into the TinyLFU. This is to avoid polluting TinyLFU with a long tail of keys that are not seen more than once.

总结

Ristretto 与其他缓存最大的不同之处是其使用了 LFU 算法。在高并发情景下,为了降低 LFU 计数器这一全局资源所带来的竞争,Ristretto 采用了本地缓存批处理采样的思想来改变访问 LFU 计数器的方式,取得了非常好的效果。在工程实现上,最值得学习的是采用sync.Pool搭配 ring_buffer 的模式,实现了一个近似的 thread local 缓存。在允许一定数据丢失的场景下,如采样,使用这种思路可以巧妙地将流处理转换为批处理模式,大大提升系统的吞吐量。