消息队列入门教程 — 异步处理必备

适用人群:后端开发者
学习时长:约1-2天
工具:RabbitMQ / Redis / Kafka
重要程度:★★★★☆(分布式系统必备)

一、消息队列是什么?

消息队列是一种异步通信机制,用于在应用程序之间传递消息。

特点说明
异步处理耗时任务异步执行
应用解耦服务之间松耦合
流量削峰平滑处理突发流量
可靠性消息持久化,不丢失

二、常见消息队列

消息队列语言特点适用场景
RabbitMQErlang功能完善、可靠业务消息
RedisC轻量级、高性能简单队列
KafkaJava高吞吐、持久化日志收集、大数据
RocketMQJava阿里出品、事务消息电商订单

三、Redis 作为消息队列

3.1 List实现简单队列

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 生产者
def publish(task_name, data):
    message = json.dumps({
        'task': task_name,
        'data': data,
        'timestamp': time.time()
    })
    r.lpush('task_queue', message)
    print(f"发布任务:{task_name}")

# 消费者
def consume():
    while True:
        # 阻塞获取
        _, message = r.brpop('task_queue', timeout=0)
        task = json.loads(message)
        print(f"处理任务:{task['task']}")
        # 执行任务...

# 使用
publish('send_email', {'to': 'user@example.com', 'subject': '欢迎'})
consume()

3.2 Pub/Sub发布订阅

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 发布者
def publish(channel, message):
    r.publish(channel, json.dumps(message))

# 订阅者
def subscribe(channel):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"收到消息:{data}")

# 使用
publish('order:created', {'order_id': 123, 'user_id': 456})
subscribe('order:created')


四、RabbitMQ

4.1 安装

# Docker安装
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

# 访问管理界面:http://localhost:15672

4.2 Python使用RabbitMQ

# pip install pika

import pika
import json

# 连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('admin', 'password'))
)
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='task_queue', durable=True)

# 生产者
def publish(task_name, data):
    message = json.dumps({'task': task_name, 'data': data})
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 消息持久化
        )
    )
    print(f"发布:{task_name}")

# 消费者
def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"处理:{task['task']}")
    # 执行任务...
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # 每次只处理一条
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

4.3 PHP使用RabbitMQ

// composer require php-amqplib/php-amqplib

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 连接
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);

// 生产者
function publish($channel, $taskName, $data) {
    $message = json_encode(['task' => $taskName, 'data' => $data]);
    $msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, '', 'task_queue');
    echo "发布:{$taskName}\n";
}

// 消费者
$callback = function ($msg) {
    $task = json_decode($msg->body, true);
    echo "处理:{$task['task']}\n";
    // 执行任务...
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}


五、消息队列模式

5.1 简单模式

Producer → Queue → Consumer
一个生产者,一个消费者

5.2 工作队列模式

Producer → Queue → Consumer 1
                 → Consumer 2
多个消费者竞争消费,实现负载均衡

5.3 发布订阅模式

Producer → Exchange → Queue 1 → Consumer 1
                   → Queue 2 → Consumer 2
一个消息被多个消费者接收

5.4 路由模式

Producer → Exchange → Queue 1 (routing_key=error) → Consumer 1
                   → Queue 2 (routing_key=info) → Consumer 2
根据routing_key路由到不同队列


六、实际应用场景

6.1 异步发送邮件

# 用户注册后异步发送欢迎邮件
def register(username, email):
    # 1. 创建用户
    user = User.create(username=username, email=email)
    
    # 2. 发送消息到队列
    publish('send_email', {
        'to': email,
        'subject': '欢迎注册',
        'body': f'亲爱的{username},欢迎加入!'
    })
    
    return user

# 消费者处理
def handle_send_email(data):
    send_email(data['to'], data['subject'], data['body'])

6.2 订单超时取消

# 创建订单后,延迟取消
def create_order(user_id, product_id):
    order = Order.create(user_id=user_id, product_id=product_id, status='pending')
    
    # 30分钟后自动取消
    publish('order_timeout', {
        'order_id': order.id,
        'timeout': 1800  # 30分钟
    }, delay=1800)
    
    return order

# 消费者处理
def handle_order_timeout(data):
    order = Order.find(data['order_id'])
    if order.status == 'pending':
        order.update(status='cancelled')
        # 恢复库存
        restore_stock(order)

6.3 日志收集

# 应用日志异步写入
def log(level, message, context=None):
    publish('app_logs', {
        'level': level,
        'message': message,
        'context': context,
        'timestamp': time.time(),
        'server': get_hostname()
    })

# 消费者写入Elasticsearch
def handle_log(data):
    es.index(index='app-logs', body=data)


七、最佳实践

✅ 消息持久化:防止消息丢失
✅ 消息确认:确保消息被处理
✅ 重试机制:处理失败时重试
✅ 死信队列:多次失败的消息
✅ 监控告警:监控队列长度和消费速度
✅ 限流:防止消费者过载
✅ 幂等性:消息可能重复,需要幂等处理


学习建议

  1. 先理解消息队列的作用,解决什么问题
  2. 从Redis开始,最简单的消息队列
  3. 学习RabbitMQ,功能最完善
  4. 掌握消息确认和重试,保证可靠性
  5. 实际项目中应用,如异步邮件、订单超时

下一步学习

返回首页