通道用例大全一文中介紹了很多通過(guò)使用通道來(lái)實(shí)現(xiàn)并發(fā)同步的用例。 事實(shí)上,通道并不是Go支持的唯一的一種并發(fā)同步技術(shù)。而且對(duì)于一些特定的情形,通道并不是最有效和可讀性最高的同步技術(shù)。 本文下面將介紹?sync
?標(biāo)準(zhǔn)庫(kù)包中提供的各種并發(fā)同步技術(shù)。相對(duì)于通道,這些技術(shù)對(duì)于某些情形更加適用。
?sync
?標(biāo)準(zhǔn)庫(kù)包提供了一些用于實(shí)現(xiàn)并發(fā)同步的類型。這些類型適用于各種不同的內(nèi)存順序需求。 對(duì)于這些特定的需求,這些類型使用起來(lái)比通道效率更高,代碼實(shí)現(xiàn)更簡(jiǎn)潔。
(請(qǐng)注意:為了避免各種異常行為,最好不要復(fù)制sync
標(biāo)準(zhǔn)庫(kù)包中提供的類型的值。)
每個(gè)sync.WaitGroup
值在內(nèi)部維護(hù)著一個(gè)計(jì)數(shù),此計(jì)數(shù)的初始默認(rèn)值為零。
*sync.WaitGroup
類型有三個(gè)方法:Add(delta int)
、Done()
和Wait()
。
對(duì)于一個(gè)可尋址的sync.WaitGroup
值wg
,
wg.Add(delta)
來(lái)改變值wg
維護(hù)的計(jì)數(shù)。wg.Done()
和wg.Add(-1)
是完全等價(jià)的。wg.Add(delta)
或者wg.Done()
調(diào)用將wg
維護(hù)的計(jì)數(shù)更改成一個(gè)負(fù)數(shù),一個(gè)恐慌將產(chǎn)生。wg.Wait()
時(shí),
wg
維護(hù)的計(jì)數(shù)為零,則此wg.Wait()
此操作為一個(gè)空操作(no-op);wg.Done()
),此協(xié)程將重新進(jìn)入運(yùn)行狀態(tài)(即wg.Wait()
將返回)。請(qǐng)注意wg.Add(delta)
、wg.Done()
和wg.Wait()
分別是(&wg).Add(delta)
、(&wg).Done()
和(&wg).Wait()
的簡(jiǎn)寫(xiě)形式。
一般,一個(gè)sync.WaitGroup
值用來(lái)讓某個(gè)協(xié)程等待其它若干協(xié)程都先完成它們各自的任務(wù)。 一個(gè)例子:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
const N = 5
var values [N]int32
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
i := i
go func() {
values[i] = 50 + rand.Int31n(50)
fmt.Println("Done:", i)
wg.Done() // <=> wg.Add(-1)
}()
}
wg.Wait()
// 所有的元素都保證被初始化了。
fmt.Println("values:", values)
}
在此例中,主協(xié)程等待著直到其它5個(gè)協(xié)程已經(jīng)將各自負(fù)責(zé)的元素初始化完畢此會(huì)打印出各個(gè)元素值。 這里是一個(gè)可能的程序執(zhí)行輸出結(jié)果:
Done: 4
Done: 1
Done: 3
Done: 0
Done: 2
values: [71 89 50 62 60]
我們可以將上例中的Add
方法調(diào)用拆分成多次調(diào)用:
...
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1) // 將被執(zhí)行5次
i := i
go func() {
values[i] = 50 + rand.Int31n(50)
wg.Done()
}()
}
...
一個(gè)*sync.WaitGroup
值的Wait
方法可以在多個(gè)協(xié)程中調(diào)用。 當(dāng)對(duì)應(yīng)的sync.WaitGroup
值維護(hù)的計(jì)數(shù)降為0,這些協(xié)程都將得到一個(gè)(廣播)通知而結(jié)束阻塞狀態(tài)。
func main() {
rand.Seed(time.Now().UnixNano())
const N = 5
var values [N]int32
var wgA, wgB sync.WaitGroup
wgA.Add(N)
wgB.Add(1)
for i := 0; i < N; i++ {
i := i
go func() {
wgB.Wait() // 等待廣播通知
log.Printf("values[%v]=%v \n", i, values[i])
wgA.Done()
}()
}
// 下面這個(gè)循環(huán)保證將在上面的任何一個(gè)
// wg.Wait調(diào)用結(jié)束之前執(zhí)行。
for i := 0; i < N; i++ {
values[i] = 50 + rand.Int31n(50)
}
wgB.Done() // 發(fā)出一個(gè)廣播通知
wgA.Wait()
}
一個(gè)WaitGroup
可以在它的一個(gè)Wait
方法返回之后被重用。 但是請(qǐng)注意,當(dāng)一個(gè)WaitGroup
值維護(hù)的基數(shù)為零時(shí),它的帶有正整數(shù)實(shí)參的Add
方法調(diào)用不能和它的Wait
方法調(diào)用并發(fā)運(yùn)行,否則將可能出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)。
每個(gè)*sync.Once
值有一個(gè)Do(f func())
方法。 此方法只有一個(gè)類型為func()
的參數(shù)。
對(duì)一個(gè)可尋址的sync.Once
值o
,o.Do()
(即(&o).Do()
的簡(jiǎn)寫(xiě)形式)方法調(diào)用可以在多個(gè)協(xié)程中被多次并發(fā)地執(zhí)行, 這些方法調(diào)用的實(shí)參應(yīng)該(但并不強(qiáng)制)為同一個(gè)函數(shù)值。 在這些方法調(diào)用中,有且只有一個(gè)調(diào)用的實(shí)參函數(shù)(值)將得到調(diào)用。 此被調(diào)用的實(shí)參函數(shù)保證在任何o.Do()
方法調(diào)用返回之前退出。 換句話說(shuō),被調(diào)用的實(shí)參函數(shù)內(nèi)的代碼將在任何o.Do()
方法返回調(diào)用之前被執(zhí)行。
一般來(lái)說(shuō),一個(gè)sync.Once
值被用來(lái)確保一段代碼在一個(gè)并發(fā)程序中被執(zhí)行且僅被執(zhí)行一次。
一個(gè)例子:
package main
import (
"log"
"sync"
)
func main() {
log.SetFlags(0)
x := 0
doSomething := func() {
x++
log.Println("Hello")
}
var wg sync.WaitGroup
var once sync.Once
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(doSomething)
log.Println("world!")
}()
}
wg.Wait()
log.Println("x =", x) // x = 1
}
在此例中,Hello
將僅被輸出一次,而world!
將被輸出5次,并且Hello
肯定在所有的5個(gè)world!
之前輸出。
*sync.Mutex
和*sync.RWMutex
類型都實(shí)現(xiàn)了sync.Locker
接口類型。 所以這兩個(gè)類型都有兩個(gè)方法:Lock()
和Unlock()
,用來(lái)保護(hù)一份數(shù)據(jù)不會(huì)被多個(gè)使用者同時(shí)讀取和修改。
除了Lock()
和Unlock()
這兩個(gè)方法,*sync.RWMutex
類型還有兩個(gè)另外的方法:RLock()
和RUnlock()
,用來(lái)支持多個(gè)讀取者并發(fā)讀取一份數(shù)據(jù)但防止此份數(shù)據(jù)被某個(gè)數(shù)據(jù)寫(xiě)入者和其它數(shù)據(jù)訪問(wèn)者(包括讀取者和寫(xiě)入者)同時(shí)使用。
(注意:這里的數(shù)據(jù)讀取者和數(shù)據(jù)寫(xiě)入者不應(yīng)該從字面上理解。有時(shí)候某些數(shù)據(jù)讀取者可能修改數(shù)據(jù),而有些數(shù)據(jù)寫(xiě)入者可能只讀取數(shù)據(jù)。)
一個(gè)Mutex
值常稱為一個(gè)互斥鎖。 一個(gè)Mutex
零值為一個(gè)尚未加鎖的互斥鎖。 一個(gè)(可尋址的)Mutex
值m
只有在未加鎖狀態(tài)時(shí)才能通過(guò)m.Lock()
方法調(diào)用被成功加鎖。 換句話說(shuō),一旦m
值被加了鎖(亦即某個(gè)m.Lock()
方法調(diào)用成功返回), 一個(gè)新的加鎖試圖將導(dǎo)致當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài),直到此Mutex
值被解鎖為止(通過(guò)m.Unlock()
方法調(diào)用)。
注意:m.Lock()
和m.Unlock()
分別是(&m).Lock()
和(&m).Unlock()
的簡(jiǎn)寫(xiě)形式。
一個(gè)使用sync.Mutex
的例子:
package main
import (
"fmt"
"runtime"
"sync"
)
type Counter struct {
m sync.Mutex
n uint64
}
func (c *Counter) Value() uint64 {
c.m.Lock()
defer c.m.Unlock()
return c.n
}
func (c *Counter) Increase(delta uint64) {
c.m.Lock()
c.n += delta
c.m.Unlock()
}
func main() {
var c Counter
for i := 0; i < 100; i++ {
go func() {
for k := 0; k < 100; k++ {
c.Increase(1)
}
}()
}
// 此循環(huán)僅為演示目的。
for c.Value() < 10000 {
runtime.Gosched()
}
fmt.Println(c.Value()) // 10000
}
在上面這個(gè)例子中,一個(gè)Counter
值使用了一個(gè)Mutex
字段來(lái)確保它的字段n
永遠(yuǎn)不會(huì)被多個(gè)協(xié)程同時(shí)使用。
一個(gè)RWMutex
值常稱為一個(gè)讀寫(xiě)互斥鎖,它的內(nèi)部包含兩個(gè)鎖:一個(gè)寫(xiě)鎖和一個(gè)讀鎖。 對(duì)于一個(gè)可尋址的RWMutex
值rwm
,數(shù)據(jù)寫(xiě)入者可以通過(guò)方法調(diào)用rwm.Lock()
對(duì)rwm
加寫(xiě)鎖,或者通過(guò)rwm.RLock()
方法調(diào)用對(duì)rwm
加讀鎖。 方法調(diào)用rwm.Unlock()
和rwm.RUnlock()
用來(lái)解開(kāi)rwm
的寫(xiě)鎖和讀鎖。 rwm
的讀鎖維護(hù)著一個(gè)計(jì)數(shù)。當(dāng)rwm.RLock()
調(diào)用成功時(shí),此計(jì)數(shù)增1;當(dāng)rwm.Unlock()
調(diào)用成功時(shí),此計(jì)數(shù)減1;
一個(gè)零計(jì)數(shù)表示rwm
的讀鎖處于未加鎖狀態(tài);反之,一個(gè)非零計(jì)數(shù)(肯定大于零)表示rwm
的讀鎖處于加鎖狀態(tài)。
注意rwm.Lock()
、rwm.Unlock()
、rwm.RLock()
和rwm.RUnlock()
分別是(&rwm).Lock()
、(&rwm).Unlock()
、(&rwm).RLock()
和(&rwm).RUnlock()
的簡(jiǎn)寫(xiě)形式。
對(duì)于一個(gè)可尋址的RWMutex
值rwm
,下列規(guī)則存在:
rwm
的寫(xiě)鎖只有在它的寫(xiě)鎖和讀鎖都處于未加鎖狀態(tài)時(shí)才能被成功加鎖。 換句話說(shuō),rwm
的寫(xiě)鎖在任何時(shí)刻最多只能被一個(gè)數(shù)據(jù)寫(xiě)入者成功加鎖,并且rwm
的寫(xiě)鎖和讀鎖不能同時(shí)處于加鎖狀態(tài)。rwm
的寫(xiě)鎖正處于加鎖狀態(tài)的時(shí)候,任何新的對(duì)之加寫(xiě)鎖或者加讀鎖的操作試圖都將導(dǎo)致當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài),直到此寫(xiě)鎖被解鎖,這樣的操作試圖才有機(jī)會(huì)成功。rwm
的讀鎖正處于加鎖狀態(tài)的時(shí)候,新的加寫(xiě)鎖的操作試圖將導(dǎo)致當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài)。 但是,一個(gè)新的加讀鎖的操作試圖將成功,只要此操作試圖發(fā)生在任何被阻塞的加寫(xiě)鎖的操作試圖之前(見(jiàn)下一條規(guī)則)。 換句話說(shuō),一個(gè)讀寫(xiě)互斥鎖的讀鎖可以同時(shí)被多個(gè)數(shù)據(jù)讀取者同時(shí)加鎖而持有。 當(dāng)rwm
的讀鎖維護(hù)的計(jì)數(shù)清零時(shí),讀鎖將返回未加鎖狀態(tài)。rwm
的讀鎖正處于加鎖狀態(tài)的時(shí)候,為了防止后續(xù)數(shù)據(jù)寫(xiě)入者沒(méi)有機(jī)會(huì)成功加寫(xiě)鎖,后續(xù)發(fā)生在某個(gè)被阻塞的加寫(xiě)鎖操作試圖之后的所有加讀鎖的試圖都將被阻塞。rwm
的寫(xiě)鎖正處于加鎖狀態(tài)的時(shí)候,(至少對(duì)于標(biāo)準(zhǔn)編譯器來(lái)說(shuō),)為了防止后續(xù)數(shù)據(jù)讀取者沒(méi)有機(jī)會(huì)成功加讀鎖,發(fā)生在此寫(xiě)鎖下一次被解鎖之前的所有加讀鎖的試圖都將在此寫(xiě)鎖下一次被解鎖之后肯定取得成功,即使所有這些加讀鎖的試圖發(fā)生在一些仍被阻塞的加寫(xiě)鎖的試圖之后。后兩條規(guī)則是為了確保數(shù)據(jù)讀取者和寫(xiě)入者都有機(jī)會(huì)執(zhí)行它們的操作。
請(qǐng)注意:一個(gè)鎖并不會(huì)綁定到一個(gè)協(xié)程上,即一個(gè)鎖并不記錄哪個(gè)協(xié)程成功地加鎖了它。 換句話說(shuō),一個(gè)鎖的加鎖者和此鎖的解鎖者可以不是同一個(gè)協(xié)程,盡管在實(shí)踐中這種情況并不多見(jiàn)。
在上一個(gè)例子中,如果Value
方法被十分頻繁調(diào)用而Increase
方法并不頻繁被調(diào)用,則Counter
類型的m
字段的類型可以更改為sync.RWMutex
,從而使得執(zhí)行效率更高,如下面的代碼所示。
...
type Counter struct {
//m sync.Mutex
m sync.RWMutex
n uint64
}
func (c *Counter) Value() uint64 {
//c.m.Lock()
//defer c.m.Unlock()
c.m.RLock()
defer c.m.RUnlock()
return c.n
}
...
sync.RWMutex
值的另一個(gè)應(yīng)用場(chǎng)景是將一個(gè)寫(xiě)任務(wù)分隔成若干小的寫(xiě)任務(wù)。下一節(jié)中展示了一個(gè)這樣的例子。
根據(jù)上面列出的后兩條規(guī)則,下面這個(gè)程序最有可能輸出abdc
。
package main
import (
"fmt"
"time"
"sync"
)
func main() {
var m sync.RWMutex
go func() {
m.RLock()
fmt.Print("a")
time.Sleep(time.Second)
m.RUnlock()
}()
go func() {
time.Sleep(time.Second * 1 / 4)
m.Lock()
fmt.Print("b")
time.Sleep(time.Second)
m.Unlock()
}()
go func() {
time.Sleep(time.Second * 2 / 4)
m.Lock()
fmt.Print("c")
m.Unlock()
}()
go func () {
time.Sleep(time.Second * 3 / 4)
m.RLock()
fmt.Print("d")
m.RUnlock()
}()
time.Sleep(time.Second * 3)
fmt.Println()
}
請(qǐng)注意,上例這個(gè)程序僅僅是為了解釋和驗(yàn)證上面列出的讀寫(xiě)鎖的后兩條加鎖規(guī)則。 此程序使用了time.Sleep
調(diào)用來(lái)做協(xié)程間的同步。這種所謂的同步方法不應(yīng)該被使用在生產(chǎn)代碼中。
sync.Mutex
和sync.RWMutex
值也可以用來(lái)實(shí)現(xiàn)通知,盡管這不是Go中最優(yōu)雅的方法來(lái)實(shí)現(xiàn)通知。 下面是一個(gè)使用了Mutex
值來(lái)實(shí)現(xiàn)通知的例子。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var m sync.Mutex
m.Lock()
go func() {
time.Sleep(time.Second)
fmt.Println("Hi")
m.Unlock() // 發(fā)出一個(gè)通知
}()
m.Lock() // 等待通知
fmt.Println("Bye")
}
在此例中,Hi
將確保在Bye
之前打印出來(lái)。 關(guān)于sync.Mutex
和sync.RWMutex
值相關(guān)的內(nèi)存順序保證,請(qǐng)閱讀Go中的內(nèi)存順序保證一文。
sync.Cond
類型提供了一種有效的方式來(lái)實(shí)現(xiàn)多個(gè)協(xié)程間的通知。
每個(gè)sync.Cond
值擁有一個(gè)sync.Locker
類型的名為L
的字段。 此字段的具體值常常為一個(gè)*sync.Mutex
值或者*sync.RWMutex
值。
*sync.Cond
類型有三個(gè)方法:Wait()
、Signal()
和Broadcast()
。
每個(gè)Cond
值維護(hù)著一個(gè)先進(jìn)先出等待協(xié)程隊(duì)列。 對(duì)于一個(gè)可尋址的Cond
值c
,
c.Wait()
必須在c.L
字段值的鎖處于加鎖狀態(tài)的時(shí)候調(diào)用;否則,c.Wait()
調(diào)用將造成一個(gè)恐慌。
一個(gè)c.Wait()
調(diào)用將
c
所維護(hù)的等待協(xié)程隊(duì)列;
c.L.Unlock()
對(duì)c.L
的鎖解鎖;
然后使當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài);
(當(dāng)前協(xié)程將被另一個(gè)協(xié)程通過(guò)c.Signal()
或c.Broadcast()
調(diào)用喚醒而重新進(jìn)入運(yùn)行狀態(tài)。)
c.L.Lock()
將被調(diào)用以試圖重新對(duì)c.L
字段值的鎖加鎖。
此c.Wait()
調(diào)用將在此試圖成功之后退出。
c.Signal()
調(diào)用將喚醒并移除c
所維護(hù)的等待協(xié)程隊(duì)列中的第一個(gè)協(xié)程(如果此隊(duì)列不為空的話)。
c.Broadcast()
調(diào)用將喚醒并移除c
所維護(hù)的等待協(xié)程隊(duì)列中的所有協(xié)程(如果此隊(duì)列不為空的話)。
請(qǐng)注意:c.Wait()
、c.Signal()
和c.Broadcast()
分別為(&c).Wait()
、(&c).Signal()
和(&c).Broadcast()
的簡(jiǎn)寫(xiě)形式。
c.Signal()
和c.Broadcast()
調(diào)用常用來(lái)通知某個(gè)條件的狀態(tài)發(fā)生了變化。 一般說(shuō)來(lái),c.Wait()
應(yīng)該在一個(gè)檢查某個(gè)條件是否已經(jīng)得到滿足的循環(huán)中調(diào)用。
下面是一個(gè)典型的sync.Cond
用例。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
const N = 10
var values [N]string
cond := sync.NewCond(&sync.Mutex{})
for i := 0; i < N; i++ {
d := time.Second * time.Duration(rand.Intn(10)) / 10
go func(i int) {
time.Sleep(d) // 模擬一個(gè)工作負(fù)載
cond.L.Lock()
// 下面的修改必須在cond.L被鎖定的時(shí)候執(zhí)行
values[i] = string('a' + i)
cond.Broadcast() // 可以在cond.L被解鎖后發(fā)出通知
cond.L.Unlock()
// 上面的通知也可以在cond.L未鎖定的時(shí)候發(fā)出。
//cond.Broadcast() // 上面的調(diào)用也可以放在這里
}(i)
}
// 此函數(shù)必須在cond.L被鎖定的時(shí)候調(diào)用。
checkCondition := func() bool {
fmt.Println(values)
for i := 0; i < N; i++ {
if values[i] == "" {
return false
}
}
return true
}
cond.L.Lock()
defer cond.L.Unlock()
for !checkCondition() {
cond.Wait() // 必須在cond.L被鎖定的時(shí)候調(diào)用
}
}
一個(gè)可能的輸出:
[ ]
[ f ]
[ c f ]
[ c f h ]
[ b c f h ]
[a b c f h j]
[a b c f g h i j]
[a b c e f g h i j]
[a b c d e f g h i j]
因?yàn)樯侠兄挥幸粋€(gè)協(xié)程(主協(xié)程)在等待通知,所以其中的cond.Broadcast()
調(diào)用也可以換為cond.Signal()
。 如上例中的注釋所示,cond.Broadcast()
和cond.Signal()
不必在cond.L
的鎖處于加鎖狀態(tài)時(shí)調(diào)用。
為了防止數(shù)據(jù)競(jìng)爭(zhēng),對(duì)自定義條件的修改必須在cond.L
的鎖處于加鎖狀態(tài)時(shí)才能執(zhí)行。 另外,checkCondition
函數(shù)和cond.Wait
方法也必須在cond.L
的鎖處于加鎖狀態(tài)時(shí)才可被調(diào)用。
事實(shí)上,對(duì)于上面這個(gè)特定的例子,cond.L
字段的也可以為一個(gè)*sync.RWMutex
值。 對(duì)自定義條件的十個(gè)部分的修改可以在RWMutex
值的讀鎖處于加鎖狀態(tài)時(shí)執(zhí)行。這十個(gè)修改可以并發(fā)進(jìn)行,因?yàn)樗鼈兪腔ゲ桓蓴_的。 如下面的代碼所示:
...
cond := sync.NewCond(&sync.RWMutex{})
cond.L.Lock()
for i := 0; i < N; i++ {
d := time.Second * time.Duration(rand.Intn(10)) / 10
go func(i int) {
time.Sleep(d)
cond.L.(*sync.RWMutex).RLock()
values[i] = string('a' + i)
cond.L.(*sync.RWMutex).RUnlock()
cond.Signal()
}(i)
}
...
在上面的代碼中,此sync.RWMutex
值的用法有些不符常規(guī)。 它的讀鎖被一些修改數(shù)組元素的協(xié)程所加鎖并持有,而它的寫(xiě)鎖被主協(xié)程加鎖持有用來(lái)讀取并檢查各個(gè)數(shù)組元素的值。
Cond
值所表示的自定義條件可以是一個(gè)虛無(wú)。對(duì)于這種情況,此Cond
值純粹被用來(lái)實(shí)現(xiàn)通知。 比如,下面這個(gè)程序?qū)⒋蛴〕?code>abc或者bac
。
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
cond := sync.NewCond(&sync.Mutex{})
cond.L.Lock()
go func() {
cond.L.Lock()
go func() {
cond.L.Lock()
cond.Broadcast()
cond.L.Unlock()
}()
cond.Wait()
fmt.Print("a")
cond.L.Unlock()
wg.Done()
}()
cond.Wait()
fmt.Print("b")
cond.L.Unlock()
wg.Wait()
fmt.Println("c")
}
如果需要,多個(gè)sync.Cond
值可以共享一個(gè)sync.Locker
值。但是這種情形在實(shí)踐中并不多見(jiàn)。
更多建議: