news 2026/5/7 22:17:00

FastAPI 数据库集成

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
FastAPI 数据库集成

FastAPI 数据库集成学习笔记

FastAPI 本身不绑定特定数据库,但通过SQLAlchemy(ORM) 或SQLModel(SQLAlchemy 的扩展) 可以非常优雅地集成关系型数据库(PostgreSQL, MySQL, SQLite)。对于 NoSQL,通常使用Motor(MongoDB) 或Redis客户端。

本笔记重点讲解SQLAlchemy 2.0+ (Async)SQLModel的集成方案,这是目前最主流和推荐的方式。


一、核心架构

FastAPI 数据库集成的标准模式是依赖注入 (Dependency Injection)管理数据库会话:

请求 → 依赖函数 (get_db) → 创建 Session → 执行业务逻辑 → 提交/回滚 → 关闭 Session

关键组件

  1. Engine: 数据库连接池,管理底层连接。
  2. Session: 一次数据库操作的上下文,管理事务。
  3. Model: 数据库表结构定义 (ORM)。
  4. Dependency: 提供 Session 给路由,确保自动关闭。

二、方案 A:使用 SQLAlchemy (原生 ORM)

1. 安装依赖

pipinstall"sqlalchemy[asyncio]""aiosqlite""asyncpg""databases"# 如果是 MySQL: pip install "mysql-async"# 如果是 PostgreSQL: pip install asyncpg

2. 数据库配置 (database.py)

fromsqlalchemy.ext.asyncioimportcreate_async_engine,AsyncSession,async_sessionmakerfromsqlalchemy.ormimportdeclarative_base# 数据库 URL# SQLite: sqlite+aiosqlite:///./test.db# PostgreSQL: postgresql+asyncpg://user:pass@localhost/dbname# MySQL: mysql+aiomysql://user:pass@localhost/dbnameDATABASE_URL="sqlite+aiosqlite:///./test.db"# 创建引擎 (异步)engine=create_async_engine(DATABASE_URL,echo=True,# 打印 SQL 日志,生产环境设为 Falsefuture=True,)# 创建会话工厂AsyncSessionLocal=async_sessionmaker(engine,class_=AsyncSession,expire_on_commit=False,autocommit=False,autoflush=False,)# 基类,用于定义模型Base=declarative_base()# 依赖函数:获取 Sessionasyncdefget_db():asyncwithAsyncSessionLocal()assession:try:yieldsessionawaitsession.commit()# 成功则提交exceptException:awaitsession.rollback()# 失败则回滚raisefinally:awaitsession.close()

3. 定义模型 (models.py)

fromsqlalchemyimportColumn,Integer,String,DateTimefromsqlalchemy.sqlimportfuncfromdatabaseimportBaseclassUser(Base):__tablename__="users"id=Column(Integer,primary_key=True,index=True)email=Column(String,unique=True,index=True,nullable=False)username=Column(String,unique=True,index=True,nullable=False)hashed_password=Column(String,nullable=False)is_active=Column(Integer,default=1)created_at=Column(DateTime(timezone=True),server_default=func.now())

4. 创建表 (main.py)

fromfastapiimportFastAPIfromdatabaseimportengine,BasefrommodelsimportUser app=FastAPI()@app.on_event("startup")asyncdefstartup():# 异步创建表asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)

5. 路由操作 (routers/users.py)

fromfastapiimportAPIRouter,Depends,HTTPException,statusfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemyimportselectfromtypingimportListfromdatabaseimportget_dbfrommodelsimportUserfrompydanticimportBaseModel router=APIRouter(prefix="/users",tags=["users"])# Pydantic 模型classUserCreate(BaseModel):email:strusername:strpassword:strclassUserResponse(BaseModel):id:intemail:strusername:stris_active:boolclassConfig:from_attributes=True# SQLAlchemy 2.0 替代 orm_mode@router.post("/",response_model=UserResponse)asyncdefcreate_user(user:UserCreate,db:AsyncSession=Depends(get_db)):# 检查是否存在result=awaitdb.execute(select(User).where(User.email==user.email))ifresult.scalar_one_or_none():raiseHTTPException(status_code=400,detail="Email already registered")# 创建新对象db_user=User(email=user.email,username=user.username,hashed_password=user.password# 实际应哈希)db.add(db_user)awaitdb.commit()awaitdb.refresh(db_user)returndb_user@router.get("/",response_model=List[UserResponse])asyncdefread_users(skip:int=0,limit:int=10,db:AsyncSession=Depends(get_db)):result=awaitdb.execute(select(User).offset(skip).limit(limit))users=result.scalars().all()returnusers@router.get("/{user_id}",response_model=UserResponse)asyncdefread_user(user_id:int,db:AsyncSession=Depends(get_db)):result=awaitdb.execute(select(User).where(User.id==user_id))user=result.scalar_one_or_none()ifnotuser:raiseHTTPException(status_code=404,detail="User not found")returnuser

三、方案 B:使用 SQLModel (推荐)

SQLModel是 FastAPI 作者开发的库,无缝集成 SQLAlchemy + Pydantic,代码更简洁,类型提示更友好。

1. 安装

pipinstallsqlmodel

2. 配置 (database.py)

fromsqlmodelimportSQLModel,create_engine,SessionfromtypingimportGenerator# 注意:SQLModel 默认使用同步引擎,但也可以配置异步# 这里演示同步 Session (FastAPI 对同步支持也很好,性能差异不大)SQLITE_URL="sqlite:///./test.db"engine=create_engine(SQLITE_URL,echo=True,connect_args={"check_same_thread":False})defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defget_session()->Generator[Session,None,None]:withSession(engine)assession:yieldsession

3. 定义模型 (models.py)

fromtypingimportOptionalfromsqlmodelimportSQLModel,FieldclassUser(SQLModel,table=True):__tablename__="users"id:Optional[int]=Field(default=None,primary_key=True)email:str=Field(unique=True,index=True)username:str=Field(unique=True,index=True)hashed_password:stris_active:bool=True

4. 路由 (routers/users.py)

fromfastapiimportAPIRouter,Depends,HTTPException,statusfromsqlmodelimportSession,selectfromtypingimportListfromdatabaseimportget_sessionfrommodelsimportUser,UserCreate# 假设 UserCreate 是纯 Pydantic 模型router=APIRouter(prefix="/users",tags=["users"])@router.post("/",response_model=User)defcreate_user(user:UserCreate,session:Session=Depends(get_session)):# 检查存在existing=session.exec(select(User).where(User.email==user.email)).first()ifexisting:raiseHTTPException(status_code=400,detail="Email exists")db_user=User(**user.dict())session.add(db_user)session.commit()session.refresh(db_user)returndb_user@router.get("/",response_model=List[User])defread_users(skip:int=0,limit:int=10,session:Session=Depends(get_session)):users=session.exec(select(User).offset(skip).limit(limit)).all()returnusers

优势SQLModel自动处理from_attributes,无需手动配置 Pydantic 的orm_mode


四、异步 vs 同步

特性异步 (Async)同步 (Sync)
依赖sqlalchemy[asyncio],aiosqlite,asyncpgsqlalchemy
性能高并发下更优,不阻塞事件循环简单场景足够,代码更易读
代码风格async/await普通函数
适用场景高并发 I/O 密集型一般业务,快速开发
FastAPI 支持完美支持完美支持 (自动在线程池运行)

建议

  • 新项目优先尝试SQLModel (同步),开发效率最高。
  • 高并发场景或已有异步生态,使用SQLAlchemy Async

五、数据库迁移 (Alembic)

生产环境不能直接create_all,需要使用Alembic进行版本控制迁移。

1. 安装与初始化

pipinstallalembic alembic init alembic

2. 配置alembic.inienv.py

alembic/env.py中配置target_metadatasqlalchemy.url

# env.pyfromsqlmodelimportSQLModelfromdatabaseimportengine# 确保 target_metadata 指向你的 Base 或 SQLModel.metadatatarget_metadata=SQLModel.metadata# 配置 URLconfig.set_main_option("sqlalchemy.url","sqlite:///./test.db")

3. 生成迁移

# 生成迁移脚本alembic revision--autogenerate-m"Initial migration"# 应用迁移alembic upgradehead

4. 回滚

alembic downgrade-1

六、事务管理

1. 自动提交 (依赖注入中)

get_db依赖中,yield后自动commit,异常时rollback

2. 手动控制事务

fromfastapiimportDependsfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemyimportselect@router.post("/batch")asyncdefbatch_create(users:List[UserCreate],db:AsyncSession=Depends(get_db)):try:foruinusers:db_user=User(**u.dict())db.add(db_user)awaitdb.commit()# 全部成功才提交return{"msg":"All created"}exceptException:awaitdb.rollback()raise

七、常见错误与排查

错误原因解决方案
sqlite3.OperationalError: database is locked同步 SQLite 并发写冲突使用check_same_thread=False或改用 PostgreSQL
RuntimeError: This event loop is already running在异步环境中使用同步驱动改用aiosqlite/asyncpg
DetachedInstanceErrorSession 关闭后访问对象使用lazy="joined"session.refresh()
Column not found模型修改后未迁移运行alembic revision --autogenerate
Circular dependency模型间循环引用使用ForeignKey字符串引用,或relationshipback_populates

八、最佳实践总结

  1. 使用依赖注入:永远通过Depends(get_db)获取 Session,不要全局单例。
  2. 分离模型
    • ORM Model: 对应数据库表 (SQLAlchemy/SQLModel)。
    • Pydantic Model: 对应请求/响应 (不含敏感字段如hashed_password)。
  3. 异步优先:高并发场景使用AsyncSession
  4. 迁移管理:生产环境必须使用 Alembic,禁止自动建表。
  5. 连接池:配置合理的pool_sizemax_overflow
  6. 错误处理:捕获数据库异常,返回友好的 HTTP 错误。

九、完整项目结构示例

project/ ├── app/ │ ├── __init__.py │ ├── main.py # 启动,注册路由,创建表 │ ├── config.py # 配置 (DB_URL, SECRET_KEY) │ ├── database.py # Engine, Session, get_db │ ├── models.py # SQLAlchemy/SQLModel 模型 │ ├── schemas.py # Pydantic 请求/响应模型 │ ├── dependencies.py # 认证、权限等依赖 │ └── routers/ │ ├── auth.py │ └── users.py ├── alembic/ # 迁移脚本 ├── alembic.ini └── requirements.txt

通过这种结构,FastAPI 项目可以保持高可维护性、类型安全和良好的扩展性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/7 22:07:33

如何利用宝塔面板进行数据迁移_使用宝塔整机备份功能

整机备份前必须确认三件事:检查www用户是否存在并创建、确认备份目录权限为www:www、确保gzip和tar命令可用;否则备份会卡在“正在打包”或生成空包。整机备份前必须确认的三件事宝塔的整机备份不是点一下就完事的“全自动”,它依赖底层权限、…

作者头像 李华