FastAPI总结(二)
2025-03-19 19:40:57

FastAPI总结(二)

说明

在学习FastAPI的时候,我已经有了一些其他后端的使用经验,所以这篇笔记不是0基础开始的。

这部分总结分成一、二两部分:

  • 第一部分介绍FastAPI的路径、各种参数、依赖注入等关键技术细节。
  • 第二部分介绍安全性关系型数据库这两个重要的设计思想;还会介绍错误处理、中间件、后台任务、多文件项目、测试等辅助技术或设计

大纲

第一部分

  • 路径操作
  • 3种重要参数
  • 参数限定
  • Cookie, Header, Form, File等参数
  • 依赖注入

第二部分

  • 关系型数据库
  • 安全性
  • 多文件项目
  • 错误处理
  • 中间件
  • 后台任务
  • 其他杂项

简介

FastAPI 是一个用于构建 API 的轻量高性能web框架。

关键特性:

  • 快速:可与 NodeJSGo 并肩的极高性能(归功于 Starlette 和 Pydantic)。
  • 高效编码:提高功能开发速度约 200% 至 300%。
  • **并发:**Django和Flask默认的服务方式都是非并发,而FastAPI则良好支持并发。

关系型数据库

接下来介绍FastAPI如何联动SQL,这在技术上是简单的,但在设计上值得探讨。

我们使用sqlmodel来操作数据库,它基于 SQLAlchemy 和 Pydantic 。

安装:

1
pip install sqlmodel

创建模型

使用sqlmodel来创建一个模型,注意,我们目前还没有与数据库进行联动,而只是在服务端层面描述这个数据模型。

  • 如果类中显式定义了 tablename 属性,则使用该属性作为表名。
  • 如果没有定义 tablename,则使用 类名的小写形式 作为表名。
1
2
3
4
5
6
7
8
9
10
11
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select


class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
age: int | None = Field(default=None, index=True)
secret_name: str

Hero 类是一个基于 Pydantic 模型的类,但是它有所不同:

  • table=True 会告诉 SQLModel 这是一个表模型。
  • Field(primary_key=True) 会告诉 SQLModel id 是 SQL 数据库中的主键。
  • 把类型设置为 int | None ,SQLModel 就能知道该列在 SQL 数据库中应该是 INTEGER 类型,并且应该是 NULLABLE 。
  • Field(index=True) 会告诉 SQLModel 应该为此列创建一个 SQL 索引。

注意:后续,sqlmodel的引擎在进行数据库操作的时候,会根据操作的模型自动判断使用哪一个表,因为模型声明的时候就指定了表名。

  • 如果类中显式定义了 tablename 属性,则使用该属性作为表名。
  • 如果没有定义 tablename,则使用 类名的小写形式 作为表名。

总之,使用sqlmodel会给这个类添加上符合数据库记录的性质。

创建引擎

SQLModel 的引擎 engine(实际上它是一个 SQLAlchemy engine )是用来与数据库保持连接的。

您只需构建一个 engine,来让您的所有代码连接到同一个数据库。

1
2
3
4
5
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)

使用 check_same_thread=False 可以让 FastAPI 在不同线程中使用同一个 SQLite 数据库。这很有必要,因为单个请求可能会使用多个线程(例如在依赖项中)。

不用担心,FastAPI会按照代码结构确保每个请求使用一个单独的 SQLModel 会话,这实际上就是 check_same_thread 想要实现的。

创建表

1
2
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

创建Session依赖项

Session 会存储内存中的对象并跟踪数据中所需更改的内容,然后它使用 engine 与数据库进行通信。

我们会使用 yield 创建一个 FastAPI 依赖项,为每个请求提供一个新的 Session 。这确保我们每个请求使用一个单独的会话。

然后我们创建一个 Annotated 的依赖项 SessionDep 来简化其他也会用到此依赖的代码。

1
2
3
4
5
6
def get_session():
with Session(engine) as session:
yield session


SessionDep = Annotated[Session, Depends(get_session)]

数据库操作

到此为止,我们已经完成了一切准备工作。

复述

  • 创建引擎,连接数据库
  • 然后使用一个会话依赖项来创建独立会话,创建会话过程中需要使用引擎
  • 创建表模型
  • app开始时就创建所有表
  • 接下来:进行操作

上面就是FastAPI中数据库连接的常见设计。

下面是一个完整的例子,展示了数据库的增删查操作。其他的操作你可以自行搜索文档,因为这是SQLAlchemy的技术,而不是FastAPI的技术。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select


class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
age: int | None = Field(default=None, index=True)
secret_name: str


sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)


def create_db_and_tables():
SQLModel.metadata.create_all(engine)


def get_session():
with Session(engine) as session:
yield session


SessionDep = Annotated[Session, Depends(get_session)]

app = FastAPI()


@app.on_event("startup")
def on_startup():
create_db_and_tables()


@app.post("/heroes/")
def create_hero(hero: Hero, session: SessionDep) -> Hero:
session.add(hero)
session.commit()
session.refresh(hero)
return hero


@app.get("/heroes/")
def read_heroes(
session: SessionDep,
offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes


@app.get("/heroes/{hero_id}")
def read_hero(hero_id: int, session: SessionDep) -> Hero:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero


@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int, session: SessionDep):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

安全性

同样的,安全性也是一个设计问题。

我们先来看设计思想,然后看实现代码

一个常用的安全性设计方案

img

这是 OAuth2 认证的一个简单流程。在该安全性过程中:

  1. 用户在浏览器端输入用户名和密码,浏览器将这些信息发送到服务器指定的 URL。
  2. 服务器验证用户名和密码后,生成一个令牌(token),并返回给浏览器。
  3. 之后浏览器在向服务器发起请求时,会在请求标头 Authorization 中带上 “Bearer + 令牌” 的格式,以表明自己的身份。
  4. 服务器通过验证令牌来确认用户身份。同时,令牌设有过期时间,到期后用户需重新登录,这样即使令牌被盗,风险也能降低 。

对于这一章的技术部分,你需要首先去学习哈希值JWT的概念;这并不是FastAPI的内容,因此不展开。

实现

这是一个完整的登陆 + 访问验证的代码实现,你需要耐心阅读,结合AI详细了解。

阅读顺序:

  1. 上半部分的导库和数据结构
  2. login_for_access_token的依赖路径,这部分是登录的功能
  3. read_users_me的依赖路径,这部分是访问验证功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}


class Token(BaseModel):
access_token: str
token_type: str


class TokenData(BaseModel):
username: str | None = None


class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None


class UserInDB(User):
hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
return pwd_context.hash(password)


def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user


async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user


@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user


@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return [{"item_id": "Foo", "owner": current_user.username}]

多文件项目

这里的技术类似于Flask中的Blueprint,Django中的路由机制。我们希望将体量较大的FastAPI项目拆分成多个文件,并有逻辑地紧密编排成一个项目。

一个简单的多文件项目可以分为以下结构:

  • 主APP
  • 依赖项文件或文件夹(取决于体量)
  • 路由文件夹

假设我们有如下项目结构:

1
2
3
4
5
6
7
8
9
10
11
12
.
├── app # 「app」是一个 Python 包
│ ├── __init__.py # 这个文件使「app」成为一个 Python 包
│ ├── main.py # 「main」模块,例如 import app.main
│ ├── dependencies.py # 「dependencies」模块,例如 import app.dependencies
│ └── routers # 「routers」是一个「Python 子包」
│ │ ├── __init__.py # 使「routers」成为一个「Python 子包」
│ │ ├── items.py # 「items」子模块,例如 import app.routers.items
│ │ └── users.py # 「users」子模块,例如 import app.routers.users
│ └── internal # 「internal」是一个「Python 子包」
│ ├── __init__.py # 使「internal」成为一个「Python 子包」
│ └── admin.py # 「admin」子模块,例如 import app.internal.admin

routers

  • 我们可以在路由文件夹中添加多个路由文件,在其中写入路径操作函数。
  • 我们可以使用APIRouter,为整个路由文件的所有路径操作函数添加一个相同的前缀,甚至是相同的一些元信息。
  • 我们同时可以从依赖文件导入需要的依赖函数

这样,我们可以根据不同的功能,创建独立的路由文件和依赖文件,实现:横向上功能之间独立,纵向上路由和依赖独立。

items.py示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from fastapi import APIRouter, Depends, HTTPException

from ..dependencies import get_token_header

router = APIRouter(
prefix="/items",
tags=["items"],
dependencies=[Depends(get_token_header)],
responses={404: {"description": "Not found"}},
)


fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}


@router.get("/")
async def read_items():
return fake_items_db


@router.get("/{item_id}")
async def read_item(item_id: str):
if item_id not in fake_items_db:
raise HTTPException(status_code=404, detail="Item not found")
return {"name": fake_items_db[item_id]["name"], "item_id": item_id}


@router.put(
"/{item_id}",
tags=["custom"],
responses={403: {"description": "Operation forbidden"}},
)
async def update_item(item_id: str):
if item_id != "plumbus":
raise HTTPException(
status_code=403, detail="You can only update the item: plumbus"
)
return {"item_id": item_id, "name": "The great Plumbus"}

至于dependencies.py,这里就不展开说明了,里面都是一些普通的依赖函数,不涉及项目结构的配置。

主体

主题也即main.py,我们需要在这个主APP中注册各种路由。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI

from .dependencies import get_query_token, get_token_header
from .internal import admin
from .routers import items, users

app = FastAPI(dependencies=[Depends(get_query_token)])


app.include_router(users.router)
app.include_router(items.router)
app.include_router(
admin.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_token_header)],
responses={418: {"description": "I'm a teapot"}},
)


@app.get("/")
async def root():
return {"message": "Hello Bigger Applications!"}

结构图

img

错误处理

HTTPException

向客户端返回 HTTP 错误响应,可以使用 HTTPException

返回错误时,你应该使用raise而不是return

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi import FastAPI, HTTPException

app = FastAPI()

items = {"foo": "The Foo Wrestlers"}


@app.get("/items-header/{item_id}")
async def read_item_header(item_id: str):
if item_id not in items:
raise HTTPException(
status_code=404,
detail="Item not found",
headers={"X-Error": "There goes my error"},
)
return {"item": items[item_id]}

自定义错误处理

添加自定义处理器,要使用 Starlette 的异常工具,假设要触发的自定义异常叫作 UnicornException。

可以用 @app.exception_handler() 添加自定义异常控制器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse


class UnicornException(Exception):
def __init__(self, name: str):
self.name = name


app = FastAPI()


@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} did something. There goes a rainbow..."},
)


@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
if name == "yolo":
raise UnicornException(name=name)
return {"unicorn_name": name}

状态码

当然,你可以在路径操作装饰器的位置写上应当返回的操作码:

1
2
3
4
5
6
7
8
from fastapi import FastAPI

app = FastAPI()


@app.post("/items/", status_code=201)
async def create_item(name: str):
return {"name": name}

中间件

中间件可以定义一个响应的请求前处理和请求后处理。

一个请求的处理过程:请求前中间件操作 => 路径处理 => 请求后中间件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time

from fastapi import FastAPI, Request

app = FastAPI()


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
process_time = time.perf_counter() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response

call_next即为下一个中间件处理或者路径操作处理,这取决于中间件的添加顺序。

假设先后定义了a b c三个中间件,则请求的处理顺序为:a => b => c => 路径操作 => c => b => a

后台任务

你可以定义在返回响应后运行的后台任务。

这对需要在请求之后执行的操作很有用,但客户端不必在接收响应之前等待操作完成。

使用BackgroundTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}

CORS 跨域资源共享

CORS 或者「跨域资源共享」 指浏览器中运行的前端拥有与后端通信的 JavaScript 代码,而后端处于与前端不同的「源」的情况。

源是协议(http,https)、域(myapp.com)以及端口(80)的组合。

因此,这些都是不同的源:

1
2
3
http://localhost
https://localhost
http://localhost:8080

即使它们都在 localhost 中,但是它们使用不同的协议或者端口,所以它们都是不同的「源」。

不同源的资源相互访问可能会受限。如何解决?使用 CORSMiddleware。

使用 CORSMiddleware

你可以在 FastAPI 应用中使用 CORSMiddleware 来配置它:

  • 导入 CORSMiddleware。
  • 将其作为「中间件」添加到你的 FastAPI 应用中。
  • 对它进行你想要的配置。(比如添加前端为允许的源)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = [
"http://localhost.tiangolo.com",
"https://localhost.tiangolo.com",
"http://localhost",
"http://localhost:8080",
]

app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
  • allow_origins: 允许跨域请求的源列表,例如 [“http://localhost:8080“, “https://example.com"],默认 [],[“*”] 允许所有源。
  • allow_origin_regex: 通过正则表达式匹配允许跨域请求的源,例如 “https://.*.example.com”,默认 None。
  • allow_methods: 允许跨域请求的 HTTP 方法列表,例如 [“GET”, “POST”],默认 [“GET”],[“*”] 允许所有方法。
  • allow_headers: 允许跨域请求的 HTTP 请求头列表,例如 [“Authorization”, “Content-Type”],默认 [],[“*”] 允许所有请求头。
  • allow_credentials: 是否允许跨域请求携带凭证(如 cookies),默认 False,若为 True,allow_origins 不能为 [“*”]。
  • expose_headers: 浏览器可以访问的响应头列表,例如 [“X-Custom-Header”],默认 []。
  • max_age: 浏览器缓存 CORS 响应的最长时间(秒),默认 600。

常用类型转换操作

jsonable_encoder

将如Pydantic模型转换为与JSON兼容的数据类型(如dict、list等)。

json.dumps

将与JSON兼容的数据类型转换为JSON

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from datetime import datetime
from fastapi.encoders import jsonable_encoder
import json
from pydantic import BaseModel

# 定义 Pydantic 模型
class Item(BaseModel):
title: str
timestamp: datetime
description: str | None = None

# 创建 Pydantic 模型实例
item = Item(
title="My Item",
timestamp=datetime.now(),
description="A sample item"
)

# 使用 jsonable_encoder 转换为 JSON 兼容的 Python 对象
json_compatible_data = jsonable_encoder(item)

# 使用 json.dumps() 将 Python 对象转换为 JSON 字符串
json_string = json.dumps(json_compatible_data, indent=4)

# 打印结果
print("JSON 兼容的 Python 对象:", json_compatible_data)
print("JSON 字符串:", json_string)

Pydantic 的 exclude_unset 参数

更新部分数据时,可以在 Pydantic 模型的 .dict() 中使用 exclude_unset 参数。

比如,item.dict(exclude_unset=True)。这段代码生成的 dict 只包含创建 item 模型时显式设置的数据,而不包括默认值。

Pydantic 的 update 参数 和 **操作

  • update可以将一个新的键值对添加到字典中。
  • **可以将字典进行解包,形成没有包裹的键值对(这样的数据只可作为中间值,不能作为变量存储)。
1
2
3
4
5
6
7
8
@app.patch("/items/{item_id}", response_model=Item)
async def update_item(item_id: str, item: Item):
stored_item_data = items[item_id]
stored_item_model = Item(**stored_item_data)
update_data = item.dict(exclude_unset=True)
updated_item = stored_item_model.copy(update=update_data)
items[item_id] = jsonable_encoder(updated_item)
return updated_item