Python 高级教程 — 异步编程与并发

前置要求:Python基础
学习时长:约2-3天
适用场景:高并发服务、爬虫、实时应用

一、异步编程基础

1.1 什么是异步?

# 同步:阻塞等待
import time

def sync_task(delay: float = 1):
    """Block the calling thread for ``delay`` seconds, then report completion.

    Args:
        delay: Number of seconds to sleep. Defaults to 1, the original
            hard-coded duration.

    Returns:
        The string "完成".
    """
    time.sleep(delay)  # blocking: nothing else runs on this thread meanwhile
    return "完成"

# 异步:非阻塞
import asyncio

async def async_task(delay: float = 1):
    """Suspend this coroutine for ``delay`` seconds, then report completion.

    Args:
        delay: Number of seconds to sleep. Defaults to 1, the original
            hard-coded duration.

    Returns:
        The string "完成".
    """
    # Non-blocking: control returns to the event loop while this coroutine waits
    await asyncio.sleep(delay)
    return "完成"

1.2 asyncio基础

import asyncio

# Define a coroutine
async def say_hello(delay: float = 1):
    """Print "Hello", pause for ``delay`` seconds, then print "World".

    Args:
        delay: Seconds to wait between the two prints. Defaults to 1, the
            original hard-coded pause.
    """
    print("Hello")
    # The pause suspends only this coroutine, not the event loop
    await asyncio.sleep(delay)
    print("World")

# Run the coroutine: asyncio.run() drives say_hello() to completion
asyncio.run(say_hello())

# Run several tasks concurrently
async def main():
    """Show two ways of awaiting several ``fetch_user`` calls concurrently."""
    user_ids = (1, 2, 3)

    # Option 1: gather - pass every coroutine in and wait for all the results
    results = await asyncio.gather(*(fetch_user(uid) for uid in user_ids))

    # Option 2: TaskGroup (Python 3.11+) - leaving the block waits for the tasks
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    # Collect the results once the group has exited
    print(*(task.result() for task in tasks))

# Entry point: run main() to completion
asyncio.run(main())


二、aiohttp异步HTTP

import aiohttp
import asyncio

async def fetch_url(session, url):
    """GET ``url`` through ``session`` and return the decoded JSON body."""
    async with session.get(url) as response:
        # The body is read while the response context is still open
        payload = await response.json()
    return payload

async def main():
    """Fetch three user endpoints concurrently and print each JSON payload."""
    urls = [
        'https://api.example.com/users/1',
        'https://api.example.com/users/2',
        'https://api.example.com/users/3',
    ]

    async with aiohttp.ClientSession() as session:
        # One shared session serves all requests; they are awaited together
        pending = (fetch_url(session, url) for url in urls)
        for payload in await asyncio.gather(*pending):
            print(payload)

# Entry point: run main() to completion
asyncio.run(main())

2.1 限流并发

import asyncio
import aiohttp

async def fetch_with_limit(urls, max_concurrent=10):
    """Fetch every URL as JSON with at most ``max_concurrent`` requests in flight.

    Args:
        urls: Iterable of URLs to request with GET.
        max_concurrent: Upper bound on simultaneous requests; must be >= 1.

    Returns:
        The decoded JSON bodies, in the same order as ``urls``.

    Raises:
        ValueError: If ``max_concurrent`` is smaller than 1.
    """
    # A semaphore with zero permits would make every request wait forever,
    # so reject that configuration up front instead of hanging.
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        # Hold a permit for the whole request/response cycle
        async with semaphore:
            async with session.get(url) as response:
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage: build 100 user URLs and fetch them with max_concurrent=10
urls = [f'https://api.example.com/users/{i}' for i in range(100)]
results = asyncio.run(fetch_with_limit(urls, max_concurrent=10))


三、FastAPI异步框架

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio

# Application instance that the @app.get / @app.post routes register on
app = FastAPI()

# 异步数据库操作
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Connection URL using the aiomysql driver; the credentials look like
# placeholders - replace them with real settings before running
DATABASE_URL = "mysql+aiomysql://user:password@localhost/db"
# Async engine built from the URL above
engine = create_async_engine(DATABASE_URL)
# Session factory producing AsyncSession objects bound to the engine
async_session = sessionmaker(engine, class_=AsyncSession)

# Models
class UserCreate(BaseModel):
    """Request body accepted by the create-user endpoint."""
    # Both fields are declared without defaults
    username: str
    email: str

class UserResponse(BaseModel):
    """User shape used as the ``response_model`` of the user endpoints."""
    id: int
    username: str
    email: str

# Async endpoints
@app.get("/api/users", response_model=list[UserResponse])
async def get_users():
    """Return every row of the ``users`` table.

    Returns:
        A list of dicts keyed by column name, one per row; FastAPI validates
        them against ``UserResponse``.
    """
    # Imported locally so this snippet is runnable on its own
    from sqlalchemy import text

    async with async_session() as session:
        # AsyncSession.execute() rejects bare SQL strings; they must be
        # wrapped in text()
        result = await session.execute(text("SELECT * FROM users"))
        # Convert each row to a plain dict so the response model can validate it
        return [dict(row) for row in result.mappings()]

@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Insert a new user row and return it together with its generated id.

    Args:
        user: Validated request body holding ``username`` and ``email``.

    Returns:
        A dict with the submitted fields plus the new row's ``id``.
    """
    # Imported locally so this snippet is runnable on its own
    from sqlalchemy import text

    async with async_session() as session:
        # Async insert; values travel as bound parameters, never interpolated
        result = await session.execute(
            text("INSERT INTO users (username, email) VALUES (:username, :email)"),
            {"username": user.username, "email": user.email}
        )
        # MySQL (the backend in DATABASE_URL) has no INSERT ... RETURNING,
        # so the auto-generated key is read from the cursor instead
        new_id = result.lastrowid
        await session.commit()
        return {**user.dict(), "id": new_id}

@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Return the user whose primary key is ``user_id``.

    Args:
        user_id: Value taken from the URL path.

    Returns:
        A dict keyed by column name for the matching row.

    Raises:
        HTTPException: With status 404 when no such row exists.
    """
    # Imported locally so this snippet is runnable on its own
    from sqlalchemy import text

    async with async_session() as session:
        # AsyncSession.execute() rejects bare SQL strings; wrap them in text()
        result = await session.execute(
            text("SELECT * FROM users WHERE id = :id"),
            {"id": user_id}
        )
        row = result.mappings().first()
        if row is None:
            raise HTTPException(status_code=404, detail="用户不存在")
        # A plain dict is something the response model can validate directly
        return dict(row)


四、异步数据库(SQLAlchemy)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String

# Connection URL using the asyncpg driver; the credentials look like
# placeholders - replace them with real settings before running
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"

# echo=True is passed so the engine logs the SQL it emits
engine = create_async_engine(DATABASE_URL, echo=True)
# Session factory producing AsyncSession objects; expire_on_commit=False keeps
# loaded attributes readable after commit()
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# Declarative base class that the ORM models inherit from
Base = declarative_base()

class User(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)  # primary key
    username = Column(String(50), unique=True)  # String(50) with a unique constraint
    email = Column(String(100), unique=True)  # String(100) with a unique constraint

# Async CRUD helpers
async def create_user(username: str, email: str):
    """Persist a new ``User`` built from the arguments and return it."""
    new_user = User(username=username, email=email)
    async with async_session() as session:
        session.add(new_user)
        await session.commit()
    return new_user

async def get_user(user_id: int):
    """Look up a ``User`` by primary key and return whatever ``session.get`` yields."""
    async with async_session() as session:
        found = await session.get(User, user_id)
    return found

async def get_users():
    """Load every row of the ``users`` table as ``User`` objects.

    Returns:
        A list of ``User`` instances.
    """
    # Imported locally so this snippet is runnable on its own
    from sqlalchemy import select

    async with async_session() as session:
        # select(User) makes scalars() yield User entities; with a raw
        # "SELECT *" it would yield only the first column of each row, and a
        # bare SQL string is rejected by AsyncSession.execute() anyway
        result = await session.execute(select(User))
        return result.scalars().all()


五、并发模型对比

# 1. 多线程(适合IO密集型)
import threading

def thread_task():
    """Issue one blocking GET request; intended to run as a thread target.

    Returns:
        The response object returned by ``requests.get``.
    """
    import requests
    # Without a timeout a stalled server would block this thread, and any
    # join() waiting on it, indefinitely
    return requests.get('https://api.example.com', timeout=10)

# Create ten worker threads that each run thread_task
threads = []
for _ in range(10):
    threads.append(threading.Thread(target=thread_task))

# Start every thread before waiting on any of them
for t in threads:
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

# 2. 多进程(适合CPU密集型)
from multiprocessing import Pool

def cpu_task(n):
    """Return the sum of the integers in ``range(n)``."""
    total = 0
    for value in range(n):
        total += value
    return total

# Guard the pool behind the main-module check: with the "spawn" start method
# each worker re-imports this module, and an unguarded Pool() at import time
# would try to start new processes from inside every worker.
if __name__ == "__main__":
    # Four worker processes, each summing range(10**7)
    with Pool(4) as p:
        results = p.map(cpu_task, [10**7] * 4)

# 3. 异步(适合IO密集型)
import asyncio
import aiohttp

async def async_task():
    """GET https://api.example.com with aiohttp and return the decoded JSON body."""
    async with aiohttp.ClientSession() as session, \
            session.get('https://api.example.com') as resp:
        # Read the body while the response is still open
        payload = await resp.json()
    return payload

# Entry point: run async_task() to completion
asyncio.run(async_task())


六、异步上下文管理器

# Custom asynchronous context manager
class AsyncDatabase:
    """Async context manager that opens a connection on entry and closes it on exit."""

    async def __aenter__(self):
        """Open a connection via ``async_connect`` and bind it to the ``as`` target."""
        connection = await async_connect()
        self.conn = connection
        return connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; returning None lets exceptions from the body propagate."""
        await self.conn.close()
        return None

async def main():
    """Run one query on a connection managed by ``AsyncDatabase``."""
    database = AsyncDatabase()
    async with database as conn:
        await conn.execute("SELECT * FROM users")


七、异步生成器

# Asynchronous generator
async def async_range(n):
    """Yield the integers of ``range(n)``, sleeping 0.1 s before each one."""
    pause = 0.1
    for value in range(n):
        # Yield to the event loop before producing the next item
        await asyncio.sleep(pause)
        yield value

async def main():
    """Print each value produced by ``async_range(10)`` as it arrives."""
    numbers = async_range(10)
    async for number in numbers:
        print(number)

# Asynchronous comprehension
async def main():
    """Collect the even values from ``async_range(10)`` and print the list."""
    evens = [value async for value in async_range(10) if not value % 2]
    print(evens)


学习建议

  1. 理解协程概念,async/await的本质
  2. 掌握asyncio基本用法
  3. 学习FastAPI,异步Web框架
  4. 理解并发模型,选择合适的方案
  5. 实践异步IO操作,HTTP请求、数据库操作

下一步学习

返回首页