当前位置:首页 > 技术知识 > 正文内容

如何在Go中同步线程(go语言同步锁)

maynowei2周前 (08-03)技术知识10

单线程代码已经带来头痛。添加第二个线程,就是从基础头痛升级了。

解决方案?互斥锁:线程和数据的交通警察。
一旦你理解了它们,线程同步就变成了第二本能,语言无关。

在C++和Go中工作,我遇到过所有常见的混乱:

  • 有时会吞噬数据的竞态条件
  • 线程践踏内存导致的段错误
  • 还有沉默的杀手:死锁

最后一个是最糟糕的,没有崩溃,没有错误。只是一个死程序,卡在永恒的线程对峙中。

但当你理解了互斥锁背后的核心思想时,一切都开始变得清晰。

最好的部分?每种语言都说互斥锁:

  • Go → sync.Mutex
  • C++ → std::mutex
  • Python → threading.Lock()
  • Java → ReentrantLock

在这篇文章中,我将分解互斥锁的概念,展示死锁是如何发生的,并给你足够的直觉来处理任何语言中的线程代码。

学一次 → 到处应用。


互斥锁:互斥锁

线程引入了全新类别的问题,特别是在Go中,生成数千个线程实际上是免费的。

现在想象两个线程在完全相同的时间击中同一个数据源。那就是混乱。竞态条件、数据损坏、神秘bug,你不想调试的东西,更不用说向你的团队解释了。

进入互斥锁: 线程和共享数据之间的交通警察。

没有锁:

线程A  --->     数据源  <--- 线程B

有锁(两个线程之间共享):

线程A  [锁]--->     数据源  <---[锁] 线程B

互斥锁的工作很简单:一次只有一个线程进入。
如果线程A拥有锁,线程B被告知:"等待你的轮次。"

这里是一个简单的切片访问示例,有和没有锁:

没有锁:

package main

import (
    "fmt"
    "time"
)

func main() {
    var numbers []int

    // 启动5个goroutine,都向同一个切片追加
    for i := 0; i < 5; i++ {
        go func(n int) {
            // 这里没有锁定,这很可能会导致数据竞争
            numbers = append(numbers, n)
            fmt.Println("追加", n, "→", numbers)
        }(i)
    }

    // 给它们一点时间运行
    time.Sleep(1 * time.Second)
}

有锁:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var (
        numbers []int
        mu      sync.Mutex
    )

    for i := 0; i < 5; i++ {
        go func(n int) {
            mu.Lock()           // 获取锁
            defer mu.Unlock()   // 确保它被释放,即使在panic时

            numbers = append(numbers, n)
            fmt.Println("追加", n, "→", numbers)
        }(i)
    }

    time.Sleep(1 * time.Second)
}

注意我们如何做:

mu.Lock()
defer mu.Unlock()

defer保证无论我们如何退出那个goroutine,正常返回或panic,锁都会被释放。

一旦goroutine接触共享数据,就锁定它。 相信我,未来的你会感激的。


那么,死锁到底是什么?


死锁

回到我们的交通警察类比:

线程A  [锁]--->     数据源  <---[锁] 线程B

这有效是因为一个共享锁控制访问。但当我们引入同一车道中的两个共享锁时会发生什么?

线程A  [锁]--[锁]->     数据源  <---[锁] 线程B

现在你有两个交通警察,都不知道谁负责。线程A卡在等待两者,永远在困惑中来回弹跳。这就是经典死锁。

通常的嫌疑人?相同的嵌套锁,调用一个从已经持有锁的另一个函数内部获取锁的函数。

这是一个真实世界的例子:

func (m *ScheduledTask) Create(...) (task, error) {
    m.mu.Lock()                // 锁1
    defer m.mu.Unlock()        // 在结束时解锁1

    // ... 设置任务 ...

    if err := m.saveTasks(); err != nil {  // 内部的锁2
        return task{}, err
    }

    return t, nil
}

现在看看saveTasks内部:

func (m *ScheduledTask) saveTasks() error {
    m.mu.Lock()               // 锁2(再次)
    defer m.mu.Unlock()

    data, err := json.MarshalIndent(m.tasks, "", "  ")
    if err != nil {
        return err
    }

    return os.WriteFile(tasks, data, 0644)
}

死锁。
为什么?因为Create()已经持有锁,而saveTasks()试图再次获取它,第一个被释放之前。Go例程不会抱怨,它们只是默默地冻结。没有崩溃,没有堆栈跟踪,只是一个僵尸线程吞噬资源。

主线程呢?完全不知道。继续运行,而你的程序挂在边缘。


如果你认真对待构建真实世界的软件,你需要理解同步。

这些概念适用于所有语言。这是C++版本:

std::lock_guard<std::mutex> lk(globalIPCData.mapMutex);  // 在访问前锁定
UIelement& u = uiSet.get(entityId);

学好这个。

一旦你将互斥锁视为具有绝对权威的交通警察,大多数线程问题就消失了。

我将在Substack上发布更多关于后端主题、JavaScript、Golang、C++和低级系统的深度探讨。希望你在那里;来打个招呼:

Coffee & Kernels

更多内容:

深度学习指南

深度学习搭便车指南:Python和JS示例。

深度学习:Pytorch和Tensorflow.js示例。

如何不那么糟糕地使用数据库

如何不那么糟糕地使用数据库和数据系统,带JavaScript示例。

数据库、数据系统和意图语言。


多语言互斥锁示例

C++ →std::mutex

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
    int counter = 0;
    std::mutex m;
    std::vector<std::thread> threads;

    for (int i = 0; i < 5; ++i) {
        threads.emplace_back([&](){
            std::lock_guard<std::mutex> lock(m); // RAII:构造函数锁定,析构函数解锁
            ++counter;                            // 临界区
            std::cout << "C++计数器: " << counter << "\n";
        });
    }

    for (auto &t : threads) t.join();
    return 0;
}

Python →threading.Lock()

import threading

counter = 0
lock = threading.Lock()

def worker():
    global counter
    with lock:                # 上下文管理器获取和释放
        counter += 1          # 临界区
        print(f"Python计数器: {counter}")

threads = []
for _ in range(5):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

Java →ReentrantLock

import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private static int counter = 0;
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[5];

        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();     //  获取
                try {
                    counter++;   // 临界区
                    System.out.println("Java计数器: " + counter);
                } finally {
                    lock.unlock(); //  释放
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) t.join();
    }
}

关键要点

无论什么语言,配方都是:

  1. 获取锁/互斥锁,然后接触共享数据
  2. 你最小的临界工作
  3. 释放锁/互斥锁(或使用作用域/RAII/上下文自动释放)

实际应用示例

1. 银行账户示例

package main

import (
    "fmt"
    "sync"
    "time"
)

type BankAccount struct {
    balance int
    mu      sync.Mutex
}

func (ba *BankAccount) Deposit(amount int) {
    ba.mu.Lock()
    defer ba.mu.Unlock()
    
    ba.balance += amount
    fmt.Printf("存款 %d,余额: %d\n", amount, ba.balance)
}

func (ba *BankAccount) Withdraw(amount int) bool {
    ba.mu.Lock()
    defer ba.mu.Unlock()
    
    if ba.balance >= amount {
        ba.balance -= amount
        fmt.Printf("取款 %d,余额: %d\n", amount, ba.balance)
        return true
    }
    
    fmt.Printf("余额不足,无法取款 %d\n", amount)
    return false
}

func (ba *BankAccount) GetBalance() int {
    ba.mu.Lock()
    defer ba.mu.Unlock()
    
    return ba.balance
}

func main() {
    account := &BankAccount{balance: 1000}
    
    // 启动多个goroutine同时操作账户
    for i := 0; i < 5; i++ {
        go func(id int) {
            account.Deposit(100)
            time.Sleep(10 * time.Millisecond)
            account.Withdraw(50)
        }(i)
    }
    
    time.Sleep(1 * time.Second)
    fmt.Printf("最终余额: %d\n", account.GetBalance())
}

2. 缓存示例

package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    data map[string]interface{}
    mu   sync.RWMutex // 读写锁,允许多个读取者
}

func NewCache() *Cache {
    return &Cache{
        data: make(map[string]interface{}),
    }
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    c.data[key] = value
    fmt.Printf("设置 %s = %v\n", key, value)
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock() // 读锁,允许多个goroutine同时读取
    defer c.mu.RUnlock()
    
    value, exists := c.data[key]
    if exists {
        fmt.Printf("获取 %s = %v\n", key, value)
    } else {
        fmt.Printf("键 %s 不存在\n", key)
    }
    return value, exists
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    delete(c.data, key)
    fmt.Printf("删除键 %s\n", key)
}

func main() {
    cache := NewCache()
    
    // 启动多个读取者
    for i := 0; i < 3; i++ {
        go func(id int) {
            for j := 0; j < 5; j++ {
                cache.Get("key1")
                time.Sleep(50 * time.Millisecond)
            }
        }(i)
    }
    
    // 启动写入者
    go func() {
        for i := 0; i < 3; i++ {
            cache.Set("key1", fmt.Sprintf("value%d", i))
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    time.Sleep(1 * time.Second)
}

3. 工作池示例

package main

import (
    "fmt"
    "sync"
    "time"
)

type WorkerPool struct {
    jobs    chan int
    results chan int
    wg      sync.WaitGroup
}

func NewWorkerPool(numWorkers int) *WorkerPool {
    return &WorkerPool{
        jobs:    make(chan int, 100),
        results: make(chan int, 100),
    }
}

func (wp *WorkerPool) Start(numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for job := range wp.jobs {
        fmt.Printf("工作者 %d 处理任务 %d\n", id, job)
        time.Sleep(100 * time.Millisecond) // 模拟工作
        wp.results <- job * 2 // 返回结果
    }
}

func (wp *WorkerPool) AddJob(job int) {
    wp.jobs <- job
}

func (wp *WorkerPool) Close() {
    close(wp.jobs)
    wp.wg.Wait()
    close(wp.results)
}

func main() {
    pool := NewWorkerPool(3)
    pool.Start(3)
    
    // 添加任务
    for i := 1; i <= 10; i++ {
        pool.AddJob(i)
    }
    
    // 关闭工作池并等待完成
    go func() {
        pool.Close()
    }()
    
    // 收集结果
    for result := range pool.results {
        fmt.Printf("收到结果: %d\n", result)
    }
}

4. 条件变量示例

package main

import (
    "fmt"
    "sync"
    "time"
)

type ProducerConsumer struct {
    buffer []int
    mu     sync.Mutex
    cond   *sync.Cond
    size   int
}

func NewProducerConsumer(size int) *ProducerConsumer {
    pc := &ProducerConsumer{
        buffer: make([]int, 0, size),
        size:   size,
    }
    pc.cond = sync.NewCond(&pc.mu)
    return pc
}

func (pc *ProducerConsumer) Produce(item int) {
    pc.mu.Lock()
    defer pc.mu.Unlock()
    
    // 等待缓冲区有空间
    for len(pc.buffer) >= pc.size {
        fmt.Printf("生产者等待,缓冲区已满\n")
        pc.cond.Wait()
    }
    
    pc.buffer = append(pc.buffer, item)
    fmt.Printf("生产: %d,缓冲区大小: %d\n", item, len(pc.buffer))
    
    // 通知消费者
    pc.cond.Signal()
}

func (pc *ProducerConsumer) Consume() int {
    pc.mu.Lock()
    defer pc.mu.Unlock()
    
    // 等待缓冲区有数据
    for len(pc.buffer) == 0 {
        fmt.Printf("消费者等待,缓冲区为空\n")
        pc.cond.Wait()
    }
    
    item := pc.buffer[0]
    pc.buffer = pc.buffer[1:]
    fmt.Printf("消费: %d,缓冲区大小: %d\n", item, len(pc.buffer))
    
    // 通知生产者
    pc.cond.Signal()
    
    return item
}

func main() {
    pc := NewProducerConsumer(3)
    
    // 启动生产者
    go func() {
        for i := 1; i <= 10; i++ {
            pc.Produce(i)
            time.Sleep(200 * time.Millisecond)
        }
    }()
    
    // 启动消费者
    go func() {
        for i := 0; i < 10; i++ {
            pc.Consume()
            time.Sleep(300 * time.Millisecond)
        }
    }()
    
    time.Sleep(5 * time.Second)
}

最佳实践

1. 锁的粒度

// 不好的做法:锁太大
type BadExample struct {
    mu    sync.Mutex
    data1 map[string]int
    data2 map[string]int
}

func (b *BadExample) UpdateData1(key string, value int) {
    b.mu.Lock()
    defer b.mu.Unlock()
    
    b.data1[key] = value // 只需要锁定data1
    // 但锁住了整个结构体
}

// 好的做法:细粒度锁
type GoodExample struct {
    mu1   sync.Mutex
    mu2   sync.Mutex
    data1 map[string]int
    data2 map[string]int
}

func (g *GoodExample) UpdateData1(key string, value int) {
    g.mu1.Lock()
    defer g.mu1.Unlock()
    
    g.data1[key] = value // 只锁定需要的部分
}

2. 避免死锁

// 可能导致死锁的代码
func (s *Service) Method1() {
    s.mu1.Lock()
    defer s.mu1.Unlock()
    
    s.Method2() // 可能尝试获取mu1,导致死锁
}

func (s *Service) Method2() {
    s.mu1.Lock() // 死锁!
    defer s.mu1.Unlock()
}

// 解决方案:使用sync.RWMutex或重新设计
type Service struct {
    mu sync.RWMutex
}

func (s *Service) Method1() {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    s.Method2() // 现在安全了
}

func (s *Service) Method2() {
    s.mu.RLock() // 读锁,不会死锁
    defer s.mu.RUnlock()
}

3. 使用sync.Once

package main

import (
    "fmt"
    "sync"
)

type Singleton struct {
    data string
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "单例数据"}
        fmt.Println("创建单例实例")
    })
    return instance
}

func main() {
    // 多个goroutine同时调用
    for i := 0; i < 5; i++ {
        go func(id int) {
            instance := GetInstance()
            fmt.Printf("Goroutine %d 获取实例: %v\n", id, instance)
        }(i)
    }
    
    // 等待所有goroutine完成
    time.Sleep(1 * time.Second)
}

性能考虑

1. 锁竞争

// 高竞争情况
func HighContention() {
    var counter int
    var mu sync.Mutex
    
    for i := 0; i < 1000; i++ {
        go func() {
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
}

// 减少竞争:使用原子操作
import "sync/atomic"

func LowContention() {
    var counter int64
    
    for i := 0; i < 1000; i++ {
        go func() {
            atomic.AddInt64(&counter, 1)
        }()
    }
}

2. 读写锁优化

type OptimizedCache struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func (c *OptimizedCache) Get(key string) (interface{}, bool) {
    c.mu.RLock() // 多个读取者可以同时访问
    defer c.mu.RUnlock()
    
    return c.data[key], true
}

func (c *OptimizedCache) Set(key string, value interface{}) {
    c.mu.Lock() // 写入者独占访问
    defer c.mu.Unlock()
    
    c.data[key] = value
}

调试技巧

1. 使用race detector

go run -race your_program.go

2. 添加调试信息

type DebugMutex struct {
    sync.Mutex
    name string
}

func (dm *DebugMutex) Lock() {
    fmt.Printf("尝试锁定: %s\n", dm.name)
    dm.Mutex.Lock()
    fmt.Printf("已锁定: %s\n", dm.name)
}

func (dm *DebugMutex) Unlock() {
    fmt.Printf("解锁: %s\n", dm.name)
    dm.Mutex.Unlock()
}

相关文章

高效办公,你值得拥有之原型工具AXURE篇

简介 Axure RP是美国Axure Software Solution公司旗舰产品,是一个专业的快速原型设计工具,让负责定义需求和规格、设计功能和界面的专家能够快速创建应用软件或Web网站的线框图...

Objective-C :Category(category什么意思)

Category 引入在日常的开发中,可能会碰到这样的需求:给某个类增加方法。比如说,需要给NSString类增加一个打印的方法。当然,我们可以新建一个类比如TestString,并继承NSStrin...

Windows 加密盘BitLocker爆锁屏绕过严重漏洞

BitLocker Windows内置现代设备级数据加密保护功能,BitLocker与Windows内核深度集成。有大量的企业和个人使用BitLocker加密自己关键数据,以防止数据泄密。BitLoc...

真来了,iOS 16.6 beta 利用,隐藏 Dock 栏

昨天提到!iOS 16.5 kfd 漏洞可以隐藏 Dock 栏消息,现在已经确定 iOS 16.6 beta 内测也是支持使用 kfd 漏洞,当然!也是支持隐藏 Dock 栏,主要验证该系统是否可用。...

微软宣布SQL Server 2016,2005版将结束支持

IT之家讯 在芝加哥Ignite大会上,微软宣布了SQL Server 2016,并将于今年夏季发布公开预览版。SQL Server是由微软开发的关系型数据库管理系统,用于软件应用请求数据的存储和管理...

微软明年要停止SQL Server 2005的技术支持了

站长之家(Chinaz.com)12月28日消息据外媒消息称,微软将于明年停止为SQL Server 2005提供技术支持,即不再为其提供新的安全补丁、新功能、应用升级等服务。且表示在停止技术支持后,...