Go sync.Map 和并发安全

Golang 提供的 map 并不是并发安全的,当存在并发读写 map 的情况时,需要我们自行实现并发安全,或者使用 Golang 标准库提供的 sync.Map。本文将介绍各种保证 map 并发安全的方法和实现,包括 sync.Map 的实现原理。

并发读写问题#

Golang 不保证原生 map 的并发安全,如果同时存在多个 goroutine 并发读写 map,将会抛出 fatal 并终止程序。这里需要注意,fatal 是无法被 recover 捕获的。

以下是一个存在并发读写问题的例子,程序中存在两个 goroutine,其中一个一直读取 map,另一个一直在写入 map。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import "fmt"

func main() {
	m := make(map[int]int)
	go func() {
		for i := 0; i < 10000; i++ {
			m[i] = i
		}
	}()
	for i := 0; i < 10000; i++ {
		fmt.Println(m[i])
	}
}

执行程序后,会抛出并发读写 map 的 fatal,并且会在控制台输出涉及了读写 map 的两个 goroutine 的调用栈信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
fatal error: concurrent map read and map write

goroutine 1 [running]:
main.main()
        /tmp/main.go:13 +0x7b

goroutine 19 [runnable]:
main.main.func1()
        /tmp/main.go:9 +0x2f
created by main.main in goroutine 1
        /tmp/main.go:7 +0x58
exit status 2

当然,如果是并发读 map,并不会有任何问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
m := make(map[int]int)
m[9999] = 100
go func() {
    for i := 0; i < 10000; i++ {
        fmt.Println(m[i])
    }
}()
for i := 0; i < 10000; i++ {
    fmt.Println(m[i])
}

读写加锁#

解决并发读写问题最简单的解决方案就是通过加锁的方式让读写流程按合理的顺序执行。前面我们了解到并发读写是存在问题的,而并发读是没有问题的,因此使用 sync.RWMutex 加锁适合用于这种场景。

下面这个示例,我们在进行 map 读操作前使用 RLock 加锁,在进行 map 写操作前使用 Lock 加锁,这样我们能够确保不会出现并发读写问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
m := make(map[int]int)
var mu sync.RWMutex
go func() {
	for i := 0; i < 10000; i++ {
		mu.Lock()
		m[i] = i
		mu.Unlock()
	}
}()
for i := 0; i < 10000; i++ {
	mu.RLock()
	tmp := m[i]
	mu.RUnlock() // 在 fmt.Println 之前更快释放锁, 因为后续只使用了临时变量
	fmt.Println(tmp)
}

这种解决方案完全由实现者确保并发安全,优势是实现简单、通俗易懂,缺点是相当于加了一把全局锁,如果存在写操作时,所有其他读写操作都要等待写操作完成才可继续进行。

如果为了避免出现部分场景漏掉加锁和释放锁操作的情况发生,可以对锁进行封装,将读和写分别封装到 Load 和 Store 方法中,使用时无需关注如何操作锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type ConcurrentMap[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

func NewConcurrentMap[K comparable, V any]() *ConcurrentMap[K, V] {
	m := new(ConcurrentMap[K, V])
	m.m = make(map[K]V, 0)
	return m
}

func (m *ConcurrentMap[K, V]) Load(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.m[key]
	return v, ok
}

func (m *ConcurrentMap[K, V]) Store(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.m[key] = value
}

func main() {
	m := NewConcurrentMap[int, int]()
	go func() {
		for i := 0; i < 10000; i++ {
			m.Store(i, i)
		}
	}()
	for i := 0; i < 10000; i++ {
		tmp, _ := m.Load(i)
		fmt.Println(tmp)
	}
}

并发安全的 sync.Map#

sync.Map 使用#

Golang 的标准库提供了一个并发安全的 map 实现,即 sync 包里面的 sync.Map。sync.Map 实际上也是对 map 的封装,但是使用了一些特殊实现而不是直接加锁,使得其性能优于直接加锁。

接下来我们先了解下 sync.Map 是如何使用的,通过阅读 Golang 标准库源码可以发现 sync.Map 为我们提供了如下的方法,这些方法都是并发安全的原子操作。

1
2
3
4
5
6
7
8
9
func (m *Map) CompareAndDelete(key, old any) bool
func (m *Map) CompareAndSwap(key, old, new any) bool
func (m *Map) Delete(key any)
func (m *Map) Load(key any) (any, bool)
func (m *Map) LoadAndDelete(key any) (any, bool)
func (m *Map) LoadOrStore(key, value any) (any, bool)
func (m *Map) Range(f func(key, value any) bool)
func (m *Map) Store(key, value any)
func (m *Map) Swap(key, value any) (any, bool)

当我们需要读写 map 时,可以使用 Load 和 Store 进行并发安全的读写操作。

1
2
3
4
var m sync.Map
m.Store(1, "MegaShow")
fmt.Println(m.Load(1))  // => MegaShow true
fmt.Println(m.Load(2))  // => <nil> false

如果需要对 map 进行遍历操作时,需要通过 Range 方法进行遍历。

1
2
3
4
m.Range(func(key, value any) bool {
	fmt.Println(key, value)
	return true
})

如果需要实现更复杂的原子操作,可以调用 sync.Map 所实现的其他方法完成这些操作,比如 LoadOrStore、CompareAndSwap、Swap 等方法。

sync.Map 性能#

现在我们来看看标准库提供的 sync.Map 跟使用 sync.RWMutex 实现的并发安全的 map 性能有啥差距。Golang 官方提供了相应的 benchmark 测试样例,在我的电脑下执行的结果如下。

1
2
3
4
5
6
7
8
BenchmarkLoadMostlyHits/*main.RWMutexMap-12          39950328    31.24 ns/op
BenchmarkLoadMostlyHits/*sync.Map-12                206646472    5.878 ns/op
BenchmarkLoadMostlyMisses/*main.RWMutexMap-12        43427280    28.94 ns/op
BenchmarkLoadMostlyMisses/*sync.Map-12              271705485    4.427 ns/op
BenchmarkLoadOrStoreBalanced/*main.RWMutexMap-12      4890918    270.7 ns/op
BenchmarkLoadOrStoreBalanced/*sync.Map-12             4743060    292.2 ns/op
BenchmarkLoadOrStoreUnique/*main.RWMutexMap-12        2636839    438.6 ns/op
BenchmarkLoadOrStoreUnique/*sync.Map-12               2336914    519.9 ns/op

可以观察到,在我的电脑上 sync.Map 不管是读存在的 key 还是不存在的 key,相对于用 sync.RWMutex 实现并发安全的 map,在并发测试的情况下,都有 5-6 倍左右的性能优势。而对于写操作,sync.Map 存在一定程度的性能劣势,所以 sync.Map 是适合于读多写少的场景。

那么,sync.Map 是怎么做到读性能优于 sync.RWMutex 的封装呢,接下来我们分析下 sync.Map 的实现。

sync.Map 实现#

先来看看 sync.Map 的数据结构。

sync-map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Map struct {
	mu Mutex
	read atomic.Pointer[readOnly]
	dirty map[any]*entry
	misses int
}

type readOnly struct {
	m       map[any]*entry
	amended bool
}

type entry struct {
	p atomic.Pointer[any]
}

首先我们分析下 sync.Map 的各个字段。

  • mu:互斥锁,用于保障 read 和 dirty 的并发安全;
  • read:可直接并发读的 map 数据,未存储全量数据;
  • dirty:并发不安全的 map 数据,存储全量数据或为 nil (如果为 nil 时 read 存储全量数据);
  • misses:未命中 read 的次数,用于记录是否需要将 dirty 转换为 read。

sync.Map 使用了一种巧妙的设计,在结构内部使用了两个原生 map 存储数据。其中 read 用于大量并发读场景,由于本身使用 atomic.Pointer 封装,读取数据时无需进行手动加锁,可以直接获取数据。不过部分场景 read 中并未存储全量数据,在 dirty 里面存储了部分脏数据,由于 dirty 本身由原生的 map 实现,如果读取短期内更新的数据或者进行写入操作时,需要进行手动加锁。

当 sync.Map 需要写入或交换数据时,首先会判断 read 中是否存在对应的 key。如果存在,则直接替换或交换 read 中对应 key 的数据;如果不存在,则通过 mu 对整个 Map 加锁,然后将数据写入 dirty 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	// 获取 read, 如果 read 为空则初始化一个 read
	// 如果 read 中存在 key, 则尝试交换
	// 如果 read 中存在 key 但状态为已删除, 这里 ok 会返回 false, 即交换失败
	// 由于 e.p 是原子的, 这里交换数据不会存在并发读写问题
	read := m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if v, ok := e.trySwap(&value); ok {
			if v == nil {
				return nil, false
			}
			return *v, true
		}
	}

	// read 中不存在 key 或交换失败, 只能整个 Map 加锁
	m.mu.Lock()

	// 再次获取 read 并进行尝试交换, 由于加锁前 read 可能被修改
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		// 如果 key 的状态为已删除, 则将值置为 nil 并写入 dirty
		// 因为状态为已删除时, 意味着 dirty 中无该 key
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		// 交换数据 (这里一定成功, 因为状态不可能为已删除)
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else if e, ok := m.dirty[key]; ok {
		// 如果 read 中不存在 key 但 dirty 中存在 key, 直接交换 dirty 中的数据
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else {
		// 如果 read 为全量数据时, 将 read 设为非全量数据, 并写入 dirty
		if !read.amended {
			m.dirtyLocked() // dirty 为空则初始化 dirty
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}

	// 释放锁
	m.mu.Unlock()
	return previous, loaded
}

根据上面的代码分析,我们可以发现,read 有一个 amended 字段,用于判断 dirty 中是否有脏数据,即 dirty 中是否存在 read 中不存在的 key。如果 amended 为 true 则意味着我们有些数据无法通过 read 直接获取,而是要通过 dirty 获取。

另外,entry 存在一种特殊状态,key 存在但是处于已删除状态。代码创建了一个 entry 类型的 expunged 空变量,通过判断 entry 是否为 expunged 判断是否已删除。这样,即使 key 已删除,在 read 中依然存在该 key 和 value 的数据,因此我们可以并发读 read 请求已删除的 key,无需加锁。

当 sync.Map 需要读取数据时,首先会判断 read 中是否存在该 key 和 dirty 中是否有脏数据。如果存在 key 且没有脏数据则直接返回获取的数据;如果不存在 key 或存在脏数据,则对整个 Map 进行加锁,然后读取 dirty 中的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *Map) Load(key any) (value any, ok bool) {
	// 获取 read, 如果 read 为空则初始化一个 read
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		// 如果不存在 key 或 dirty 中存在脏数据, 则加锁
		// 需要再次判断 read, 因为加锁前 read 可能被修改
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			// 判断 dirty 中是否存在数据
			// misses++, 并且判断如果 misses 次数是否超过 dirty 长度
			// 则将 dirty 赋值给 read
			e, ok = m.dirty[key]
			m.missLocked()
		}
		// 释放锁
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

在需要读取 dirty 的时候,Map 内部会记录 miss 后读取 dirty 的次数,如果次数过多到达一定阈值,则将 dirty 当成 read 存储下来,并且将 dirty 变量记录为 nil。等待下次需要写入 dirty 数据时,dirty 才会再次全量拷贝一份 read 数据。

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

当 sync.Map 需要删除数据时,会判断是否需要删除 dirty 中的 key,同时会将 read 中的 entry 重置为已删除状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	// 获取 read, 如果 read 为空则初始化一个 read
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		// 如果不存在 key 或 dirty 中存在脏数据, 则加锁
		// 需要再次判断 read, 因为加锁前 read 可能被修改
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			// 删除 dirty 中的 key, 并且判断是否需要将 dirty 赋值给 read
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			m.missLocked()
		}
		m.mu.Unlock() // 释放锁
	}
	if ok {
		return e.delete() // 将 entry 置为已删除状态
	}
	return nil, false
}

可以观察到,sync.Map 这种两个 map 的实现方式,使得 sync.Map 非常适用于读多写少的场景,读性能高于 sync.RWMutex 加锁,而写性能略低于 sync.RWMutex 加锁,因为 sync.Map 写场景需要处理复杂的逻辑。

分片读写加锁#

既然 sync.Map 适合于读多写少的场景,那么有没有一种实现能适合于写多的场景。答案是肯定的,我们可以对 map 进行分片,通过多个 map 降低写操作加锁阻塞的概率。但是这种方案有利也有弊,利是它的写性能要优于上面两种方案,但是缺点是读性能可能要低于上面两种方案,且需要专门为每一种 key 类型提供分片的方法。

接下来我们简单实现一个带分片的并发安全 map。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type ShardConcurrentMap[K comparable, V any] struct {
	mu        []sync.RWMutex
	m         []map[K]V
	shardNum  int
	shardFunc func(key K) int
}

func NewShardConcurrentMap[K comparable, V any](shardNum int, shardFunc func(key K) int) *ShardConcurrentMap[K, V] {
	m := new(ShardConcurrentMap[K, V])
	m.mu = make([]sync.RWMutex, shardNum)
	m.m = make([]map[K]V, shardNum)
	for idx := range m.m {
		m.m[idx] = make(map[K]V, 0)
	}
	m.shardNum = shardNum
	m.shardFunc = shardFunc
	return m
}

func (m *ShardConcurrentMap[K, V]) Load(key K) (V, bool) {
	shard := m.shardFunc(key) % m.shardNum
	m.mu[shard].RLock()
	defer m.mu[shard].RUnlock()
	v, ok := m.m[shard][key]
	return v, ok
}

func (m *ShardConcurrentMap[K, V]) Store(key K, value V) {
	shard := m.shardFunc(key) % m.shardNum
	m.mu[shard].Lock()
	defer m.mu[shard].Unlock()
	m.m[shard][key] = value
}

构造 ShardConcurrentMap 的时候,需要传入分片的数量和分片函数,然后我们将根据分片数量 N 初始化 N 个 sync.RWMutex 和 N 个 map。每个分片之间相互独立,当需要读或写操作时,先计算出对应的 key 在哪个分片上,然后再操作该分片。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
m := NewShardConcurrentMap[int, int](32, func(key int) int {
	return key
})
go func() {
	for i := 0; i < 10000; i++ {
		m.Store(i, i)
	}
}()
for i := 0; i < 10000; i++ {
	tmp, _ := m.Load(i)
	fmt.Println(tmp)
}

结语#

本文介绍了 map 的并发读写问题,以及三种并发读写问题的解决方案,每种解决方案都有利有弊,在我们使用 map 进行并发读写的场景下,应该结合实际场景选择合适的方案来保障程序的并发安全。