适用人群:后端开发者
学习时长:约1-2天
工具:RabbitMQ / Redis / Kafka
重要程度:★★★★☆(分布式系统必备)
一、消息队列是什么?
消息队列是一种异步通信机制,用于在应用程序之间传递消息。
| 特点 | 说明 |
|---|---|
| 异步处理 | 耗时任务异步执行 |
| 应用解耦 | 服务之间松耦合 |
| 流量削峰 | 平滑处理突发流量 |
| 可靠性 | 消息持久化,不丢失 |
二、常见消息队列
| 消息队列 | 语言 | 特点 | 适用场景 |
|---|---|---|---|
| RabbitMQ | Erlang | 功能完善、可靠 | 业务消息 |
| Redis | C | 轻量级、高性能 | 简单队列 |
| Kafka | Java | 高吞吐、持久化 | 日志收集、大数据 |
| RocketMQ | Java | 阿里出品、事务消息 | 电商订单 |
三、Redis 作为消息队列
3.1 List实现简单队列
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 生产者
def publish(task_name, data):
message = json.dumps({
'task': task_name,
'data': data,
'timestamp': time.time()
})
r.lpush('task_queue', message)
print(f"发布任务:{task_name}")
# 消费者
def consume():
while True:
# 阻塞获取
_, message = r.brpop('task_queue', timeout=0)
task = json.loads(message)
print(f"处理任务:{task['task']}")
# 执行任务...
# 使用
publish('send_email', {'to': 'user@example.com', 'subject': '欢迎'})
consume()
3.2 Pub/Sub发布订阅
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 发布者
def publish(channel, message):
r.publish(channel, json.dumps(message))
# 订阅者
def subscribe(channel):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"收到消息:{data}")
# 使用
publish('order:created', {'order_id': 123, 'user_id': 456})
subscribe('order:created')
四、RabbitMQ
4.1 安装
# Docker安装
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management
# 访问管理界面:http://localhost:15672
4.2 Python使用RabbitMQ
# pip install pika
import pika
import json
# 连接
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('admin', 'password'))
)
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='task_queue', durable=True)
# 生产者
def publish(task_name, data):
message = json.dumps({'task': task_name, 'data': data})
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
print(f"发布:{task_name}")
# 消费者
def callback(ch, method, properties, body):
task = json.loads(body)
print(f"处理:{task['task']}")
# 执行任务...
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # 每次只处理一条
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
4.3 PHP使用RabbitMQ
// composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 连接
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);
// 生产者
function publish($channel, $taskName, $data) {
$message = json_encode(['task' => $taskName, 'data' => $data]);
$msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'task_queue');
echo "发布:{$taskName}\n";
}
// 消费者
$callback = function ($msg) {
$task = json_decode($msg->body, true);
echo "处理:{$task['task']}\n";
// 执行任务...
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
五、消息队列模式
5.1 简单模式
Producer → Queue → Consumer
一个生产者,一个消费者
5.2 工作队列模式
Producer → Queue → Consumer 1
→ Consumer 2
多个消费者竞争消费,实现负载均衡
5.3 发布订阅模式
Producer → Exchange → Queue 1 → Consumer 1
→ Queue 2 → Consumer 2
一个消息被多个消费者接收
5.4 路由模式
Producer → Exchange → Queue 1 (routing_key=error) → Consumer 1
→ Queue 2 (routing_key=info) → Consumer 2
根据routing_key路由到不同队列
六、实际应用场景
6.1 异步发送邮件
# 用户注册后异步发送欢迎邮件
def register(username, email):
# 1. 创建用户
user = User.create(username=username, email=email)
# 2. 发送消息到队列
publish('send_email', {
'to': email,
'subject': '欢迎注册',
'body': f'亲爱的{username},欢迎加入!'
})
return user
# 消费者处理
def handle_send_email(data):
send_email(data['to'], data['subject'], data['body'])
6.2 订单超时取消
# 创建订单后,延迟取消
def create_order(user_id, product_id):
order = Order.create(user_id=user_id, product_id=product_id, status='pending')
# 30分钟后自动取消
publish('order_timeout', {
'order_id': order.id,
'timeout': 1800 # 30分钟
}, delay=1800)
return order
# 消费者处理
def handle_order_timeout(data):
order = Order.find(data['order_id'])
if order.status == 'pending':
order.update(status='cancelled')
# 恢复库存
restore_stock(order)
6.3 日志收集
# 应用日志异步写入
def log(level, message, context=None):
publish('app_logs', {
'level': level,
'message': message,
'context': context,
'timestamp': time.time(),
'server': get_hostname()
})
# 消费者写入Elasticsearch
def handle_log(data):
es.index(index='app-logs', body=data)
七、最佳实践
✅ 消息持久化:防止消息丢失
✅ 消息确认:确保消息被处理
✅ 重试机制:处理失败时重试
✅ 死信队列:多次失败的消息
✅ 监控告警:监控队列长度和消费速度
✅ 限流:防止消费者过载
✅ 幂等性:消息可能重复,需要幂等处理
学习建议
- 先理解消息队列的作用,解决什么问题
- 从Redis开始,最简单的消息队列
- 学习RabbitMQ,功能最完善
- 掌握消息确认和重试,保证可靠性
- 实际项目中应用,如异步邮件、订单超时
下一步学习
- RabbitMQ官方文档
- Kafka — 大数据消息队列
- Redis Pub/Sub