前置要求:Go语言基础
学习时长:约2-3天
适用场景:高并发服务、微服务、系统编程
一、Goroutine深入
1.1 Goroutine调度器
// Go运行时调度器使用GMP模型:
// G = Goroutine(协程)
// M = Machine(系统线程)
// P = Processor(逻辑处理器:持有可运行G的本地队列,数量由GOMAXPROCS决定)
// 查看GOMAXPROCS
package main
import (
"fmt"
"runtime"
)
// main prints the current GOMAXPROCS value and then changes it to 4.
func main() {
	// An argument of 0 only queries the setting; by default it equals the
	// number of CPU cores.
	current := runtime.GOMAXPROCS(0)
	fmt.Println("GOMAXPROCS:", current)

	// Set the limit to 4.
	const procs = 4
	runtime.GOMAXPROCS(procs)
}
1.2 Goroutine泄漏
// leak is a deliberately broken example that demonstrates a goroutine leak.
// It starts a goroutine that sends on an unbuffered channel nobody reads.
func leak() {
	unread := make(chan int)
	send := func() {
		unread <- 1 // blocks forever: there is no receiver
	}
	go send()
	// leak returns here, but the goroutine above is still alive.
}
// noLeak is the corrected counterpart of the leaking example. The goroutine
// it starts either sends into a channel with a one-slot buffer or exits when
// ctx is cancelled, so it always has a way to finish.
func noLeak(ctx context.Context) {
	results := make(chan int, 1)
	go func() {
		select {
		case <-ctx.Done():
			fmt.Println("取消")
		case results <- 1:
			fmt.Println("发送成功")
		}
	}()
}
1.3 WaitGroup详解
package main
import (
"fmt"
"sync"
"time"
)
// main starts five workers and blocks until every one of them has finished.
func main() {
	const workerCount = 5

	var wg sync.WaitGroup
	run := func(id int) {
		defer wg.Done()
		fmt.Printf("Worker %d 开始\n", id)
		time.Sleep(time.Second)
		fmt.Printf("Worker %d 完成\n", id)
	}

	for id := 0; id < workerCount; id++ {
		// Register the worker before launching it so Wait cannot miss it.
		wg.Add(1)
		go run(id)
	}

	wg.Wait()
	fmt.Println("所有worker完成")
}
二、Channel高级
2.1 Channel方向
// producer sends the integers 0 through 9 on ch and then closes it.
// The parameter is a send-only channel.
func producer(ch chan<- int) {
	// The sender owns the channel, so it is the one to close it.
	defer close(ch)

	const count = 10
	for n := 0; n < count; n++ {
		ch <- n
	}
}
// consumer prints every value received from ch and returns once ch is closed
// and drained. The parameter is a receive-only channel.
func consumer(ch <-chan int) {
	for {
		val, open := <-ch
		if !open {
			return
		}
		fmt.Println(val)
	}
}
// main connects producer and consumer through a buffered channel: the
// producer runs in its own goroutine while the consumer runs on the calling
// goroutine until the channel is closed.
func main() {
	const bufSize = 5
	pipe := make(chan int, bufSize)

	go producer(pipe)
	consumer(pipe)
}
2.2 Select多路复用
// main races two delayed senders against a three-second timeout and prints
// whichever event happens first.
func main() {
	ch1, ch2 := make(chan string), make(chan string)

	// sendAfter delivers msg on out once delay has elapsed.
	sendAfter := func(delay time.Duration, out chan<- string, msg string) {
		time.Sleep(delay)
		out <- msg
	}
	go sendAfter(1*time.Second, ch1, "来自ch1")
	go sendAfter(2*time.Second, ch2, "来自ch2")

	// Wait for whichever finishes first.
	timeout := time.After(3 * time.Second)
	select {
	case <-timeout:
		fmt.Println("超时")
	case first := <-ch1:
		fmt.Println(first)
	case second := <-ch2:
		fmt.Println(second)
	}
}
// withTimeout runs a simulated two-second operation and waits at most timeout
// for its result. It returns the result on success, or an empty string and an
// error if the timeout elapses first.
func withTimeout(timeout time.Duration) (string, error) {
	// The one-slot buffer lets the operation deliver its result and exit even
	// when the caller has already given up.
	results := make(chan string, 1)
	slowOp := func() {
		// Simulate a slow operation.
		time.Sleep(2 * time.Second)
		results <- "结果"
	}
	go slowOp()

	deadline := time.After(timeout)
	select {
	case <-deadline:
		return "", fmt.Errorf("超时")
	case res := <-results:
		return res, nil
	}
}
2.3 Fan-out/Fan-in模式
// Fan-out: one input is distributed across several workers.
// Fan-in: several outputs are merged into one.

// fanOut starts the given number of workers, all reading from the same input
// channel, and returns their output channels in start order.
func fanOut(input <-chan int, workers int) []<-chan int {
	outputs := make([]<-chan int, workers)
	for slot := range outputs {
		outputs[slot] = worker(input)
	}
	return outputs
}
// fanIn merges every value from the given channels into a single channel.
// The returned channel is closed once all inputs have been closed and drained.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)

	var wg sync.WaitGroup
	wg.Add(len(channels))

	// forward copies one input into merged until that input is closed.
	forward := func(src <-chan int) {
		defer wg.Done()
		for v := range src {
			merged <- v
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close merged only after every forwarder is done, so no send can hit a
	// closed channel.
	go func() {
		wg.Wait()
		close(merged)
	}()

	return merged
}
// worker starts a goroutine that doubles every value read from input and
// sends it on the returned channel. The returned channel is closed after
// input has been closed and drained.
func worker(input <-chan int) <-chan int {
	output := make(chan int)

	double := func() {
		defer close(output)
		for {
			v, open := <-input
			if !open {
				return
			}
			output <- v * 2 // the "processing" step
		}
	}
	go double()

	return output
}
三、Context
3.1 Context基础
package main
import (
"context"
"fmt"
"time"
)
// main runs worker under a context with a five-second timeout, cancels it by
// hand after three seconds, and then waits for worker to return.
func main() {
	// Create a context with a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// done is closed when worker returns. Without this handshake main would
	// return right after cancel and the process would exit before worker had
	// a chance to observe the cancellation and log it.
	done := make(chan struct{})
	go func() {
		defer close(done)
		worker(ctx)
	}()

	// Let the worker run for a while, then cancel manually.
	time.Sleep(3 * time.Second)
	cancel()

	<-done
}
// worker prints a progress line roughly every 500ms until ctx is cancelled,
// then prints the cancellation reason and returns.
//
// The pause between iterations is itself interrupted by cancellation, so
// worker stops promptly instead of finishing a full sleep first.
func worker(ctx context.Context) {
	const interval = 500 * time.Millisecond

	for {
		// Check for cancellation before doing any work.
		select {
		case <-ctx.Done():
			fmt.Println("收到取消信号:", ctx.Err())
			return
		default:
		}

		fmt.Println("工作中...")

		// Wait for the next tick or for cancellation, whichever comes first.
		// On cancellation the check at the top of the loop reports it.
		timer := time.NewTimer(interval)
		select {
		case <-ctx.Done():
			timer.Stop()
		case <-timer.C:
		}
	}
}
3.2 Context传值
// contextKey is a private key type for context values, so keys defined here
// cannot collide with keys defined by other packages.
type contextKey string

// userIDKey is the context key under which the user ID is stored.
const userIDKey = contextKey("userID")
// middleware wraps next so that each request's X-User-ID header value is
// stored in the request context under userIDKey before next is called.
func middleware(next http.Handler) http.Handler {
	withUserID := func(w http.ResponseWriter, r *http.Request) {
		id := r.Header.Get("X-User-ID")
		enriched := r.WithContext(context.WithValue(r.Context(), userIDKey, id))
		next.ServeHTTP(w, enriched)
	}
	return http.HandlerFunc(withUserID)
}
// handler writes the user ID found in the request context to the response.
//
// If the context holds no string under userIDKey (for example because the
// handler was registered without the middleware that sets it), handler
// replies with 500 instead of panicking on the type assertion.
func handler(w http.ResponseWriter, r *http.Request) {
	userID, ok := r.Context().Value(userIDKey).(string)
	if !ok {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "User ID: %s", userID)
}
四、sync包
4.1 Mutex互斥锁
// SafeCounter is a set of named counters that is safe for concurrent use.
// The zero value is ready to use.
type SafeCounter struct {
	mu sync.Mutex     // guards v
	v  map[string]int // counter values by key; created lazily by Inc
}

// Inc adds one to the counter stored under key.
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Writing to a nil map panics, so allocate on first use. This keeps a
	// zero-value SafeCounter usable.
	if c.v == nil {
		c.v = make(map[string]int)
	}
	c.v[key]++
}

// Get returns the current value of the counter stored under key, or 0 if the
// key has never been incremented.
func (c *SafeCounter) Get(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.v[key]
}
4.2 RWMutex读写锁
// SafeMap is a string-keyed map guarded by a read-write lock: Get takes the
// read lock and Set takes the write lock. The zero value is ready to use.
type SafeMap struct {
	mu sync.RWMutex           // guards v
	v  map[string]interface{} // stored values; created lazily by Set
}

// Get returns the value stored under key and whether the key was present.
func (m *SafeMap) Get(key string) (interface{}, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	val, ok := m.v[key]
	return val, ok
}

// Set stores value under key, replacing any previous value.
func (m *SafeMap) Set(key string, value interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Writing to a nil map panics, so allocate on first use. This keeps a
	// zero-value SafeMap usable.
	if m.v == nil {
		m.v = make(map[string]interface{})
	}
	m.v[key] = value
}
4.3 Once单次执行
var (
	// once ensures the initialisation in GetDatabase runs a single time.
	once sync.Once
	// instance is the shared Database, populated on the first GetDatabase call.
	instance *Database
)

// GetDatabase returns the shared Database, creating it and calling Connect on
// the first call. Later calls return the same pointer.
func GetDatabase() *Database {
	initInstance := func() {
		instance = &Database{}
		// NOTE(review): Connect is defined elsewhere; if it returns an error,
		// that error is not inspected here.
		instance.Connect()
	}
	once.Do(initInstance)
	return instance
}
4.4 Pool对象池
// pool recycles 1024-byte scratch buffers.
//
// It stores *[]byte rather than []byte: a slice value has to be boxed into an
// interface on every Put, which allocates and defeats the purpose of pooling,
// whereas a pointer fits in the interface without allocating.
var pool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 1024)
		return &buf
	},
}

// process borrows a buffer from pool for the duration of the call and returns
// it when done.
func process() {
	bufp := pool.Get().(*[]byte)
	defer pool.Put(bufp)

	buf := *bufp
	// Use buf here...
	_ = buf // placeholder so the example compiles until real work is added
}
五、并发模式
5.1 Worker Pool
// WorkerPool runs submitted tasks on a fixed number of goroutines.
//
// Typical use: NewWorkerPool, Start, any number of Submit calls, then Stop.
// Submit must not be called after Stop.
type WorkerPool struct {
	tasks    chan func()    // queue of pending tasks
	workers  int            // number of goroutines launched by Start
	wg       sync.WaitGroup // tracks running worker goroutines
	stopOnce sync.Once      // makes Stop safe to call more than once
}

// NewWorkerPool returns a pool that will run the given number of workers and
// queue up to 100 pending tasks. A workers value below 1 is raised to 1, so
// submitted tasks are never silently left unexecuted.
func NewWorkerPool(workers int) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	return &WorkerPool{
		tasks:   make(chan func(), 100),
		workers: workers,
	}
}

// Start launches the worker goroutines. Each worker runs tasks from the queue
// until the queue is closed by Stop.
func (p *WorkerPool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(id int) {
			defer p.wg.Done()
			for task := range p.tasks {
				fmt.Printf("Worker %d 执行任务\n", id)
				task()
			}
		}(i)
	}
}

// Submit queues task for execution. It blocks while the queue is full.
func (p *WorkerPool) Submit(task func()) {
	p.tasks <- task
}

// Stop closes the task queue and waits until the workers have finished every
// task already queued. Calling Stop again is a no-op apart from the wait.
func (p *WorkerPool) Stop() {
	// Closing an already-closed channel panics, so close at most once.
	p.stopOnce.Do(func() { close(p.tasks) })
	p.wg.Wait()
}
// Usage: create a pool of 5 workers, submit 20 tasks that each sleep for one
// second, then shut the pool down.
// NOTE(review): these are statements, so they belong inside a function body
// such as main.
pool := NewWorkerPool(5)
pool.Start()
for i := 0; i < 20; i++ {
pool.Submit(func() {
time.Sleep(time.Second)
})
}
// Stop closes the task queue and waits for the workers to finish.
pool.Stop()
5.2 Rate Limiter
import "golang.org/x/time/rate"
// 每秒100个请求,突发200
limiter := rate.NewLimiter(100, 200)
// handler serves a request only if the shared rate limiter admits it;
// otherwise it replies with 429 Too Many Requests.
func handler(w http.ResponseWriter, r *http.Request) {
	if limiter.Allow() {
		// Handle the request here.
		return
	}
	http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
}
六、并发安全
// 1. Prefer channels over shared memory.
// Don't communicate by sharing memory; share memory by communicating.
// 2. Use sync.Map (a map that is safe for concurrent use).
var m sync.Map
m.Store("key", "value")
// Load returns the stored value and whether the key was present.
val, ok := m.Load("key")
// 3. Use the atomic package.
var counter int64
// Atomically add 1 to counter, then atomically read it back.
atomic.AddInt64(&counter, 1)
atomic.LoadInt64(&counter)
学习建议
- 理解Goroutine调度原理
- 掌握Channel的各种用法
- 学会使用Context控制生命周期
- 理解并发安全问题
- 实践并发模式:Worker Pool、Fan-out/Fan-in