Sync

go中的sync包

在Go语言中,除了使用channel进行goroutine之间的通信和同步操作外,还可以使用syne包下的并发工具。

并发工具类 说明
Mutex 互斥锁
RWMutex 读写锁
WaitGroup 并发等待组
Map 并发安全字典
Cond 同步等待条件
Once 只执行一次
Pool 临时对象池

临界区 #

有时候在Go代码中可能会存在多个goroutine同时操作一个资源区(临界区),这种情况会发生竟态问题(数据竟态)。

临界区:当程序并发地运行时,多个 [Go 协程]不应该同时访问那些修改共享资源的代码。这些修改共享资源的代码称为临界区。

package main
import (
	"fmt"
	"sync"
)
var x = 10
var wg sync.WaitGroup
func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

sync.Mutex #

Mutex 用于提供一种加锁机制(Locking Mechanism),可确保在某时刻只有一个协程在临界区运行,以防止出现竞态条件。

go里面的锁是不可重入的,即不可以重复进入。

Mutex 可以在 [sync] 包内找到。[Mutex] 定义了两个方法:[Lock]和 [Unlock](。所有在 LockUnlock 之间的代码,都只能由一个 Go 协程执行,于是就可以避免竞态条件。

mutex.Lock()  
x = x + 1  
mutex.Unlock()

如果有一个 Go 协程已经持有了锁(Lock),当其他协程试图获得该锁时,这些协程会被阻塞,直到 Mutex 解除锁定为止

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

package main
import (
	"fmt"
	"sync"
)
var x = 10
var wg sync.WaitGroup
var lock sync.Mutex  // 值类型,不需要初始化
func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock()
		x = x + 1
		lock.Unlock()
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

sync.RWMutex #

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

type RWMutex struct {
    w Mutex
    writerSem uint32
    readerSem uint32
    readerCount int32
    readerWait int32
}

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

package main
import (
	"fmt"
	"sync"
	"time"
)
var x = 10
var wg sync.WaitGroup
//var rwlock sync.RWMutex  // 值类型,不需要初始化
//var lock sync.Mutex  // 值类型,不需要初始化
var lock   sync.Mutex
var rwlock sync.RWMutex
func write()  {
	//rwlock.Lock() // 写锁都用Lock
	lock.Lock()
	time.Sleep(1 * time.Millisecond) // 模拟写耗时1毫秒
	x=x+1
	//rwlock.Unlock()
	lock.Unlock()
	wg.Done()
}
func read()  {
	//rwlock.RLock() // 读锁用RLock
	lock.Lock()
	time.Sleep(time.Millisecond) // 模拟读耗时1毫秒
	//fmt.Printf("x 现在的值是:%d\n",x)
	//rwlock.RUnlock()
	lock.Unlock()
	wg.Done()
}
func main() {
	// 统计开始时间
	time1:=time.Now()
	// 开10个协程写
	for i := 0; i < 10 ; i++ {
		wg.Add(1)
		go write()
	}
	// 开1000个协程读
	for i := 0; i < 1000 ; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
	fmt.Println("x最终值为:",x)
	// 统计结束时间
	time2:=time.Now()
	fmt.Println(time2.Sub(time1))  // 结束时间-开始时间
	// 使用读写锁:15.387426ms
	// 使用互斥锁:1.26868106s
}
需要注意的是读写锁非常适合读多写少的场景如果读和写的操作差别不大读写锁的优势就发挥不出来

说明:

对于这两种锁类型,任何一个 Lock() 或 RLock() 均需要保证对应有 Unlock() 或 RUnlock() 调用与之对应,否则可能导致等待该锁的所有 goroutine 处于饥饿状态,甚至可能导致死锁。锁的典型使用模式如下:

package main
import (    
	"fmt"    
	"sync"
)
var ( 
  count int // 逻辑中使用的某个变量 无论是包级的变量还是结构体成员字段,都可以。
  countGuard sync.Mutex// 与变量对应的使用互斥锁 一般情况下,建议将互斥锁的粒度设置得越小越好,降低因为共享访问时等待的时间。这里笔者习惯性地将互斥锁的变量命名为以下格式:变量名+Guard 以表示这个互斥锁用于保护这个变量。
)
func GetCount() int { //一个获取 count 值的函数封装,通过这个函数可以并发安全的访问变量 count。
  countGuard.Lock()// 锁定 尝试对 countGuard 互斥量进行加锁。一旦 countGuard 发生加锁,如果另外一个 goroutine 尝试继续加锁时将会发生阻塞,直到这个 countGuard 被解锁。
  defer countGuard.Unlock()// 在函数退出时解除锁定 使用 defer 将 countGuard 的解锁进行延迟调用,解锁操作将会发生在 GetCount() 函数返回时。
  return count
}
func SetCount(c int) {//在设置 count 值时,同样使用 countGuard 进行加锁、解锁操作,保证修改 count 值的过程是一个原子过程,不会发生并发访问冲突。
  countGuard.Lock()
  count = c
  countGuard.Unlock()
}
func main() {
  SetCount(1)// 可以进行并发安全的设置
  fmt.Println(GetCount())// 可以进行并发安全的获取
}

在读多写少的环境中,可以优先使用读写互斥锁(sync.RWMutex),它比互斥锁更加高效。sync 包中的 RWMutex 提供了读写互斥锁的封装。

我们将互斥锁例子中的一部分代码修改为读写互斥锁,参见下面代码:

var (
	count int// 逻辑中使用的某个变量
	countGuard sync.RWMutex// 与变量对应的使用互斥锁
)
func GetCount() int {
	countGuard.RLock()// 锁定 在这一行,把 countGuard.Lock() 换做 countGuard.RLock(),将读写互斥锁标记为读状态。如果此时另外一个 goroutine 并发访问了 countGuard,同时也调用了 countGuard.RLock() 时,并不会发生阻塞。
	defer countGuard.RUnlock()// 在函数退出时解除锁定
	return count
}

sync.WaitGroup #

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

var wg sync.WaitGroup
func hello() {
    defer wg.Done()
    fmt.Println("Hello World!")
}
func main() {
    wg.Add(1)
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("主协程结束!")
    wg.Wait()
}

需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

sync.Once #

sync.Once 的使用场景

sync.Once 是 Go 标准库提供的使函数只执行一次的实现,常应用于单例模式,例如初始化配置、保持数据库连接等。作用与 init 函数类似,但有区别。

  • init 函数是当所在的 package 首次被加载时执行,若迟迟未被使用,则既浪费了内存,又延长了程序加载时间。
  • sync.Once 可以在代码的任意位置初始化和调用,因此可以延迟到使用时再执行,并发场景下是线程安全的。

在多数情况下,sync.Once 被用于控制变量的初始化,这个变量的读写满足如下三个条件:

  • 当且仅当第一次访问某个变量时,进行初始化(写);
  • 变量初始化过程中,所有读都被阻塞,直到初始化完成;
  • 变量仅初始化一次,初始化完成后驻留在内存里。

sync.Once 仅提供了一个方法 Do,参数 f 是对象初始化函数。

func (o *Once) Do(f func())

注意:如果要执行的函数f需要传递参数就需要搭配闭包来使用。

package main
import (
	"fmt"
	"sync"
)
// 实现单例
// 定义一个 sync.Once
var one sync.Once
// 定义一个animalSig的指针变量
var animalSig *Animal
// 定义一个结构体
type Animal struct {
	name string
	age  int
}
func getAnimalInstance() *Animal {
	one.Do(func() {
		fmt.Println("只会执行一次")
		animalSig = &Animal{"狗狗", 1}
	})
	return animalSig
}
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			res:=getAnimalInstance()
			fmt.Printf("单例animalSig地址为:%p\n",res)
			wg.Done()
		}()
	}
	wg.Wait()
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

闭包 = 函数 + 外部变量引用

闭包是函数和它引用的变量共同组成的实体,可以像变量一样被传递和调用。这使得闭包能够记住并访问它创建时的环境变量。

// 闭包示例:计数器函数
func counter() func() int {
    count := 0
    return func() int { // 闭包
        count++        // 捕获外部变量 count
        return count
    }
}

sync.Pool #

sync.Pool 的使用场景

一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。

举个简单的例子:

type Student struct {
	Name   string
	Age    int32
	Remark [1024]byte
}

var buf, _ = json.Marshal(Student{Name: "Geektutu", Age: 25})

func unmarsh() {
	stu := &Student{}
	json.Unmarshal(buf, stu)
}

json 的反序列化在文本解析和网络通信过程中非常常见,当程序并发度非常高的情况下,短时间内需要创建大量的临时对象。而这些对象是都是分配在堆上的,会给 GC 造成很大压力,严重影响程序的性能。

参考:垃圾回收(GC)的工作原理

Go 语言从 1.3 版本开始提供了对象重用的机制,即 sync.Pool。sync.Pool 是可伸缩的,同时也是并发安全的,其大小仅受限于内存的大小。sync.Pool 用于存储那些被分配了但是没有被使用,而未来可能会使用的值。这样就可以不用再次经过内存分配,可直接复用已有对象,减轻 GC 的压力,从而提升系统的性能。

sync.Pool 的大小是可伸缩的,高负载时会动态扩容,存放在池中的对象如果不活跃了会被自动清理。

如何使用

sync.Pool 的使用方式非常简单:

  • 声明对象池

只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。

var studentPool = sync.Pool{
    New: func() interface{} { 
        return new(Student) 
    },
}
  • Get & Put
stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
  • Get() 用于从对象池中获取对象,因为返回值是 interface{},因此需要类型转换。
  • Put() 则是在对象使用完毕后,返回对象池。

sync.Map #

Go语言中内置的map不是并发安全的。请看下面的示例:

package main
import (
	"fmt"
	"strconv"
	"sync"
)
// 定义一个map
var m1 = make(map[string]string)
func setMap(key, valeu string) {
	m1[key] = valeu
}
func getMap(key string) string {
	return m1[key]
}
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(i)
			setMap(key, key)
			fmt.Println(getMap(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
	//报错:fatal error: concurrent map writes
}

上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

像这种场景下就需要为 map 加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版 map——sync.Map。开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

方法名 功能
func (m *Map) Store(key, value interface{}) 存储key-value数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool) 查询key对应的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查询或存储key对应的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查询并删除key
func (m *Map) Delete(key interface{}) 删除key
func (m *Map) Range(f func(key, value interface{}) bool) 对map中的每个key-value依次调用f

下面的代码示例演示了并发读写sync.Map

package main
import (
	"fmt"
	"strconv"
	"sync"
)
var m1 sync.Map=sync.Map{}  // 要初始化
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m1.Store(key,n)
			res,_:=m1.Load(key)
			fmt.Printf("key为:%s,value为:%d\n",key,res)
			wg.Done()
		}(i)
	}
	wg.Wait()
	//报错:fatal error: concurrent map writes
}

go sync.Map - 基本原理 #

map 在并发下的问题 #

同时对 map 进行读写,而 map 不支持并发读写,所以会报错。如果 map 允许并发读写,那么可能在我们使用的时候会有很多错乱的情况出现。

fatal error: concurrent map read and map write

使用 sync.Mutex 保证并发安全 #

对于 map 并发读写报错的问题,其中一种解决方案就是使用 sync.Mutex 来保证并发安全, 但是这样会导致我们在读写的时候,都需要加锁,这样就会导致性能的下降。

var m = make(map[int]int)
// 互斥锁
var mu sync.Mutex

// 写 map 的协程
go func() {
    for i := 0; i < 10000; i++ {
        mu.Lock() // 写 map,加互斥锁
        m[i] = i
        mu.Unlock()
    }
}()

// 读 map 的协程序
go func() {
    for i := 10000; i > 0; i-- {
        mu.Lock() // 读 map,加互斥锁
        _ = m[i]
        mu.Unlock()
    }
}()

time.Sleep(time.Second)

sync.Mutex 的常见的用法是在结构体中嵌入 sync.Mutex,而不是定义独立的两个变量。

sync.Mutex 来保证并发安全,但是在读和写的时候我们都需要加互斥锁。 这就意味着,就算多个协程进行并发读,也需要等待锁, 互斥锁的粒度太大了。

使用 sync.RWMutex 保证并发安全 #

sync 包中提供了 sync.RWMutex,这个锁可以允许进行并发读,但是写的时候还是需要等待锁。 也就是说,一个协程在持有写锁的时候,其他协程是既不能读也不能写的,只能等待写锁释放才能进行读写

var m = make(map[int]int)
// 读写锁(允许并发读,写的时候是互斥的)
var mu sync.RWMutex

// 写入 map 的协程
go func() {
    for i := 0; i < 10000; i++ {
        // 写入的时候需要加锁
        mu.Lock()
        m[i] = i
        mu.Unlock()
    }
}()

// 读取 map 的协程
go func() {
    for i := 10000; i > 0; i-- {
        // 读取的时候需要加锁,但是这个锁是读锁
        // 多个协程可以同时使用 RLock 而不需要等待
        mu.RLock()
        _ = m[i]
        mu.RUnlock()
    }
}()

// 另外一个读取 map 的协程
go func() {
    for i := 20000; i > 10000; i-- {
        // 读取的时候需要加锁,但是这个锁是读锁
        // 多个协程可以同时使用 RLock 而不需要等待
        mu.RLock()
        _ = m[i]
        mu.RUnlock()
    }
}()

time.Sleep(time.Second)

说明:

  • 多个协程可以同时使用 RLock 而不需要等待,这是读锁。
  • 只有一个协程可以使用 Lock,这是写锁,有写锁的时候,其他协程不能读也不能写。
  • 持有写锁的协程,可以使用 Unlock 来释放锁。
  • 写锁释放之后,其他协程才能获取到锁(读锁或者写锁)。

也就是说,使用 sync.RWMutex 的时候,读操作是可以并发执行的,但是写操作是互斥的。 这样一来,相比 sync.Mutex 来说等待锁的次数就少了,自然也就能获得更好的性能了。

gin 框架里面就使用了 sync.RWMutex 来保证 Keys 读写操作的并发安全。

有了读写锁为什么还要有 sync.Map? #

sync.Map 在锁的基础上做了进一步优化,在一些场景下使用原子操作来保证并发安全,性能更好。

使用原子操作替代读锁 #

但是就算使用 sync.RWMutex,读操作依然还有锁的开销,那么有没有更好的方式呢? 答案是有的,就是使用原子操作来替代读锁。

举一个很常见的例子就是多个协程同时读取一个变量,然后对这个变量进行累加操作:

var a int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
    for i := 0; i < 10000; i++ {
        a++
    }
    wg.Done()
}()

go func() {
    for i := 0; i < 10000; i++ {
        a++
    }
    wg.Done()
}()

wg.Wait()

// a 期望结果应该是 20000才对。
fmt.Println(a) // 实际:17089,而且每次都不一样

这个例子中,我们期望的结果是 a 的值是 20000,但是实际上,每次运行的结果都不一样,而且都不会等于 20000。 其中很简单粗暴的一种解决方法是加锁,但是这样的话,性能就不好了,但是我们可以使用原子操作来解决这个问题:

var a atomic.Int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
    for i := 0; i < 10000; i++ {
        a.Add(1)
    }
    wg.Done()
}()

go func() {
    for i := 0; i < 10000; i++ {
        a.Add(1)
    }
    wg.Done()
}()

wg.Wait()

fmt.Println(a.Load()) // 20000

锁跟原子操作的性能差多少? #

我们来看一下,使用锁和原子操作的性能差多少:

func BenchmarkMutexAdd(b *testing.B) {
   var a int32
   var mu sync.Mutex

   for i := 0; i < b.N; i++ {
      mu.Lock()
      a++
      mu.Unlock()
   }
}

func BenchmarkAtomicAdd(b *testing.B) {
   var a atomic.Int32
   for i := 0; i < b.N; i++ {
      a.Add(1)
   }
}

结果:

BenchmarkMutexAdd-12       100000000          10.07 ns/op
BenchmarkAtomicAdd-12      205196968           5.847 ns/op

我们可以看到,使用原子操作的性能比使用锁的性能要好一些。

也许我们会觉得上面这个例子是写操作,那么读操作呢?我们来看一下:

func BenchmarkMutex(b *testing.B) {
   var mu sync.RWMutex

   for i := 0; i < b.N; i++ {
      mu.RLock()
      mu.RUnlock()
   }
}

func BenchmarkAtomic(b *testing.B) {
   var a atomic.Int32
   for i := 0; i < b.N; i++ {
      _ = a.Load()
   }
}

结果:

BenchmarkMutex-12      100000000          10.12 ns/op
BenchmarkAtomic-12     1000000000          0.3133 ns/op

可以看到,使用原子操作的性能比使用锁的性能要好很多。而且在 BenchmarkMutex 里面甚至还没有做读取数据的操作。

sync.Map 里面的原子操作 #

sync.Map 里面相比 sync.RWMutex,性能更好的原因就是使用了原子操作。 在我们从 sync.Map 里面读取数据的时候,会先使用一个原子 Load 操作来读取 sync.Map 里面的 key(从 read 中读取)。 注意:这里拿到的是 key 的一份快照,我们对其进行读操作的时候也可以同时往 sync.Map 中写入新的 key,这是保证它高性能的一个很关键的设计(类似读写分离)。

sync.Map 里面的 Load 方法里面就包含了上述的流程:

// Load 方法从 sync.Map 里面读取数据。
func (m *Map) Load(key any) (value any, ok bool) {
   // 先从只读 map 里面读取数据。
   // 这一步是不需要锁的,只有一个原子操作。
   read := m.loadReadOnly()
   e, ok := read.m[key]
   if !ok && read.amended { // 如果没有找到,并且 dirty 里面有一些 read 中没有的 key,那么就需要从 dirty 里面读取数据。
      // 这里才需要锁
      m.mu.Lock()
      read = m.loadReadOnly()
      e, ok = read.m[key]
      if !ok && read.amended {
         e, ok = m.dirty[key]
         m.missLocked()
      }
      m.mu.Unlock()
   }
   
   // key 不存在
   if !ok {
      return nil, false
   }
   // 使用原子操作读取
   return e.Load()
}

上面的代码我们可能还看不懂,但是没关系,这里我们只需要知道的是,从 sync.Map 读取数据的时候,会先做原子操作,如果没找到,再进行加锁操作,这样就减少了使用锁的频率了,自然也就可以获得更好的性能(但要注意的是并不是所有情况下都能获得更好的性能)。至于具体实现,在下一篇文章中会进行更加详细的分析。

也就是说,sync.Map 之所以更快,是因为相比 RWMutex,进一步减少了锁的使用,而这也就是 sync.Map 存在的原因了

sync.Map 的基本用法 #

注意:在 sync.Map 中,keyvalue 都是 interface{} 类型的,也就是说,我们可以使用任意类型的 keyvalue。 而不像 map,只能存在一种类型的 keyvalue

LoadOrStore #

如果存在则不插入,读取里面的数据

var m sync.Map
value, ok := m.LoadOrStore(dbName, ep)
if ok { //证明存在
    
}

Store #

var m sync.Map
// 写入/修改
m.Store("foo", 1)

Range 遍历 #

m.Range(func(key, value interface{}) bool {
    fmt.Println(key, value) //  
    return true //返回true 继续遍历,返回false结束遍历
})

Delete #

// 删除
m.Delete("foo")

Load #

// 读取
fmt.Println(m.Load("foo")) // 1 true

判断为空 #

isEmpty := true
MountList.Range(func(key, value any) bool {
	isEmpty = false
	return false // 终止迭代
})s
if !isEmpty {
 return
}

获取长度 #

// 计算长度
    length := 0
    myMap.Range(func(key, value interface{}) bool {
        length++
        return true
    })

总结 #

  • 普通的 map 不支持并发读写。
  • 有以下两种方式可以实现 map的并发读写:
    • 使用 sync.Mutex 互斥锁。读和写的时候都使用互斥锁,性能相比 sync.RWMutex 会差一些。
    • 使用 sync.RWMutex 读写锁。读的锁是可以共享的,但是写锁是独占的。性能相比 sync.Mutex 会好一些。
  • sync.Map 里面会先进行原子操作来读取 key,如果读取不到的时候,才会需要加锁。所以性能相比 sync.Mutexsync.RWMutex 会好一些。
  • sync.Map里面几个常用的方法有(CRUD):
    • Store:我们新增或者修改数据的时候,都可以使用 Store 方法。
    • Load:读取数据的方法。
    • Range:遍历数据的方法。
    • Delete:删除数据的方法。
  • sync.Map 的使用场景,sync.Map 针对以下两种场景做了优化:
    • key 只会写入一次,但是会被读取多次的场景。
    • 多个 goroutine 读取、写入和覆盖不相交的键集的条目。