前置要求:Python基础
学习时长:约2-3天
适用场景:高并发服务、爬虫、实时应用
一、异步编程基础
1.1 什么是异步?
# 同步:阻塞等待
import time
def sync_task():
time.sleep(1) # 阻塞1秒
return "完成"
# 异步:非阻塞
import asyncio
async def async_task():
await asyncio.sleep(1) # 非阻塞
return "完成"
1.2 asyncio基础
import asyncio
# 定义协程
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行协程
asyncio.run(say_hello())
# 并发执行多个任务
async def main():
# 方式1:gather
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
)
# 方式2:TaskGroup (Python 3.11+)
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_user(1))
task2 = tg.create_task(fetch_user(2))
task3 = tg.create_task(fetch_user(3))
# 获取结果
print(task1.result(), task2.result(), task3.result())
asyncio.run(main())
二、aiohttp异步HTTP
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/users/3',
]
async with aiohttp.ClientSession() as session:
# 并发请求
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
2.1 限流并发
import asyncio
import aiohttp
async def fetch_with_limit(urls, max_concurrent=10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.json()
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
# 使用
urls = [f'https://api.example.com/users/{i}' for i in range(100)]
results = asyncio.run(fetch_with_limit(urls, max_concurrent=10))
三、FastAPI异步框架
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio
app = FastAPI()
# 异步数据库操作
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "mysql+aiomysql://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL)
async_session = sessionmaker(engine, class_=AsyncSession)
# 模型
class UserCreate(BaseModel):
username: str
email: str
class UserResponse(BaseModel):
id: int
username: str
email: str
# 异步接口
@app.get("/api/users", response_model=list[UserResponse])
async def get_users():
async with async_session() as session:
result = await session.execute("SELECT * FROM users")
return result.fetchall()
@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
async with async_session() as session:
# 异步插入
result = await session.execute(
"INSERT INTO users (username, email) VALUES (:username, :email) RETURNING id",
{"username": user.username, "email": user.email}
)
await session.commit()
return {**user.dict(), "id": result.scalar()}
@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
async with async_session() as session:
result = await session.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
user = result.first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
四、异步数据库(SQLAlchemy)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
email = Column(String(100), unique=True)
# 异步CRUD
async def create_user(username: str, email: str):
async with async_session() as session:
user = User(username=username, email=email)
session.add(user)
await session.commit()
return user
async def get_user(user_id: int):
async with async_session() as session:
return await session.get(User, user_id)
async def get_users():
async with async_session() as session:
result = await session.execute("SELECT * FROM users")
return result.scalars().all()
五、并发模型对比
# 1. 多线程(适合IO密集型)
import threading
def thread_task():
import requests
return requests.get('https://api.example.com')
threads = [threading.Thread(target=thread_task) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# 2. 多进程(适合CPU密集型)
from multiprocessing import Pool
def cpu_task(n):
return sum(range(n))
with Pool(4) as p:
results = p.map(cpu_task, [10**7] * 4)
# 3. 异步(适合IO密集型)
import asyncio
import aiohttp
async def async_task():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as resp:
return await resp.json()
asyncio.run(async_task())
六、异步上下文管理器
# 自定义异步上下文管理器
class AsyncDatabase:
async def __aenter__(self):
self.conn = await async_connect()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
async def main():
async with AsyncDatabase() as conn:
await conn.execute("SELECT * FROM users")
七、异步生成器
# 异步生成器
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.1)
yield i
async def main():
async for i in async_range(10):
print(i)
# 异步推导式
async def main():
results = [x async for x in async_range(10) if x % 2 == 0]
print(results)
学习建议
- 理解协程概念,async/await的本质
- 掌握asyncio基本用法
- 学习FastAPI,异步Web框架
- 理解并发模型,选择合适的方案
- 实践异步IO操作,HTTP请求、数据库操作
下一步学习