状态的协程。
sync.Cond
的代码实现比较简单,协程的唤醒和阻塞已经由运行时包实现了,sync.Cond
的实现直接调用了运行时包提供的API。
3.2 实现
3.2.1 Wait方法实现
Wait
方法首先调用runtime_notifyListAd
方法,将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。
func (c *Cond) Wait() {
// 将自己放到等待队列中
t := runtime_notifyListAdd(&c.notify)
// 释放锁
c.L.Unlock()
// 等待唤醒
runtime_notifyListWait(&c.notify, t)
// 重新获取锁
c.L.Lock()
}
3.2.2 Singal方法实现
Singal
方法调用runtime_notifyListNotifyOne
唤醒等待队列中的一个协程。
func (c *Cond) Signal() {
// 唤醒等待队列中的一个协程
runtime_notifyListNotifyOne(&c.notify)
}
3.2.3 Broadcast方法实现
Broadcast
方法调用runtime_notifyListNotifyAll
唤醒所有处于等待状态的协程。
func (c *Cond) Broadcast() {
// 唤醒等待队列中所有的协程
runtime_notifyListNotifyAll(&c.notify)
}
4.使用注意事项
4.1 调用Wait方法前未加锁
4.1.1 问题
如果在调用Wait
方法前未加锁,此时会直接panic
,下面是一个简单例子的说明:
package main
import (
"fmt"
"sync"
"time"
)
var (
count int
cond *sync.Cond
lk sync.Mutex
)
func main() {
cond = sync.NewCond(&lk)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
time.Sleep(time.Second)
count++
cond.Broadcast()
}
}()
go func() {
defer wg.Done()
for {
time.Sleep(time.Millisecond * 500)
//cond.L.Lock()
for count%10 != 0 {
cond.Wait()
}
t.Logf("count = %d", count)
//cond.L.Unlock()
}
}()
wg.Wait()
}
上面代码中,协程一每隔1s,将count字段的值自增1,然后唤醒所有处于等待状态的协程。协程二执行的条件为count的值为10的倍数,此时满足执行条件,唤醒后将会继续往下执行。
但是这里在调用sync.Wait
方法前,没有先获取锁,下面是其执行结果,会抛出 fatal error: sync: unlock of unlocked mutex 错误,结果如下:
count = 0
fatal error: sync: unlock of unlocked mutex
因此,在调用Wait
方法前,需要先获取到与sync.Cond
关联的锁,否则会直接抛出异常。
4.1.2 为什么调用Wait方法前需要先获取该锁
强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait
方法如果不加锁,有可能会出现竞态条件。
这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。
如下,假设调用Wait
方法前没有加锁的话,那么所有协程都会去调用condition
方法去判断是否满足条件,然后都通过验证,执行后续操作。
for !condition() {
c.Wait()
}
c.L.Lock()
// 满足条件情况下,执行的逻辑
c.L.Unlock()
此时会出现的情况为,本来是需要在满足condition
方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition
条件的;但是后面的协程,尽管不满足condition
条件,还是执行了后续操作,可能导致程序出错。
正常的用法应该是,在调用Wait
方法前便加锁,只会有一个协程判断是否满足condition
条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。
c.L.Lock()
for !condition() {
c.Wait()
}
// 满足条件情况下,执行的逻辑
c.L.Unlock()
4.2 Wait方法接收到通知后,未重新检查条件变量
调用sync.Wait
方法,协程进入阻塞状态后被唤醒,没有重新检查条件变量,此时有可能仍然处于不满足条件变量的场景下。然后直接执行后续操作,有可能会导致程序出错。下面举一个简单的例子:
package main
import (
"fmt"
"sync"
"time"
)
var (
count int
cond *sync.Cond
lk sync.Mutex
)
func main() {
cond = sync.NewCond(&lk)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
for {
time.Sleep(time.Second)
cond.L.Lock()
// 将flag 设置为true
flag = true
// 唤醒所有处于等待状态的协程
cond.Broadcast()
cond.L.Unlock()
}
}()
for i := 0; i < 2; i++ {
go func(i int) {
defer wg.Done()
for {
time.Sleep(time.Millisecond * 500)
cond.L.Lock()
// 不满足条件,此时进入等待状态
if !flag {
cond.Wait()
}
// 被唤醒后,此时可能仍然不满足条件
fmt.Printf("协程 %d flag = %t", i, flag)
flag = false
cond.L.Unlock()
}
}(i)
}
wg.Wait()
}
在这个例子,我们启动了一个协程,定时将flag
设置为true,相当于每隔一段时间,便满足执行条件,然后唤醒所有处于等待状态的协程。
然后又启动了两个协程,在满足条件的前提下,开始执行后续操作,但是这里协程被唤醒后,没有重新检查条件变量,具体看第39行。这里会出现的场景是,第一个协程被唤醒后,此时执行后续操作,然后将flag
重新设置为false,此时已经不满足条件了。之后第二个协程唤醒后,获取到锁,没有重新检查此时是否满足执行条件,直接向下执行,这个就和我们预期不符,可能会导致程序出错,代码执行效果如下:
协程 1 flag = true
协程 0 flag = false
协程 1 flag = true
协程 0 flag = false