Go 语言高级教程 — 并发编程深入

前置要求:Go语言基础
学习时长:约2-3天
适用场景:高并发服务、微服务、系统编程

一、Goroutine深入

1.1 Goroutine调度器

// Go运行时调度器使用GMP模型:
// G = Goroutine(协程)
// M = Machine(系统线程)
// P = Processor(逻辑处理器,即调度上下文;数量由GOMAXPROCS决定)

// 查看GOMAXPROCS
package main

import (
    "fmt"
    "runtime"
)

func main() {
    // 默认等于CPU核心数
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
    
    // 设置为4
    runtime.GOMAXPROCS(4)
}

1.2 Goroutine泄漏

// 错误示例:Goroutine泄漏
// leak demonstrates a goroutine leak: it starts a goroutine that sends on an
// unbuffered channel which nothing ever receives from, then returns.
func leak() {
    ch := make(chan int)
    go func() {
        ch <- 1  // blocks forever: there is no receiver on ch
    }()
    // leak returns here, but the goroutine above stays blocked on its send
    // and is never released.
}

// 正确示例:使用context取消
func noLeak(ctx context.Context) {
    ch := make(chan int, 1)
    go func() {
        select {
        case ch <- 1:
            fmt.Println("发送成功")
        case <-ctx.Done():
            fmt.Println("取消")
            return
        }
    }()
}

1.3 WaitGroup详解

package main

import (
    "fmt"
    "sync"
    "time"
)

// main launches five workers concurrently and blocks until all of them have
// finished, using a sync.WaitGroup to track completion.
func main() {
    const workerCount = 5
    var wg sync.WaitGroup

    // run is the body of one worker; it signals the WaitGroup when it returns.
    run := func(id int) {
        defer wg.Done()
        fmt.Printf("Worker %d 开始\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d 完成\n", id)
    }

    for id := 0; id < workerCount; id++ {
        // Add must happen before the goroutine starts so Wait cannot miss it.
        wg.Add(1)
        go run(id)
    }

    wg.Wait()
    fmt.Println("所有worker完成")
}


二、Channel高级

2.1 Channel方向

// 只写channel
// producer sends the integers 0 through 9 on ch and then closes it.
// The parameter is send-only, so producer cannot receive from ch.
func producer(ch chan<- int) {
    // The sender owns the close; deferring it keeps it next to the contract.
    defer close(ch)

    const count = 10
    for n := 0; n < count; n++ {
        ch <- n
    }
}

// 只读channel
// consumer prints every value received from ch, one per line, and returns
// once ch has been closed and drained. The parameter is receive-only.
func consumer(ch <-chan int) {
    for {
        val, open := <-ch
        if !open {
            return
        }
        fmt.Println(val)
    }
}

// main wires producer and consumer together through a channel with a buffer
// of five: producer runs in its own goroutine while consumer runs in main.
func main() {
    numbers := make(chan int, 5)

    go producer(numbers)

    // consumer returns when producer closes the channel, which ends main.
    consumer(numbers)
}

2.2 Select多路复用

// main waits for whichever happens first: a message on ch1 (after 1s),
// a message on ch2 (after 2s), or a 3-second timeout, and prints the outcome.
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // sendAfter delivers msg on dst once delay has elapsed.
    sendAfter := func(delay time.Duration, dst chan<- string, msg string) {
        time.Sleep(delay)
        dst <- msg
    }
    go sendAfter(1*time.Second, ch1, "来自ch1")
    go sendAfter(2*time.Second, ch2, "来自ch2")

    // select proceeds with the first case that becomes ready.
    var outcome string
    select {
    case outcome = <-ch1:
    case outcome = <-ch2:
    case <-time.After(3 * time.Second):
        outcome = "超时"
    }
    fmt.Println(outcome)
}

// 超时控制
func withTimeout(timeout time.Duration) (string, error) {
    ch := make(chan string, 1)
    
    go func() {
        // 模拟耗时操作
        time.Sleep(2 * time.Second)
        ch <- "结果"
    }()
    
    select {
    case result := <-ch:
        return result, nil
    case <-time.After(timeout):
        return "", fmt.Errorf("超时")
    }
}

2.3 Fan-out/Fan-in模式

// Fan-out: 一个输入分发给多个worker
// Fan-in: 多个输出合并为一个

// fanOut starts `workers` worker stages that all read from the same input
// channel and returns their output channels, one per worker.
func fanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    for idx := range outputs {
        outputs[idx] = worker(input)
    }
    return outputs
}

// fanIn merges every value from the given channels into one returned channel.
// The returned channel is closed after all input channels have been closed
// and drained.
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var pending sync.WaitGroup

    // forward copies one source into out until that source is closed.
    forward := func(src <-chan int) {
        defer pending.Done()
        for {
            v, open := <-src
            if !open {
                return
            }
            out <- v
        }
    }

    for i := range channels {
        pending.Add(1)
        go forward(channels[i])
    }

    // Close out only once every forwarder has finished, from a separate
    // goroutine so fanIn itself returns immediately.
    go func() {
        pending.Wait()
        close(out)
    }()

    return out
}

// worker starts a goroutine that doubles every value read from input and
// sends it on the returned channel, which is closed once input is closed.
func worker(input <-chan int) <-chan int {
    doubled := make(chan int)

    process := func() {
        defer close(doubled)
        for {
            v, open := <-input
            if !open {
                return
            }
            doubled <- v * 2
        }
    }
    go process()

    return doubled
}


三、Context

3.1 Context基础

package main

import (
    "context"
    "fmt"
    "time"
)

// main runs worker under a context with a 5-second timeout, cancels it
// manually after 3 seconds, and waits for worker to return before exiting.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // The deferred cancel releases the context's resources on every return
    // path; calling cancel more than once is safe.
    defer cancel()

    // done is closed when worker returns, so main can wait for it.
    done := make(chan struct{})
    go func() {
        defer close(done)
        worker(ctx)
    }()

    // Let the worker run for a while, then cancel manually.
    time.Sleep(3 * time.Second)
    cancel()

    // Without this wait main could return, ending the program, before worker
    // has observed the cancellation and reported it.
    <-done
}

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("收到取消信号:", ctx.Err())
            return
        default:
            fmt.Println("工作中...")
            time.Sleep(500 * time.Millisecond)
        }
    }
}

3.2 Context传值

// contextKey is a dedicated type for context keys declared in this file, so
// they cannot collide with plain string keys set by other code.
type contextKey string

// userIDKey is the context key under which the user ID is stored.
const userIDKey = contextKey("userID")

// middleware wraps next so that the value of the X-User-ID request header is
// stored in the request context under userIDKey before next is called.
func middleware(next http.Handler) http.Handler {
    serve := func(w http.ResponseWriter, r *http.Request) {
        id := r.Header.Get("X-User-ID")
        // WithContext returns a shallow copy of r carrying the new context.
        enriched := r.WithContext(context.WithValue(r.Context(), userIDKey, id))
        next.ServeHTTP(w, enriched)
    }
    return http.HandlerFunc(serve)
}

// handler writes the user ID found in the request context under userIDKey.
//
// The value is read with a checked type assertion: if no string is stored
// under userIDKey (for example when handler is mounted without middleware),
// it replies with 500 Internal Server Error instead of panicking.
func handler(w http.ResponseWriter, r *http.Request) {
    userID, ok := r.Context().Value(userIDKey).(string)
    if !ok {
        http.Error(w, "user ID missing from request context", http.StatusInternalServerError)
        return
    }
    fmt.Fprintf(w, "User ID: %s", userID)
}


四、sync包

4.1 Mutex互斥锁

type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

func (c *SafeCounter) Get(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

4.2 RWMutex读写锁

// SafeMap is a string-keyed map guarded by a read/write mutex: readers take
// the shared lock, writers take the exclusive lock. The zero value is ready
// to use.
type SafeMap struct {
    mu sync.RWMutex
    v  map[string]interface{}
}

// Get returns the value stored under key and whether the key was present.
func (m *SafeMap) Get(key string) (interface{}, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    val, ok := m.v[key]
    return val, ok
}

// Set stores value under key, replacing any previous value.
func (m *SafeMap) Set(key string, value interface{}) {
    m.mu.Lock()
    defer m.mu.Unlock()
    // Writing to a nil map panics, so create it on first use; this keeps a
    // zero-value SafeMap usable without a constructor.
    if m.v == nil {
        m.v = make(map[string]interface{})
    }
    m.v[key] = value
}

4.3 Once单次执行

var (
    // once guards the one-time initialisation of instance.
    once sync.Once
    // instance is the shared Database, populated on the first GetDatabase call.
    instance *Database
)

// GetDatabase returns the shared *Database, creating and connecting it on
// the first call. sync.Once ensures the initialisation runs only once even
// when GetDatabase is called from several goroutines.
func GetDatabase() *Database {
    initialise := func() {
        instance = new(Database)
        // NOTE(review): any result returned by Connect is not inspected here;
        // confirm against the Database type whether that is intended.
        instance.Connect()
    }
    once.Do(initialise)
    return instance
}

4.4 Pool对象池

// pool recycles 1 KiB scratch buffers.
//
// It stores *[]byte rather than []byte: putting a slice value into the pool
// boxes its three-word header into an interface, which allocates on every
// Put and defeats the purpose of pooling. A pointer fits in the interface
// without that allocation.
var pool = sync.Pool{
    New: func() interface{} {
        buf := make([]byte, 1024)
        return &buf
    },
}

// process borrows a buffer from pool for the duration of the call and hands
// it back when it returns.
func process() {
    bufPtr := pool.Get().(*[]byte)
    defer pool.Put(bufPtr)

    buf := *bufPtr
    // Use buf here...
    _ = buf // placeholder so the example compiles without real work
}


五、并发模式

5.1 Worker Pool

type WorkerPool struct {
    tasks   chan func()
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        tasks:   make(chan func(), 100),
        workers: workers,
    }
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(id int) {
            defer p.wg.Done()
            for task := range p.tasks {
                fmt.Printf("Worker %d 执行任务\n", id)
                task()
            }
        }(i)
    }
}

func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

func (p *WorkerPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

// 使用
pool := NewWorkerPool(5)
pool.Start()

for i := 0; i < 20; i++ {
    pool.Submit(func() {
        time.Sleep(time.Second)
    })
}

pool.Stop()

5.2 Rate Limiter

import "golang.org/x/time/rate"

// 每秒100个请求,突发200
limiter := rate.NewLimiter(100, 200)

// handler rejects the request with 429 Too Many Requests when the shared
// limiter has no token available; otherwise it goes on to handle the request.
func handler(w http.ResponseWriter, r *http.Request) {
    if allowed := limiter.Allow(); !allowed {
        const msg = "Too Many Requests"
        http.Error(w, msg, http.StatusTooManyRequests)
        return
    }

    // Handle the request here.
}


六、并发安全

// 1. 使用channel而不是共享内存
// Don't communicate by sharing memory; share memory by communicating.

// 2. 使用sync.Map(并发安全的map)
// m is a sync.Map; it is used here without any explicit initialisation.
var m sync.Map

// Store saves "value" under "key".
m.Store("key", "value")
// Load returns the stored value and a flag reporting whether the key exists.
val, ok := m.Load("key")

// 3. 使用atomic包
// counter is only accessed through the sync/atomic functions below.
var counter int64

// Atomically add 1 to counter.
atomic.AddInt64(&counter, 1)
// Atomically read the current value of counter.
atomic.LoadInt64(&counter)


学习建议

  1. 理解Goroutine调度原理
  2. 掌握Channel的各种用法
  3. 学会使用Context控制生命周期
  4. 理解并发安全问题
  5. 实践并发模式:Worker Pool、Fan-out/Fan-in
返回首页