Go 并发编程:从 goroutine 到 channel 的实践指南
引言
“不要通过共享内存来通信,而要通过通信来共享内存。”
这句 Go 语言的经典箴言,道出了 Go 并发模型的核心思想。与传统的多线程编程不同,Go 并没有让开发者直接面对线程、锁、条件变量这些底层概念,而是提供了 goroutine 和 channel 这两个优雅的抽象。
goroutine 是轻量级的"线程",一个 Go 程序可以轻松创建数十万个 goroutine 而不会耗尽资源;channel 则是 goroutine 之间通信的管道,它让数据传递变得安全而简单。这种设计使得并发编程不再是高阶程序员的专属技能,而是每个 Go 开发者都能轻松掌握的日常工具。
本文将带你从零开始,全面掌握 Go 并发编程的核心知识,并通过大量实战代码,让你真正理解并能够灵活运用 goroutine 和 channel。
一、goroutine:轻量级并发
1.1 什么是 goroutine
goroutine 是 Go 运行时管理的轻量级线程。它的创建成本极低——初始栈大小只有 2KB,而操作系统线程的栈通常为 1-8MB。这意味着你可以轻松创建成千上万个 goroutine。
对比表:
| 特性 | 操作系统线程 | goroutine |
|---|---|---|
| 栈大小 | 固定 1-8MB | 初始 2KB,可动态增长 |
| 创建成本 | 高(需要系统调用) | 低(用户态创建) |
| 切换成本 | 高(陷入内核) | 低(用户态切换) |
| 数量限制 | 通常数千 | 可达数百万 |
1.2 启动 goroutine
只需在函数调用前加上 go 关键字:
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine")
}
func main() {
// 启动 goroutine
go sayHello()
// 匿名函数
go func() {
fmt.Println("Hello from anonymous goroutine")
}()
// 等待 goroutine 执行(不推荐,仅用于演示)
time.Sleep(100 * time.Millisecond)
}
1.3 goroutine 的调度模型:GMP
理解 GMP 模型是深入理解 goroutine 的关键:
┌─────────────────────────────────────┐
│ Global Queue │
│ (全局运行队列) │
└──────────────────┬──────────────────┘
│
┌──────────────────────────────┼──────────────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ P0 │ │ P1 │ │ P2 │
│ (处理器) │ │ (处理器) │ │ (处理器) │
│ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ │ Local │ │ │ │ Local │ │ │ │ Local │ │
│ │ Queue │ │ │ │ Queue │ │ │ │ Queue │ │
│ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ ▼ │
│ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ │ M0 │ │ │ │ M1 │ │ │ │ M2 │ │
│ │ (系统线程) │ │ │ │ (系统线程) │ │ │ │ (系统线程) │ │
│ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │ │ │ │
└───────┼───────┘ └───────┼───────┘ └───────┼───────┘
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Goroutine │ │ Goroutine │ │ Goroutine │
│ (G1, G2...) │ │ (G3, G4...) │ │ (G5, G6...) │
└───────────────┘ └───────────────┘ └───────────────┘
GMP 三要素:
| 组件 | 全称 | 说明 |
|---|---|---|
| G | Goroutine | 用户态的轻量级线程,代表一个任务 |
| M | Machine | 操作系统线程,负责执行 G 的代码 |
| P | Processor | 调度器,负责管理 G 的本地队列,将 G 分配给 M 执行 |
调度策略:
- 本地队列优先:每个 P 有自己的本地队列,避免锁竞争
- 工作窃取(Work Stealing):当 P 的本地队列为空时,会从其他 P 窃取一半任务
- 抢占式调度:Go 1.14+ 实现了基于信号的抢占,防止长时间占用的 goroutine 阻塞其他任务
1.4 等待 goroutine 完成
time.Sleep 显然不是可靠的等待方式。更好的方法是使用 sync.WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 任务完成时计数器减1
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 计数器加1
go worker(i, &wg)
}
wg.Wait() // 等待所有 goroutine 完成
fmt.Println("所有工作完成")
}
二、channel:goroutine 之间的通信
2.1 channel 基础
channel 是 Go 中 goroutine 之间通信的主要方式。你可以把它理解为一种类型安全的管道:
package main
import "fmt"
func main() {
// 创建 channel
ch := make(chan int) // 无缓冲 channel
bufferedCh := make(chan string, 3) // 有缓冲 channel,容量为 3
// 发送和接收
go func() {
ch <- 42 // 发送数据
}()
value := <-ch // 接收数据
fmt.Println(value)
// 有缓冲 channel:不阻塞直到缓冲区满
bufferedCh <- "hello"
bufferedCh <- "world"
bufferedCh <- "!"
// bufferedCh <- "overflow" // 这行会阻塞
// 关闭 channel
close(bufferedCh)
// 遍历接收(直到 channel 关闭)
for msg := range bufferedCh {
fmt.Println(msg)
}
}
2.2 无缓冲 vs 有缓冲
无缓冲 channel:同步通信,发送和接收必须同时准备好。
func unbufferedDemo() {
ch := make(chan int)
go func() {
fmt.Println("接收者准备就绪")
value := <-ch
fmt.Printf("接收到: %d\n", value)
}()
time.Sleep(100 * time.Millisecond)
fmt.Println("发送者准备发送")
ch <- 42 // 阻塞直到接收者准备好
fmt.Println("发送完成")
}
// 输出顺序:
// 接收者准备就绪
// 发送者准备发送
// 接收到: 42
// 发送完成
有缓冲 channel:异步通信,发送者只需在缓冲区满时才会阻塞。
func bufferedDemo() {
ch := make(chan int, 2)
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
// ch <- 3 // 阻塞,缓冲区已满
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}
2.3 单向 channel
可以将 channel 限制为只发送或只接收,增加类型安全性:
// 只发送 channel
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
// 只接收 channel
func consumer(ch <-chan int) {
for value := range ch {
fmt.Println("消费:", value)
}
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}
2.4 使用 select 多路复用
select 允许一个 goroutine 等待多个 channel 操作:
func selectDemo() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
case <-time.After(3 * time.Second):
fmt.Println("超时")
}
}
}
select 的典型用法:
// 1. 非阻塞收发
func nonBlocking(ch chan int) {
select {
case val := <-ch:
fmt.Println("收到:", val)
default:
fmt.Println("没有数据,立即返回")
}
select {
case ch <- 42:
fmt.Println("发送成功")
default:
fmt.Println("channel 已满,无法发送")
}
}
// 2. 超时控制
func withTimeout(ch chan int) {
select {
case result := <-ch:
fmt.Println("结果:", result)
case <-time.After(2 * time.Second):
fmt.Println("超时,放弃等待")
}
}
// 3. 退出信号
func workerWithQuit(stop <-chan struct{}) {
for {
select {
case <-stop:
fmt.Println("收到退出信号,停止工作")
return
default:
// 正常工作的代码
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}
三、并发同步原语
虽然 channel 是 Go 推荐的通信方式,但某些场景下传统锁仍然更合适。
3.1 Mutex:互斥锁
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("最终计数:", counter.Value()) // 1000
}
3.2 RWMutex:读写锁
适合读多写少的场景:
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{
data: make(map[string]string),
}
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}
// 读锁是可重入的,多个 goroutine 可以同时持有读锁
// 写锁是独占的,与任何锁互斥
3.3 Once:单次执行
var (
once sync.Once
config *Config
)
type Config struct {
Host string
Port int
}
func LoadConfig() *Config {
once.Do(func() {
fmt.Println("加载配置(只执行一次)")
config = &Config{
Host: "localhost",
Port: 8080,
}
})
return config
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cfg := LoadConfig()
fmt.Printf("%p\n", cfg) // 所有 goroutine 拿到同一个实例
}()
}
wg.Wait()
}
3.4 Cond:条件变量
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
items: make([]int, 0),
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
fmt.Printf("生产: %d, 队列长度: %d\n", item, len(q.items))
// 通知一个等待的消费者
q.cond.Signal()
}
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
// 等待条件满足
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("消费: %d, 剩余: %d\n", item, len(q.items))
return item
}
3.5 Atomic:原子操作
import "sync/atomic"
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Increment() {
atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&c.value)
}
// CAS 操作
func (c *AtomicCounter) CompareAndSwap(old, new int64) bool {
return atomic.CompareAndSwapInt64(&c.value, old, new)
}
// 原子地存储并返回旧值
func (c *AtomicCounter) Swap(new int64) int64 {
return atomic.SwapInt64(&c.value, new)
}
四、并发模式
4.1 生产者-消费者模式
func producerConsumer() {
jobs := make(chan int, 5)
results := make(chan int, 5)
// 生产者
go func() {
for i := 1; i <= 10; i++ {
jobs <- i
fmt.Printf("生产任务: %d\n", i)
}
close(jobs)
}()
// 消费者(多个)
var wg sync.WaitGroup
for w := 1; w <= 3; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
result := job * job
results <- result
fmt.Printf("工人 %d: %d² = %d\n", workerID, job, result)
}
}(w)
}
// 等待所有消费者完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("结果: %d\n", result)
}
}
4.2 扇出-扇入模式(Fan-out/Fan-in)
func fanOutFanIn() {
// 原始数据通道
input := make(chan int)
// 扇出:启动多个 worker
workers := 5
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(input)
}
// 扇入:合并所有结果
output := merge(channels...)
// 发送数据
go func() {
for i := 1; i <= 20; i++ {
input <- i
}
close(input)
}()
// 接收结果
for result := range output {
fmt.Printf("最终结果: %d\n", result)
}
}
func worker(input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for n := range input {
// 模拟耗时处理
output <- n * n
}
}()
return output
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
output := make(chan int)
// 为每个 channel 启动一个 goroutine
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
output <- val
}
}(ch)
}
// 等待所有 goroutine 完成后关闭 output
go func() {
wg.Wait()
close(output)
}()
return output
}
4.3 管道模式(Pipeline)
func pipeline() {
// 阶段1:生成数字
generate := func(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
// 阶段2:乘以 2
multiply := func(done <-chan struct{}, in <-chan int, factor int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * factor:
case <-done:
return
}
}
}()
return out
}
// 阶段3:累加
sum := func(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
total := 0
for n := range in {
total += n
select {
case out <- total:
case <-done:
return
}
}
}()
return out
}
done := make(chan struct{})
defer close(done)
nums := generate(done, 1, 2, 3, 4, 5)
doubled := multiply(done, nums, 2)
result := sum(done, doubled)
for v := range result {
fmt.Println("管道输出:", v) // 2, 6, 12, 20, 30
}
}
4.4 工作池模式(Worker Pool)
type Task struct {
ID int
Data string
}
type Result struct {
TaskID int
Output string
}
func workerPool() {
const numWorkers = 5
const numTasks = 20
tasks := make(chan Task, numTasks)
results := make(chan Result, numTasks)
// 启动 worker
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
// 模拟处理任务
output := fmt.Sprintf("Worker %d 处理任务 %d: %s",
workerID, task.ID, task.Data)
results <- Result{TaskID: task.ID, Output: output}
time.Sleep(100 * time.Millisecond)
}
}(i)
}
// 发送任务
go func() {
for i := 1; i <= numTasks; i++ {
tasks <- Task{ID: i, Data: fmt.Sprintf("数据-%d", i)}
}
close(tasks)
}()
// 等待所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println(result.Output)
}
}
4.5 超时和取消模式
func timeoutAndCancel() {
// 使用 context 进行取消
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel() // 2秒后取消
}()
// 带超时的 context
ctxWithTimeout, cancelTimeout := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelTimeout()
// 使用 context 的工作函数
worker := func(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 被取消: %v\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
go worker(ctx, 1)
go worker(ctxWithTimeout, 2)
time.Sleep(3 * time.Second)
}
五、常见陷阱与最佳实践
5.1 goroutine 泄漏
// ❌ 错误示例:goroutine 泄漏
func leakExample() {
ch := make(chan int)
go func() {
// 这个 goroutine 永远不会退出,因为没有人从 ch 读取
value := <-ch
fmt.Println(value)
}()
// 函数返回,但 goroutine 仍然在运行
}
// ✅ 正确示例:使用带超时的 select
func safeExample() {
ch := make(chan int)
go func() {
select {
case value := <-ch:
fmt.Println(value)
case <-time.After(1 * time.Second):
fmt.Println("超时,退出")
}
}()
}
5.2 关闭 channel 的原则
// ✅ 正确的关闭方式:由发送方关闭
func correctClose() {
ch := make(chan int)
// 发送方
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送方关闭
}()
// 接收方
for v := range ch {
fmt.Println(v)
}
}
// ❌ 错误:在接收方或未检查的情况下关闭
func wrongClose() {
ch := make(chan int)
// 接收方不应关闭 channel
go func() {
<-ch
// close(ch) // 危险!可能导致发送方 panic
}()
// 重复关闭会 panic
// close(ch)
// close(ch) // panic: close of closed channel
}
5.3 竞态检测
# 使用 -race 标志检测竞态条件
go test -race ./...
go run -race main.go
go build -race myapp
5.4 常见并发模式对比
| 场景 | 推荐方案 |
|---|---|
| 传递数据 | channel |
| 计数器 | atomic 或 Mutex |
| 缓存 | RWMutex |
| 单次初始化 | sync.Once |
| 等待多个 goroutine | WaitGroup |
| 限流 | 带缓冲的 channel |
| 超时控制 | select + time.After |
| 取消传播 | context.Context |
| 复杂状态同步 | Cond |
六、完整实战:并发 URL 检查器
综合运用所学知识,构建一个并发 URL 健康检查器:
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
type CheckResult struct {
URL string
Status int
Duration time.Duration
Error error
}
type Checker struct {
client *http.Client
maxWorkers int
timeout time.Duration
results chan CheckResult
wg sync.WaitGroup
}
func NewChecker(maxWorkers int, timeout time.Duration) *Checker {
return &Checker{
client: &http.Client{
Timeout: timeout,
},
maxWorkers: maxWorkers,
timeout: timeout,
results: make(chan CheckResult, 100),
}
}
func (c *Checker) Check(ctx context.Context, urls []string) <-chan CheckResult {
// 创建任务队列
jobs := make(chan string, len(urls))
// 启动 workers
for i := 0; i < c.maxWorkers; i++ {
c.wg.Add(1)
go c.worker(ctx, jobs)
}
// 发送任务
go func() {
for _, url := range urls {
select {
case jobs <- url:
case <-ctx.Done():
break
}
}
close(jobs)
}()
// 等待所有 worker 完成后关闭结果 channel
go func() {
c.wg.Wait()
close(c.results)
}()
return c.results
}
func (c *Checker) worker(ctx context.Context, jobs <-chan string) {
defer c.wg.Done()
for url := range jobs {
select {
case <-ctx.Done():
return
default:
result := c.checkSingle(url)
c.results <- result
}
}
}
func (c *Checker) checkSingle(url string) CheckResult {
start := time.Now()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return CheckResult{
URL: url,
Duration: time.Since(start),
Error: err,
}
}
resp, err := c.client.Do(req)
duration := time.Since(start)
if err != nil {
return CheckResult{
URL: url,
Duration: duration,
Error: err,
}
}
defer resp.Body.Close()
return CheckResult{
URL: url,
Status: resp.StatusCode,
Duration: duration,
Error: nil,
}
}
// 结果聚合器
func AggregateResults(results <-chan CheckResult) {
var (
success int
failed int
total time.Duration
count int
)
fmt.Println("\n=== 检查结果 ===\n")
for result := range results {
count++
total += result.Duration
if result.Error != nil {
failed++
fmt.Printf("❌ %s - 失败: %v\n", result.URL, result.Error)
} else if result.Status >= 400 {
failed++
fmt.Printf("⚠️ %s - HTTP %d\n", result.URL, result.Status)
} else {
success++
fmt.Printf("✅ %s - HTTP %d (%.2fms)\n",
result.URL, result.Status, float64(result.Duration.Microseconds())/1000)
}
}
fmt.Printf("\n=== 统计 ===\n")
fmt.Printf("总数: %d, 成功: %d, 失败: %d\n", count, success, failed)
if count > 0 {
fmt.Printf("平均响应时间: %.2fms\n", float64(total.Microseconds())/1000/float64(count))
}
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://invalid-url-example.com",
"https://www.golang.org",
}
// 创建 checker
checker := NewChecker(5, 10*time.Second)
// 带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 执行检查
results := checker.Check(ctx, urls)
// 聚合结果
AggregateResults(results)
}
总结
Go 的并发模型简洁而强大。记住以下核心原则:
- 使用 channel 进行通信:不要通过共享内存来通信,而要通过通信来共享内存
- 不要担心 goroutine 泄漏:但要注意 channel 阻塞和未退出的 goroutine
- 使用 context 进行取消传播:优雅地处理超时和取消
- 遵循关闭原则:谁创建 channel,谁负责关闭它
- 使用 -race 检测竞态:让工具帮你发现问题
快速参考表:
| 需要 | 使用 |
|---|---|
| 启动并发任务 | go func(){}() |
| 等待完成 | sync.WaitGroup |
| 传递数据 | channel |
| 同步访问共享变量 | sync.Mutex |
| 单次初始化 | sync.Once |
| 限制并发数 | 带缓冲的 channel / worker pool |
| 超时控制 | select + time.After |
| 取消传播 | context.WithCancel |
| 原子操作 | sync/atomic |
Go 并发的优雅之处在于,它让你能够专注于问题本身,而不是被底层细节所困扰。希望本文能帮助你写出正确、高效、优雅的 Go 并发代码!