一架梯子,一头程序猿,仰望星空!
Golang程序设计教程(2024版) > 内容正文

学习Golang同步机制


1. 同步机制的作用

在并发编程中,当多个协程(goroutines)共享资源时,就需要确保资源在同一时刻只能被一个协程访问,防止产生竞态条件。这就需要用到同步机制。同步机制能够协调不同协程对共享资源的访问顺序,确保并发环境下的数据一致性和状态同步。

Go语言提供了丰富的同步机制,包括但不限于:

  • 互斥锁(sync.Mutex)和读写互斥锁(sync.RWMutex)
  • 通道(channels)
  • WaitGroups
  • 原子函数(atomic package)
  • 条件变量(sync.Cond)

2. 同步原语

2.1 互斥锁(sync.Mutex)

2.1.1 互斥锁的概念和作用

互斥锁(Mutex)是一种保证共享资源操作安全的同步机制,它确保在任何时刻只有一个协程可以持有该锁来访问共享资源。互斥锁通过LockUnlock方法来实现同步。调用Lock方法会阻塞,直到锁被释放,此时其他尝试获取该锁的协程会等待;调用Unlock会释放锁,让其他等待的协程有机会获取它。

var mu sync.Mutex

func criticalSection() {
    // 请求锁,以便独占资源
    mu.Lock()
    // 在这里访问共享资源
    // ...
    // 释放锁,其他协程可以获取锁
    mu.Unlock()
}

2.1.2 互斥锁的实用案例

假如我们要维护一个全局计数器,多个协程需要增加该计数器的值,使用互斥锁可以保证计数器的准确性。

var (
    mu      sync.Mutex
    counter int
)

func increment() {
    mu.Lock()         // 在修改counter之前加锁
    counter++         // 安全地对counter进行递增操作
    mu.Unlock()       // 操作完成后解锁,允许其他协程访问counter
}

func main() {
    for i := 0; i < 10; i++ {
        go increment()  // 启动多个协程增加计数器的值
    }
    // 等待一段时间(实际编程中需要使用WaitGroup或其他方式等待所有协程完成)
    time.Sleep(1 * time.Second)
    fmt.Println(counter)  // 输出计数器的值
}

2.2 读写互斥锁(sync.RWMutex)

2.2.1 读写互斥锁的概念

读写互斥锁(RWMutex)是一种特殊的锁,它允许多个协程同时读共享资源,但是写操作是独占的。与互斥锁相比,读写锁可以提高程序在多读场景中的性能。它有四个方法:RLockRUnlock对应读操作的加锁和解锁,LockUnlock对应写操作的加锁和解锁。

2.2.2 读写互斥锁的实用案例

在一个数据库应用中,读操作可能远远多于写操作。使用读写锁可以提高系统性能,因为它允许多个协程并发读。

var (
    rwMu  sync.RWMutex
    data  int
)

func readData() int {
    rwMu.RLock()         // 加读锁,其他读操作可以并发进行
    defer rwMu.RUnlock() // 使用defer确保锁能被释放
    return data          // 安全读取data
}

func writeData(newValue int) {
    rwMu.Lock()          // 加写锁,此时无法进行其他读或写操作
    data = newValue      // 安全写入新值
    rwMu.Unlock()        // 写入完成后解锁
}

func main() {
    go writeData(42)     // 开启协程执行写操作
    fmt.Println(readData()) // 主协程执行读操作
    // 再次使用WaitGroup或者其他同步方式确保所有协程完成
}

在上述例子中,多个读者可以同时执行readData函数,但写者在执行writeData时会阻止新的读者和其他写者。这种机制为读多写少的场景带来了性能优势。

2.3 条件变量(sync.Cond)

2.3.1 条件变量的概念

在 Go 语言的同步机制中,条件变量是用于等待或通知一些条件变化的同步原语。条件变量总是与互斥锁(sync.Mutex)一起使用,互斥锁用于保护条件本身的一致性。

条件变量的概念来自操作系统领域,它允许一组goroutines互相等待某个条件成立。更具体地说,一个goroutine可能在某个条件尚未满足时暂停执行等待,而另外的goroutine在更改了条件之后,可能会通过条件变量通知其他的goroutine重新开始执行。

在 Go 标准库中,条件变量通过 sync.Cond 类型提供,其主要方法包括:

  • Wait:调用这个方法会释放持有的锁并阻塞,直到其他goroutine在相同条件变量上调用SignalBroadcast唤醒它,之后它会再次尝试获取锁。
  • Signal:唤醒等待该条件变量的一个goroutine。如果没有goroutine在等待,调用该方法不会有任何效果。
  • Broadcast:唤醒等待该条件变量的所有goroutine。

条件变量不应被拷贝,所以一般作为某个结构体的指针字段来使用。

2.3.2 条件变量的实用案例

以下是一个使用条件变量的例子,它展示了一个简单的生产者-消费者模型:

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeQueue 使用互斥锁加以保护的安全队列
type SafeQueue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    queue []interface{}
}

// Enqueue 将元素添加到队列尾部,并通知等待的goroutine
func (sq *SafeQueue) Enqueue(item interface{}) {
    sq.mu.Lock()
    defer sq.mu.Unlock()

    sq.queue = append(sq.queue, item)
    sq.cond.Signal() // 通知等待的goroutine,队列不为空
}

// Dequeue 从队列头部移除元素,如果队列为空则等待
func (sq *SafeQueue) Dequeue() interface{} {
    sq.mu.Lock()
    defer sq.mu.Unlock()

    // 当队列为空时等待
    for len(sq.queue) == 0 {
        sq.cond.Wait() // 等待条件变化
    }

    item := sq.queue[0]
    sq.queue = sq.queue[1:]
    return item
}

func main() {
    queue := make([]interface{}, 0)
    sq := SafeQueue{
        mu:    sync.Mutex{},
        cond:  sync.NewCond(&sync.Mutex{}),
        queue: queue,
    }

    // 生产者Goroutine
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(1 * time.Second)         // 模拟生产耗时
            sq.Enqueue(fmt.Sprintf("item%d", i)) // 生产一个元素
            fmt.Println("Produce:", i)
        }
    }()

    // 消费者Goroutine
    go func() {
        for i := 0; i < 5; i++ {
            item := sq.Dequeue() // 消费一个元素,如果队列为空则等待
            fmt.Printf("Consume: %v\n", item)
        }
    }()

    // 等待足够长的时间,确保所有的生产和消费都完成
    time.Sleep(10 * time.Second)
}

在这个例子中,我们定义了一个SafeQueue结构体,其内部有一个队列和一个条件变量。消费者调用Dequeue方法时,如果队列为空,它将使用Wait方法等待。当生产者调用Enqueue方法将新的元素入队时,它将调用Signal方法唤醒等待的消费者。

2.4 WaitGroup

2.4.1 WaitGroup的概念和用法

sync.WaitGroup是一个用来等待一组goroutine完成的同步机制。当你启动一个goroutine时,可以通过调用WaitGroupAdd方法增加计数器,每个goroutine完成时调用Done方法(实际上执行的是Add(-1))减少计数器。主goroutine可以通过调用Wait方法阻塞,直到计数器为0,即所有的goroutine都完成了它们的任务。

使用WaitGroup需要注意以下几点:

  • Add, DoneWait方法不是线程安全的,不应该在多个goroutine中并发调用。
  • Add方法应该在新创建的goroutine启动之前被调用。

2.4.2 WaitGroup的实用案例

下面是一个使用WaitGroup的例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时通知WaitGroup

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // 模拟耗时操作
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1) // 启动goroutine之前增加计数器
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有工作goroutine完成
    fmt.Println("All workers done")
}

在这个例子中,worker函数模拟执行一项工作。主函数中,我们启动了五个worker goroutines。在启动每个goroutine之前,我们调用wg.Add(1)来通知WaitGroup有一个新的任务开始执行。在每个工作函数完成时,它会调用defer wg.Done()来通知WaitGroup任务已完成。在所有的goroutine启动之后,主函数会在wg.Wait()处阻塞,直到所有工作者都报告完成。

2.5 原子操作(sync/atomic)

2.5.1 原子操作的概念

原子操作 是指在并发编程中,一个操作在执行过程中不会被其他操作打断,即这个操作是不可分割的。对于多个goroutine,使用原子操作可以无需加锁的情况下保证数据的一致性和状态同步,因为原子操作自身就保证了执行的原子性。

在Go语言中,sync/atomic 包提供了底层的原子级内存操作。对于基本数据类型如int32, int64, uint32, uint64, uintptr, pointer等,可以使用sync/atomic包中的方法进行安全的并发操作。原子操作的重要性在于它是构建其他并发原语(如锁和条件变量)的基石,并且效率比锁机制通常要高。

2.5.2 原子操作的实用案例

考虑这样一个场景,我们需要统计一个网站的并发访客数。直观地使用一个计数器变量,当一个访客到来时增加计数器,当访客离开时减少计数器。但在并发环境中,这样做会导致数据竞争,因此我们可以使用sync/atomic包来安全地操作计数器。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var visitorCount int32

func incrementVisitorCount() {
    atomic.AddInt32(&visitorCount, 1)
}

func decrementVisitorCount() {
    atomic.AddInt32(&visitorCount, -1)
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            incrementVisitorCount()
            time.Sleep(time.Second) // 访客访问的时间
            decrementVisitorCount()
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Printf("当前访客数: %d\n", visitorCount)
}

在这个示例中,我们创建了100个goroutine来模拟访客的到来与离开。通过atomic.AddInt32()函数保证了计数器的增加和减少是原子的,即使在并发很高的情况下,也能保证visitorCount的准确性。

2.6 通道(Channel)同步机制

2.6.1 通道的同步特性

通道(Channel)是Go语言在语言级提供的goroutine间通信方式。一个通道提供了发送和接收数据的能力。当一个goroutine尝试从一个通道读取数据时,如果通道里没有数据,它会阻塞直到有数据为止。同样,如果通道满了(对于无缓冲通道而言就是已经有数据),尝试发送数据的goroutine也会阻塞,直到有空间可写。这种特性使得通道在goroutine之间同步非常有用。

2.6.2 使用通道同步的案例

假设我们有一个任务需要多个goroutine分别完成其子任务,然后汇总所有子任务的结果。我们可以使用通道来等待所有的goroutine完成。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup, resultChan chan<- int) {
    defer wg.Done()
    // 执行某些操作...
    fmt.Printf("Worker %d starting\n", id)
    // 假设子任务结果就是worker的id号
    resultChan <- id
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 5
    resultChan := make(chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg, resultChan)
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // 收集所有结果
    for result := range resultChan {
        fmt.Printf("Received result: %d\n", result)
    }
}

在本例中,我们启动了5个goroutine来执行任务,并且通过resultChan通道来收集结果。主goroutine在一个单独的goroutine中等待所有工作完成后关闭结果通道。随后,主goroutine遍历resultChan通道,收集并打印所有goroutine的运算结果。

2.7 一次性执行(sync.Once)

sync.Once 是一个同步原语,它可以保证在程序运行过程中只有一次执行一个操作。sync.Once 的典型用法是在单例对象的初始化中,或者是需要延迟初始化的场景。无论这个操作被多少goroutine调用,它都只会执行一次,这也是函数名 Do 的由来。

sync.Once 已经完美地处理了并发问题和执行效率的平衡,使得开发者不必担心重复初始化带来的性能问题。

举个简单的例子,演示sync.Once的用法:

package main

import (
    "fmt"
    "sync"
)

var once sync.Once
var instance *Singleton

type Singleton struct{}

func Instance() *Singleton {
    once.Do(func() {
        fmt.Println("Creating single instance now.")
        instance = &Singleton{}
    })
    return instance
}

func main() {
    for i := 0; i < 10; i++ {
        go Instance()
    }
    fmt.Scanln() // 等待以查看输出
}

在这个例子中,即使Instance函数被多次并发调用,创建Singleton实例的操作也只会执行一次,后续的调用都会直接返回第一次创建的实例,从而保证实例的唯一性。

2.8 ErrGroup

ErrGroup 是 Go 语言中用于同步多个go协程(goroutines)并收集它们的错误的库。它是 "golang.org/x/sync/errgroup" 包的一部分,并提供了一种简洁的方式来处理在并发操作中出错的情况。

2.8.1 ErrGroup的概念

ErrGroup 的核心思想是将一组相关的工作(通常是并发执行的)绑定在一起,并且如果其中一个工作出错,整个组的执行就会被取消。同时,如果这些并发操作中有任何一个返回错误,则 ErrGroup 会捕获并返回这个错误。

要使用 ErrGroup,首先需要引入包:

import "golang.org/x/sync/errgroup"

然后,创建一个 ErrGroup 的实例:

var g errgroup.Group

接着,就可以将任务以闭包的形式传递给 ErrGroup,通过调用 Go 方法启动一个新的 Goroutine:

g.Go(func() error {
    // 执行某项任务
    // 如果一切顺利
    return nil
    // 如果有错误发生
    // return fmt.Errorf("error occurred")
})

最后,调用 Wait 方法,这将会阻塞等待所有的任务完成。如果这些任务中有任何一个返回了错误,Wait 会返回这个错误:

if err := g.Wait(); err != nil {
    // 处理错误
    log.Fatalf("任务执行出错: %v", err)
}

2.8.2 ErrGroup的实用案例

假设有一个场景,我们需要从三个不同的数据源并发地获取数据,并且任何一个数据源如果失败了,我们希望立即取消其他的数据获取操作。使用 ErrGroup 可以轻松完成这个任务:

package main

import (
    "fmt"
    "golang.org/x/sync/errgroup"
)

func fetchDataFromSource1() error {
    // 模拟从数据源 1 获取数据
    return nil // 或者返回一个错误来模拟失败的情况
}

func fetchDataFromSource2() error {
    // 模拟从数据源 2 获取数据
    return nil // 或者返回一个错误来模拟失败的情况
}

func fetchDataFromSource3() error {
    // 模拟从数据源 3 获取数据
    return nil // 或者返回一个错误来模拟失败的情况
}

func main() {
    var g errgroup.Group

    g.Go(fetchDataFromSource1)
    g.Go(fetchDataFromSource2)
    g.Go(fetchDataFromSource3)

    // 等待所有的 goroutine 完成,并收集它们的错误
    if err := g.Wait(); err != nil {
        fmt.Printf("获取数据时发生错误: %v\n", err)
        return
    }

    fmt.Println("所有数据成功获取!")
}

在这个例子中,fetchDataFromSource1fetchDataFromSource2fetchDataFromSource3 函数模拟从不同的数据源中获取数据。它们被传递给 g.Go 方法,在独立的 Goroutine 中执行。如果任何一个函数返回了错误,g.Wait 会立即返回那个错误,我们可以在该错误出现时实现适当的错误处理。如果所有的函数都成功执行,g.Wait 就会返回 nil,表示所有任务都成功完成。

ErrGroup 还有一个重要的特性是,如果其中一个 Goroutine 出现 panic,它会尝试恢复(recover)此 panic,并将这个 panic 当作 error 返回。这样可以避免其他同时执行的 Goroutine 未能优雅的被关闭。当然,如果想要任务能够响应外部的取消信号,那么可以结合使用 errgroupWithContext 函数和 context 包来提供一个可以取消的上下文。

通过这种方式,ErrGroup 成为了 Go 语言并发编程实践中非常实用的同步和错误处理机制。