
关于
使用 SQLAlchemy 实现 Python 数据库模式,涵盖 ORM 模型、迁移、连接池和查询优化。
name: python-database-ops description: "SQLAlchemy 和 Python 数据库模式。触发条件:sqlalchemy、database、orm、migration、alembic、async database、connection pool、repository pattern、unit of work。" license: MIT compatibility: "SQLAlchemy 2.0+, Python 3.10+。异步需要 asyncpg(PostgreSQL)或 aiosqlite。" allowed-tools: "Read Write Bash" metadata: author: claude-mods depends-on: python-typing-ops, python-async-ops related-skills: python-fastapi-ops, postgres-ops
Python 数据库模式
SQLAlchemy 2.0 和数据库最佳实践。
SQLAlchemy 2.0 基础
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(255), unique=True)
is_active: Mapped[bool] = mapped_column(default=True)
# Create engine and tables
engine = create_engine("postgresql://user:pass@localhost/db")
Base.metadata.create_all(engine)
# Query with 2.0 style
with Session(engine) as session:
stmt = select(User).where(User.is_active == True)
users = session.execute(stmt).scalars().all()
异步 SQLAlchemy
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy import select
# Async engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=False,
pool_size=5,
max_overflow=10,
)
# Session factory
async_session = async_sessionmaker(engine, expire_on_commit=False)
# Usage
async with async_session() as session:
result = await session.execute(select(User).where(User.id == 1))
user = result.scalar_one_or_none()
模型关系
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
# 一对多
posts: Mapped[list["Post"]] = relationship(back_populates="author")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# 多对一
author: Mapped["User"] = relationship(back_populates="posts")
常用查询模式
from sqlalchemy import select, and_, or_, func
# 基本查询
stmt = select(User).where(User.is_active == True)
# 多条件
stmt = select(User).where(
and_(
User.is_active == True,
User.age >= 18
)
)
# OR 条件
stmt = select(User).where(
or_(User.role == "admin", User.role == "moderator")
)
# 排序和限制
stmt = select(User).order_by(User.created_at.desc()).limit(10)
# 聚合
stmt = select(func.count(User.id)).where(User.is_active == True)
# 连接
stmt = select(User, Post).join(Post, User.id == Post.author_id)
# 预加载
from sqlalchemy.orm import selectinload
stmt = select(User).options(selectinload(User.posts))
FastAPI 集成
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
yield session
DB = Annotated[AsyncSession, Depends(get_db)]
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DB):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
return user
快速参考
| 操作 | SQLAlchemy 2.0 风格 |
|-----------|---------------------|
| 查询全部 | select(User) |
| 过滤 | .where(User.id == 1) |
| 第一条 | .scalar_one_or_none() |
| 全部 | .scalars().all() |
| 计数 | select(func.count(User.id)) |
| 连接 | .join(Post) |
| 预加载 | .options(selectinload(User.posts)) |
附加资源
./references/sqlalchemy-async.md- 异步模式、会话管理./references/connection-pooling.md- 连接池配置、健康检查./references/transactions.md- 事务模式、隔离级别./references/migrations.md- Alembic 设置、迁移策略
另请参阅
前置条件:
python-typing-ops- Mapped 类型和注解python-async-ops- 异步数据库会话
相关技能:
python-fastapi-ops- 数据库会话的依赖注入python-pytest-ops- 数据库 fixtures 和测试
兼容工具
Claude CodeCursorGitHub Copilot
标签
数据工程
