WebSocket 实时通信教程 — 双向通信

适用人群:后端开发者、前端开发者
学习时长:约1-2天
工具:Socket.io / ws / Gorilla WebSocket
适用场景:聊天应用、实时通知、在线游戏

一、WebSocket是什么?

WebSocket是一种在单个TCP连接上进行全双工通信的协议,实现服务器和客户端的实时双向通信。

对比项HTTPWebSocket
连接短连接长连接
通信单向(请求-响应)双向
实时性轮询实时推送
开销每次请求都有头信息只有握手时有头信息

二、Node.js实现(Socket.io)

2.1 安装

# 服务端
npm install express socket.io

# 客户端
npm install socket.io-client

2.2 服务端

// server.js
import express from 'express'
import { createServer } from 'http'
import { Server } from 'socket.io'

const app = express()
const server = createServer(app)
const io = new Server(server, {
  cors: { origin: '*' }
})

// 在线用户
const onlineUsers = new Map()

io.on('connection', (socket) => {
  console.log(`用户连接:${socket.id}`)
  
  // 用户上线
  socket.on('user_online', (userId) => {
    onlineUsers.set(userId, socket.id)
    io.emit('online_users', Array.from(onlineUsers.keys()))
  })
  
  // 私聊
  socket.on('private_message', ({ to, content }) => {
    const targetSocketId = onlineUsers.get(to)
    if (targetSocketId) {
      io.to(targetSocketId).emit('new_message', {
        from: socket.userId,
        content,
        timestamp: new Date()
      })
    }
  })
  
  // 群聊
  socket.on('join_room', (roomId) => {
    socket.join(roomId)
    console.log(`${socket.id} 加入房间 ${roomId}`)
  })
  
  socket.on('room_message', ({ roomId, content }) => {
    io.to(roomId).emit('new_message', {
      from: socket.userId,
      content,
      roomId,
      timestamp: new Date()
    })
  })
  
  // 输入状态
  socket.on('typing', ({ to }) => {
    const targetSocketId = onlineUsers.get(to)
    if (targetSocketId) {
      io.to(targetSocketId).emit('user_typing', { userId: socket.userId })
    }
  })
  
  // 断开连接
  socket.on('disconnect', () => {
    // 移除在线用户
    for (const [userId, socketId] of onlineUsers.entries()) {
      if (socketId === socket.id) {
        onlineUsers.delete(userId)
        break
      }
    }
    io.emit('online_users', Array.from(onlineUsers.keys()))
    console.log(`用户断开:${socket.id}`)
  })
})

server.listen(3000, () => {
  console.log('服务器运行在 http://localhost:3000')
})

2.3 客户端

// client.js
import { io } from 'socket.io-client'

const socket = io('http://localhost:3000')

// 连接成功
socket.on('connect', () => {
  console.log('已连接')
  socket.emit('user_online', userId)
})

// 接收消息
socket.on('new_message', (message) => {
  console.log('收到消息:', message)
})

// 接收在线用户
socket.on('online_users', (users) => {
  console.log('在线用户:', users)
})

// 发送私聊消息
function sendPrivateMessage(to, content) {
  socket.emit('private_message', { to, content })
}

// 加入房间
function joinRoom(roomId) {
  socket.emit('join_room', roomId)
}

// 发送群聊消息
function sendRoomMessage(roomId, content) {
  socket.emit('room_message', { roomId, content })
}

// 发送输入状态
function sendTyping(to) {
  socket.emit('typing', { to })
}

// 断开连接
socket.on('disconnect', () => {
  console.log('已断开')
})


三、Go实现(Gorilla WebSocket)

go get github.com/gorilla/websocket

// main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"
    
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// 客户端管理
type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
    mu         sync.RWMutex
}

type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    userId string
}

func newHub() *Hub {
    return &Hub{
        clients:    make(map[*Client]bool),
        broadcast:  make(chan []byte),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            h.mu.Unlock()
            
        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client.send)
            }
            h.mu.Unlock()
            
        case message := <-h.broadcast:
            h.mu.RLock()
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
            h.mu.RUnlock()
        }
    }
}

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()
    
    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            break
        }
        c.hub.broadcast <- message
    }
}

func (c *Client) writePump() {
    defer c.conn.Close()
    
    for message := range c.send {
        err := c.conn.WriteMessage(websocket.TextMessage, message)
        if err != nil {
            break
        }
    }
}

func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }
    
    client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    hub.register <- client
    
    go client.writePump()
    go client.readPump()
}

func main() {
    hub := newHub()
    go hub.run()
    
    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        serveWs(hub, w, r)
    })
    
    fmt.Println("服务器运行在 :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}


四、Python实现(websockets)

pip install websockets

# server.py
import asyncio
import websockets
import json

# 在线客户端
clients = {}

async def handler(websocket, path):
    # 用户上线
    user_id = None
    try:
        async for message in websocket:
            data = json.loads(message)
            
            if data['type'] == 'login':
                user_id = data['user_id']
                clients[user_id] = websocket
                print(f"用户 {user_id} 上线")
                # 广播在线用户
                await broadcast({
                    'type': 'online_users',
                    'users': list(clients.keys())
                })
                
            elif data['type'] == 'private_message':
                target_ws = clients.get(data['to'])
                if target_ws:
                    await target_ws.send(json.dumps({
                        'type': 'message',
                        'from': user_id,
                        'content': data['content']
                    }))
                    
            elif data['type'] == 'broadcast':
                await broadcast({
                    'type': 'message',
                    'from': user_id,
                    'content': data['content']
                })
                
    finally:
        # 用户下线
        if user_id:
            del clients[user_id]
            print(f"用户 {user_id} 下线")
            await broadcast({
                'type': 'online_users',
                'users': list(clients.keys())
            })

async def broadcast(message):
    for ws in clients.values():
        try:
            await ws.send(json.dumps(message))
        except:
            pass

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("服务器运行在 ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())


五、应用场景

5.1 实时聊天

// 前端
const socket = io('http://localhost:3000')

// 发送消息
function sendMessage(to, content) {
  socket.emit('private_message', { to, content })
}

// 接收消息
socket.on('new_message', (message) => {
  // 显示消息
  addMessageToUI(message)
})

5.2 实时通知

// 后端
function sendNotification(userId, notification) {
  const socketId = onlineUsers.get(userId)
  if (socketId) {
    io.to(socketId).emit('notification', notification)
  }
}

// 前端
socket.on('notification', (notification) => {
  // 显示通知
  showToast(notification.message)
})

5.3 在线协同编辑

// 前端
socket.on('document_update', ({ userId, changes }) => {
  // 应用远程更改
  applyChanges(changes)
})

function sendChanges(changes) {
  socket.emit('document_update', { documentId, changes })
}


六、最佳实践

✅ 心跳检测:定期发送ping保持连接
✅ 断线重连:客户端自动重连机制
✅ 消息确认:重要消息需要确认
✅ 消息持久化:聊天记录存储到数据库
✅ 认证:WebSocket连接时验证token
✅ 限流:防止恶意发送消息
✅ 负载均衡:多实例时使用Redis广播


学习建议

  1. 先理解WebSocket协议
  2. 从Socket.io开始,封装完善
  3. 实现简单聊天功能
  4. 处理断线重连
  5. 学习消息持久化
返回首页