【Golang】并发之锁

众所周知,并发需要处理的最大的问题就是锁,因为并发情况下可能会重复访问同一个变量导致变量污染,这个时候就需要用锁来保证变量不会被重复操作

变量污染的例子

首先串行的执行一个加法运算:

var x int

func main() {
	add()
	add()
	add()
	fmt.Println(x) // 150015000
}

func add() {
	for i := 1; i <= 10000; i++ {
		x += i
	}
}

然后将这个计算并发执行以下

var x int
var wg sync.WaitGroup

func main() {
	wg.Add(3)
	go add()
	go add()
	go add()
	wg.Wait()
	fmt.Println(x) // 100641126
}

func add() {
	for i := 1; i <= 10000; i++ {
		x += i
	}
	wg.Done()
}

由于并发的时候可能会同时操作全局变量x,导致x计算不准。这个时候就可以加锁

互斥锁:type Mutex

type Mutex struct {
    // 包含隐藏或非导出字段
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。

func (m *Mutex) Lock()

Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。

func (m *Mutex) Unlock()

Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。

利用互斥锁防止变量污染:

var x int
var l sync.Mutex
var wg sync.WaitGroup

func main() {
	wg.Add(3)
	go add()
	go add()
	go add()
	wg.Wait()
	fmt.Println(x) // 150015000
}

func add() {
	for i := 1; i <= 10000; i++ {
		l.Lock()
		x += i
		l.Unlock()
	}
	wg.Done()
}

利用加锁让共享的变量x在并发操作中保证一次只有一个协程会访问到,达到隔离的效果,防止变量污染。

读写互斥锁:type RWMutex

type RWMutex struct {
    // 包含隐藏或非导出字段
}

RWMutex是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

和mysql中的读写锁一样,读锁和读锁之间是不互斥的,写锁和读锁,写锁和写锁之间是互斥的。也就是文档中说的 该锁可以被同时多个读取者持有或唯一个写入者持有

func (rw *RWMutex) Lock()

Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入。

func (rw *RWMutex) Unlock()

Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误。

func (rw *RWMutex) RLock()

RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取。

func (rw *RWMutex) RUnlock()

Runlock方法解除rw的读取锁状态,如果m未加读取锁会导致运行时错误。

func (rw *RWMutex) RLocker() Locker

Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。

var x int
var l sync.RWMutex
var wg sync.WaitGroup

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
	fmt.Println(time.Now().Sub(start))
	fmt.Println(x)
}

func write() {
	defer wg.Done()
	l.Lock()
	time.Sleep(time.Millisecond * 50)
	x += 1
	l.Unlock()
}

func read() {
	defer wg.Done()
	l.RLock()
	time.Sleep(time.Millisecond * 10)
	// fmt.Println(x)
	l.RUnlock()
}

上面就是一个读写的例子,在读的时候加读锁,在写的时候加写锁,防止写的过程中读到旧数据导致数据不一致。

条件锁:type Cond

type Cond struct {
    // 在观测或更改条件时L会冻结
    L Locker
    // 包含隐藏或非导出字段
}

Cond实现了一个条件变量,一个线程集合地,供线程等待或者宣布某事件的发生。

每个Cond实例都有一个相关的锁(一般是Mutex或RWMutex类型的值),它必须在改变条件时或者调用Wait方法时保持锁定。Cond可以创建为其他结构体的字段,Cond在开始使用后不能被拷贝。

顾名思义,条件锁就是让程序可以在满足一定条件时解锁。

func NewCond(l Locker) *Cond

使用锁l创建一个*Cond。

func (c *Cond) Broadcast()

Broadcast唤醒所有等待c的线程。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

func (c *Cond) Signal()

Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

func (c *Cond) Wait()

Wait自行解锁c.L并阻塞当前线程,在之后线程恢复执行时,Wait方法会在返回前锁定c.L。和其他系统不同,Wait除非被Broadcast或者Signal唤醒,不会主动返回。

var x int
var l sync.Mutex
var cond sync.Cond
var wg sync.WaitGroup

func main() {
	cond = *sync.NewCond(&l)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go condition()
	}
	var s string
	for {
		fmt.Println("请输入:s = 唤醒一个锁,b = 唤醒所有锁")
		n, _ := fmt.Scan(&s)
		if n != 1 {
			fmt.Println("参数匹配失败")
			continue
		}
		switch true {
		case strings.ToLower(s) == "s":
			cond.Signal()
		case strings.ToLower(s) == "b":
			cond.Broadcast()
		default:
			fmt.Println("输入指令无法识别")
		}
	}
}

func condition() {
	defer wg.Done()
	cond.L.Lock() // 拿锁
	cond.Wait()   // 阻塞等待条件
	x += 1
	fmt.Println(x)
	cond.L.Unlock() // 解锁
}
请输入:s = 唤醒一个锁,b = 唤醒所有锁
s
请输入:s = 唤醒一个锁,b = 唤醒所有锁
1
s
请输入:s = 唤醒一个锁,b = 唤醒所有锁
2
s
请输入:s = 唤醒一个锁,b = 唤醒所有锁
3
b
请输入:s = 唤醒一个锁,b = 唤醒所有锁
4
5
6
7
8
9
10

条件锁依赖于他内部的互斥锁或读写锁,和互斥锁或读写锁不同的是,可以使用cond.wait()来阻塞运行,使用cond.Signal()来解开一个锁,用cond.Broadcast()来解开所有锁

type WaitGroup

这个在协程中说过了,是用来阻塞主进程等待线程执行的。

池:type Pool

type Pool struct {
    // 可选参数New指定一个函数在Get方法可能返回nil时来生成一个值
    // 该参数不能在调用Get方法时被修改
    New func() interface{}
    // 包含隐藏或非导出字段
}

Pool是一个可以分别存取的临时对象的集合。

Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。

Pool可以安全的被多个线程同时使用。

Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。

Pool的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。

Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。

另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

func (p *Pool) Get() interface{}

Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。Get方法也可能选择无视内存池,将其当作空的。调用者不应认为Get的返回这和传递给Put的值之间有任何关系。

假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。

func (p *Pool) Put(x interface{})

Put方法将x放入池中。

pool就是用来维护一堆临时变量的工具,常见的地方例如:连接池

串行map sync.Map

在go中,map不是一个并发安全的类型,当在多个协程中操作同一个map时程序会报错:

var wg sync.WaitGroup

func main() {
	var m map[string]string
	m = make(map[string]string, 10)
	for i := 0;i<200 ;i++ {
		wg.Add(1)
		go editMap(m)
	}
	wg.Wait()
	fmt.Println(m)
}

func editMap(m map[string]string) {
	defer wg.Done()
	m["k1"] = "v1"
	m["k2"] = "v2"
	m["k3"] = "v3"
	m["k4"] = "v4"
	m["k5"] = "v5"
	m["k6"] = "v6"
	m["k7"] = "v7"
	m["k8"] = "v8"
}
fatal error: concurrent map writes

这个时候,为了避免出现读写的并发操作导致冲突,你可以通过加锁来处理,不过Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

  1. func (m *Map) Load(key interface{}) (value interface{}, ok bool)

  2. func (m *Map) Store(key, value interface{})

  3. func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 存在则读, 不存在则写了之后读, 有点像优化cache的感觉.

  4. func (m *Map) Delete(key interface{}) 删除

  5. func (m *Map) Range(f func(key, value interface{}) bool) 遍历Map中所有key.

var wg sync.WaitGroup

func main() {
	m := sync.Map{}
	for i := 0; i < 50; i++ {
		wg.Add(2)
		go editMap(&m)
		go readMap(&m)
	}
	wg.Wait()
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})
}

func editMap(m *sync.Map) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		m.Store("k"+strconv.Itoa(i), "v"+strconv.Itoa(i))
	}
}

func readMap(m *sync.Map) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		m.Load("k" + strconv.Itoa(i))
	}
}

原子操作:atomic

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。

这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

应通过通信来共享内存,而不通过共享内存实现通信。

读取操作:

func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

写入操作:

func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

修改操作: func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

交换操作: func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

比较操作(值与old相同则会变为new):

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

var x int32
var wg sync.WaitGroup

func main() {
	wg.Add(3)
	go add()
	go add()
	go add()
	wg.Wait()
	fmt.Println(x) // 150015000
}
func add() {
	for i := 1; i <= 10000; i++ {
		atomic.AddInt32(&x, int32(i))
	}
	wg.Done()
}

小结

协程相关思维导图下载(使用xmind打开):golang-goroutine.xmind

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论