如何在Go中同步线程(go语言同步锁)
单线程代码已经带来头痛。添加第二个线程,就是从基础头痛升级了。
解决方案?互斥锁:线程和数据的交通警察。
一旦你理解了它们,线程同步就变成了第二本能,语言无关。
在C++和Go中工作,我遇到过所有常见的混乱:
- 有时会吞噬数据的竞态条件
- 线程践踏内存导致的段错误
- 还有沉默的杀手:死锁
最后一个是最糟糕的,没有崩溃,没有错误。只是一个死程序,卡在永恒的线程对峙中。
但当你理解了互斥锁背后的核心思想时,一切都开始变得清晰。
最好的部分?每种语言都说互斥锁:
- Go → sync.Mutex
- C++ → std::mutex
- Python → threading.Lock()
- Java → ReentrantLock
在这篇文章中,我将分解互斥锁的概念,展示死锁是如何发生的,并给你足够的直觉来处理任何语言中的线程代码。
学一次 → 到处应用。
互斥锁:互斥锁
线程引入了全新类别的问题,特别是在Go中,生成数千个线程实际上是免费的。
现在想象两个线程在完全相同的时间击中同一个数据源。那就是混乱。竞态条件、数据损坏、神秘bug,你不想调试的东西,更不用说向你的团队解释了。
进入互斥锁: 线程和共享数据之间的交通警察。
没有锁:
线程A ---> 数据源 <--- 线程B
有锁(两个线程之间共享):
线程A [锁]---> 数据源 <---[锁] 线程B
互斥锁的工作很简单:一次只有一个线程进入。
如果线程A拥有锁,线程B被告知:"等待你的轮次。"
这里是一个简单的切片访问示例,有和没有锁:
没有锁:
package main
import (
"fmt"
"time"
)
func main() {
var numbers []int
// 启动5个goroutine,都向同一个切片追加
for i := 0; i < 5; i++ {
go func(n int) {
// 这里没有锁定,这很可能会导致数据竞争
numbers = append(numbers, n)
fmt.Println("追加", n, "→", numbers)
}(i)
}
// 给它们一点时间运行
time.Sleep(1 * time.Second)
}
有锁:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var (
numbers []int
mu sync.Mutex
)
for i := 0; i < 5; i++ {
go func(n int) {
mu.Lock() // 获取锁
defer mu.Unlock() // 确保它被释放,即使在panic时
numbers = append(numbers, n)
fmt.Println("追加", n, "→", numbers)
}(i)
}
time.Sleep(1 * time.Second)
}
注意我们如何做:
mu.Lock()
defer mu.Unlock()
defer保证无论我们如何退出那个goroutine,正常返回或panic,锁都会被释放。
一旦goroutine接触共享数据,就锁定它。 相信我,未来的你会感激的。
那么,死锁到底是什么?
死锁
回到我们的交通警察类比:
线程A [锁]---> 数据源 <---[锁] 线程B
这有效是因为一个共享锁控制访问。但当我们引入同一车道中的两个共享锁时会发生什么?
线程A [锁]--[锁]-> 数据源 <---[锁] 线程B
现在你有两个交通警察,都不知道谁负责。线程A卡在等待两者,永远在困惑中来回弹跳。这就是经典死锁。
通常的嫌疑人?相同的嵌套锁,调用一个从已经持有锁的另一个函数内部获取锁的函数。
这是一个真实世界的例子:
func (m *ScheduledTask) Create(...) (task, error) {
m.mu.Lock() // 锁1
defer m.mu.Unlock() // 在结束时解锁1
// ... 设置任务 ...
if err := m.saveTasks(); err != nil { // 内部的锁2
return task{}, err
}
return t, nil
}
现在看看saveTasks内部:
func (m *ScheduledTask) saveTasks() error {
m.mu.Lock() // 锁2(再次)
defer m.mu.Unlock()
data, err := json.MarshalIndent(m.tasks, "", " ")
if err != nil {
return err
}
return os.WriteFile(tasks, data, 0644)
}
死锁。
为什么?因为Create()已经持有锁,而saveTasks()试图再次获取它,在第一个被释放之前。Go例程不会抱怨,它们只是默默地冻结。没有崩溃,没有堆栈跟踪,只是一个僵尸线程吞噬资源。
主线程呢?完全不知道。继续运行,而你的程序挂在边缘。
如果你认真对待构建真实世界的软件,你需要理解同步。
这些概念适用于所有语言。这是C++版本:
std::lock_guard<std::mutex> lk(globalIPCData.mapMutex); // 在访问前锁定
UIelement& u = uiSet.get(entityId);
学好这个。
一旦你将互斥锁视为具有绝对权威的交通警察,大多数线程问题就消失了。
我将在Substack上发布更多关于后端主题、JavaScript、Golang、C++和低级系统的深度探讨。希望你在那里;来打个招呼:
Coffee & Kernels
更多内容:
深度学习指南
深度学习搭便车指南:Python和JS示例。
深度学习:Pytorch和Tensorflow.js示例。
如何不那么糟糕地使用数据库
如何不那么糟糕地使用数据库和数据系统,带JavaScript示例。
数据库、数据系统和意图语言。
多语言互斥锁示例
C++ →std::mutex
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
int main() {
int counter = 0;
std::mutex m;
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back([&](){
std::lock_guard<std::mutex> lock(m); // RAII:构造函数锁定,析构函数解锁
++counter; // 临界区
std::cout << "C++计数器: " << counter << "\n";
});
}
for (auto &t : threads) t.join();
return 0;
}
Python →threading.Lock()
import threading
counter = 0
lock = threading.Lock()
def worker():
global counter
with lock: # 上下文管理器获取和释放
counter += 1 # 临界区
print(f"Python计数器: {counter}")
threads = []
for _ in range(5):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for t in threads:
t.join()
Java →ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static int counter = 0;
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(() -> {
lock.lock(); // 获取
try {
counter++; // 临界区
System.out.println("Java计数器: " + counter);
} finally {
lock.unlock(); // 释放
}
});
threads[i].start();
}
for (Thread t : threads) t.join();
}
}
关键要点
无论什么语言,配方都是:
- 获取锁/互斥锁,然后接触共享数据
- 做你最小的临界工作
- 释放锁/互斥锁(或使用作用域/RAII/上下文自动释放)
实际应用示例
1. 银行账户示例
package main
import (
"fmt"
"sync"
"time"
)
type BankAccount struct {
balance int
mu sync.Mutex
}
func (ba *BankAccount) Deposit(amount int) {
ba.mu.Lock()
defer ba.mu.Unlock()
ba.balance += amount
fmt.Printf("存款 %d,余额: %d\n", amount, ba.balance)
}
func (ba *BankAccount) Withdraw(amount int) bool {
ba.mu.Lock()
defer ba.mu.Unlock()
if ba.balance >= amount {
ba.balance -= amount
fmt.Printf("取款 %d,余额: %d\n", amount, ba.balance)
return true
}
fmt.Printf("余额不足,无法取款 %d\n", amount)
return false
}
func (ba *BankAccount) GetBalance() int {
ba.mu.Lock()
defer ba.mu.Unlock()
return ba.balance
}
func main() {
account := &BankAccount{balance: 1000}
// 启动多个goroutine同时操作账户
for i := 0; i < 5; i++ {
go func(id int) {
account.Deposit(100)
time.Sleep(10 * time.Millisecond)
account.Withdraw(50)
}(i)
}
time.Sleep(1 * time.Second)
fmt.Printf("最终余额: %d\n", account.GetBalance())
}
2. 缓存示例
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
data map[string]interface{}
mu sync.RWMutex // 读写锁,允许多个读取者
}
func NewCache() *Cache {
return &Cache{
data: make(map[string]interface{}),
}
}
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
fmt.Printf("设置 %s = %v\n", key, value)
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock() // 读锁,允许多个goroutine同时读取
defer c.mu.RUnlock()
value, exists := c.data[key]
if exists {
fmt.Printf("获取 %s = %v\n", key, value)
} else {
fmt.Printf("键 %s 不存在\n", key)
}
return value, exists
}
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
fmt.Printf("删除键 %s\n", key)
}
func main() {
cache := NewCache()
// 启动多个读取者
for i := 0; i < 3; i++ {
go func(id int) {
for j := 0; j < 5; j++ {
cache.Get("key1")
time.Sleep(50 * time.Millisecond)
}
}(i)
}
// 启动写入者
go func() {
for i := 0; i < 3; i++ {
cache.Set("key1", fmt.Sprintf("value%d", i))
time.Sleep(100 * time.Millisecond)
}
}()
time.Sleep(1 * time.Second)
}
3. 工作池示例
package main
import (
"fmt"
"sync"
"time"
)
type WorkerPool struct {
jobs chan int
results chan int
wg sync.WaitGroup
}
func NewWorkerPool(numWorkers int) *WorkerPool {
return &WorkerPool{
jobs: make(chan int, 100),
results: make(chan int, 100),
}
}
func (wp *WorkerPool) Start(numWorkers int) {
for i := 0; i < numWorkers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for job := range wp.jobs {
fmt.Printf("工作者 %d 处理任务 %d\n", id, job)
time.Sleep(100 * time.Millisecond) // 模拟工作
wp.results <- job * 2 // 返回结果
}
}
func (wp *WorkerPool) AddJob(job int) {
wp.jobs <- job
}
func (wp *WorkerPool) Close() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func main() {
pool := NewWorkerPool(3)
pool.Start(3)
// 添加任务
for i := 1; i <= 10; i++ {
pool.AddJob(i)
}
// 关闭工作池并等待完成
go func() {
pool.Close()
}()
// 收集结果
for result := range pool.results {
fmt.Printf("收到结果: %d\n", result)
}
}
4. 条件变量示例
package main
import (
"fmt"
"sync"
"time"
)
type ProducerConsumer struct {
buffer []int
mu sync.Mutex
cond *sync.Cond
size int
}
func NewProducerConsumer(size int) *ProducerConsumer {
pc := &ProducerConsumer{
buffer: make([]int, 0, size),
size: size,
}
pc.cond = sync.NewCond(&pc.mu)
return pc
}
func (pc *ProducerConsumer) Produce(item int) {
pc.mu.Lock()
defer pc.mu.Unlock()
// 等待缓冲区有空间
for len(pc.buffer) >= pc.size {
fmt.Printf("生产者等待,缓冲区已满\n")
pc.cond.Wait()
}
pc.buffer = append(pc.buffer, item)
fmt.Printf("生产: %d,缓冲区大小: %d\n", item, len(pc.buffer))
// 通知消费者
pc.cond.Signal()
}
func (pc *ProducerConsumer) Consume() int {
pc.mu.Lock()
defer pc.mu.Unlock()
// 等待缓冲区有数据
for len(pc.buffer) == 0 {
fmt.Printf("消费者等待,缓冲区为空\n")
pc.cond.Wait()
}
item := pc.buffer[0]
pc.buffer = pc.buffer[1:]
fmt.Printf("消费: %d,缓冲区大小: %d\n", item, len(pc.buffer))
// 通知生产者
pc.cond.Signal()
return item
}
func main() {
pc := NewProducerConsumer(3)
// 启动生产者
go func() {
for i := 1; i <= 10; i++ {
pc.Produce(i)
time.Sleep(200 * time.Millisecond)
}
}()
// 启动消费者
go func() {
for i := 0; i < 10; i++ {
pc.Consume()
time.Sleep(300 * time.Millisecond)
}
}()
time.Sleep(5 * time.Second)
}
最佳实践
1. 锁的粒度
// 不好的做法:锁太大
type BadExample struct {
mu sync.Mutex
data1 map[string]int
data2 map[string]int
}
func (b *BadExample) UpdateData1(key string, value int) {
b.mu.Lock()
defer b.mu.Unlock()
b.data1[key] = value // 只需要锁定data1
// 但锁住了整个结构体
}
// 好的做法:细粒度锁
type GoodExample struct {
mu1 sync.Mutex
mu2 sync.Mutex
data1 map[string]int
data2 map[string]int
}
func (g *GoodExample) UpdateData1(key string, value int) {
g.mu1.Lock()
defer g.mu1.Unlock()
g.data1[key] = value // 只锁定需要的部分
}
2. 避免死锁
// 可能导致死锁的代码
func (s *Service) Method1() {
s.mu1.Lock()
defer s.mu1.Unlock()
s.Method2() // 可能尝试获取mu1,导致死锁
}
func (s *Service) Method2() {
s.mu1.Lock() // 死锁!
defer s.mu1.Unlock()
}
// 解决方案:使用sync.RWMutex或重新设计
type Service struct {
mu sync.RWMutex
}
func (s *Service) Method1() {
s.mu.Lock()
defer s.mu.Unlock()
s.Method2() // 现在安全了
}
func (s *Service) Method2() {
s.mu.RLock() // 读锁,不会死锁
defer s.mu.RUnlock()
}
3. 使用sync.Once
package main
import (
"fmt"
"sync"
)
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "单例数据"}
fmt.Println("创建单例实例")
})
return instance
}
func main() {
// 多个goroutine同时调用
for i := 0; i < 5; i++ {
go func(id int) {
instance := GetInstance()
fmt.Printf("Goroutine %d 获取实例: %v\n", id, instance)
}(i)
}
// 等待所有goroutine完成
time.Sleep(1 * time.Second)
}
性能考虑
1. 锁竞争
// 高竞争情况
func HighContention() {
var counter int
var mu sync.Mutex
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
counter++
mu.Unlock()
}()
}
}
// 减少竞争:使用原子操作
import "sync/atomic"
func LowContention() {
var counter int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&counter, 1)
}()
}
}
2. 读写锁优化
type OptimizedCache struct {
data map[string]interface{}
mu sync.RWMutex
}
func (c *OptimizedCache) Get(key string) (interface{}, bool) {
c.mu.RLock() // 多个读取者可以同时访问
defer c.mu.RUnlock()
return c.data[key], true
}
func (c *OptimizedCache) Set(key string, value interface{}) {
c.mu.Lock() // 写入者独占访问
defer c.mu.Unlock()
c.data[key] = value
}
调试技巧
1. 使用race detector
go run -race your_program.go
2. 添加调试信息
type DebugMutex struct {
sync.Mutex
name string
}
func (dm *DebugMutex) Lock() {
fmt.Printf("尝试锁定: %s\n", dm.name)
dm.Mutex.Lock()
fmt.Printf("已锁定: %s\n", dm.name)
}
func (dm *DebugMutex) Unlock() {
fmt.Printf("解锁: %s\n", dm.name)
dm.Mutex.Unlock()
}