FastAPI 备忘清单

一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示

入门

安装 FastAPI

1
$ pip install "fastapi[all]"

可以分开来安装

假如你想将应用程序部署到生产环境,你可能要执行以下操作:

1
$ pip install fastapi

并且安装 uvicorn 来作为服务器:

1
$ pip install "uvicorn[standard]"

运行代码

1
$ uvicorn main:app --reload

Python: 3.9.5 FastAPI: 0.103.1

最小程序

下面代码会直接启动http服务,也可以使用 uvicorn main:app --reload

1
2
3
4
from fastapi import FastAPI
import uvicorn

app = FastAPI()

添加一个 API 的示例

1
2
3
4
5
6
7
# http://127.0.0.1:8000/
@app.get("/")
async def root():
return {"message": "Hello World"}

if __name__ == '__main__':
uvicorn.run(app='main:app', reload=True)

路径参数

最基本的路径参数

1
2
3
4
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id):
return {"item_id": item_id} # item_id自定义

多个路径参数

1
2
3
4
# http://127.0.0.1:8000/items/1/2
@app.get("/items/{item_id}/{user_id}")
async def read_item(item_id, user_id):
return {"item_id": item_id, "user_id": user_id}

有类型的路径参数

1
2
3
4
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}

文件路径参数

1
2
3
4
# http://127.0.0.1:8000/file//home/my/my.txt
@app.get("/file/{file_path:path}")
async def read_item(file_path):
return {"file_path": file_path}

查询参数

带默认值的查询参数

1
2
3
4
5
6
# http://127.0.0.1:8000/items/?skip=0&limit=2
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]

@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
return fake_items_db[skip: skip + limit]

可选查询参数

1
2
3
4
5
6
7
8
# http://127.0.0.1:8000/items/1?q=admin
from typing import Union

@app.get("/items/{item_id}")
async def read_item(item_id: str, q: Union[str, None] = None):
if q:
return {"item_id": item_id, "q": q}
return {"item_id": item_id}

多路径多查询参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# http://127.0.0.1:8000/users/1/items/2
# or
# http://127.0.0.1:8000/users/1/items/2?q=query&short=true
@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(
user_id: int,
item_id: str,
q: Union[str, None] = None,
short: bool = False
):
item = {"item_id": item_id, "owner_id": user_id}
if q:
item.update({"q": q})
if not short:
item.update(
{"description": "这是一个令人惊叹的项目,有很长的描述"}
)
return item

必需查询参数

1
2
3
4
5
# http://127.0.0.1:8000/items/123?needy=yes
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
item = {"item_id": item_id, "needy": needy}
return item

请求体

1
2
3
4
5
6
7
8
9
10
11
12
13
from pydantic import BaseModel
from typing import Union

class Item(BaseModel):
name: str = '小明'
description: Union[str, None] = None
price: float
tax: Union[float, None] = None

@app.post("/items/")
async def create_item(item: Item):
print(item.name)
return item

调用

1
2
3
4
5
6
7
8
9
10
curl -X 'POST' \
'http://127.0.0.1:8000/items/' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "小明",
"description": "string",
"price": 0,
"tax": 0
}'

查询参数和字符串校验

1
2
3
4
5
6
7
8
9
10
from fastapi import Query

@app.get("/items/")
async def read_items(
q: Union[str, None] = Query(default=None, max_length=50)
):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
if q:
results.update({"q": q})
return results

参数列表

参数 含义 类型
default 默认值 任意类型或…
max_length 最大长度 int
min_length 最小长度 int
pattern 正则匹配 string
alias 别名参数 string
deprecated 准备弃用参数 bool

多个相同的查询参数

1
2
3
4
5
6
7
# http://127.0.0.1:8000/items/?q=foo&q=bar
@app.get("/items/")
async def read_items(
q: Union[List[str], None] = Query(default=None)
):
query_items = {"q": q}
return query_items

路径参数和数值校验

Path 用法基本和 Query 相同,参考:FastAPI官方文档

导入 Path

1
2
from fastapi import FastAPI, Path, Query
from typing_extensions import Annotated

声明元数据

1
2
3
4
5
6
7
8
9
@app.get("/items/{item_id}")
async def read_items(
item_id: Annotated[int, Path(title="要获取的项目的 ID")],
q: Annotated[str | None, Query(alias="item-query")] = None,
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results

参数列表

参数 含义 类型
... 和 Query 具有相同参数
ge 大于等于 int float
gt 大于 int float
le 小于等于 int float
le 小于等于 int float
title api文档的标题 string

其他参数

都具有 Query 的参数,max_lengthmin_length

Cookie参数

1
2
3
4
5
6
7
from fastapi import Cookie

@app.get("/items/")
async def read_items(
ads_id: Annotated[Union[str, None], Cookie()] = None
):
return {"ads_id": ads_id}

Header 参数

1
2
3
4
5
6
7
8
from fastapi import Header

@app.get("/items/")
async def read_items(
user_agent: Annotated[Union[str, None], Header()] = None,
items_id: Annotated[Union[int, None], Header(ge=1)] = None
):
return {"User-Agent": user_agent, "items_id": items_id}

表单数据

接收的不是 JSON,而是表单字段时,要使用 Form。

安装

1
$ pip install python-multipart

HTML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
</head>
<body>
<form method="post" action="http://127.0.0.1:8000/login">
<span>账号:</span><input type="text" name="username">
<br>
<span>密码:</span><input type="password" name="password">
<br>
<input type="submit" value="登录">
</form>
</body>
</html>

FastAPI

1
2
3
4
5
6
7
8
from fastapi import FastAPI, Form
import uvicorn
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
return {"username": username}
if __name__ == '__main__':
uvicorn.run(app='main:app', reload=True)

文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from fastapi import FastAPI, UploadFile
from fastapi.responses import HTMLResponse

@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
print(file.file.read().decode())
return {"filenames": file.filename, "type": str(type(file.file))}

@app.get("/")
async def main():
content = """<body>
<form action="/uploadfile/" enctype="multipart/form-data" method="post">
<input name="file" type="file" multiple>
<input type="submit">
</form>
</body>"""
return HTMLResponse(content=content)

UploadFile 属性

属性名 含义 返回
filename 文件名 上传的文件名
content_type 内容类型 MIME 类型
file 文件 SpooledTemporaryFile 具有 readwrite 方法

UploadFile async 方法

方法名 含义
write(data) data 写入文件
read(size) 按指定数量的字节读取文件内容
seek(offset) 移动至文件 offsetint)字节处的位置
close() 关闭文件

依赖项

依赖项使用场景

  • 共享业务逻辑(复用相同的代码逻辑)
  • 共享数据库连接
  • 实现安全、验证、角色权限
  • 等……

创建依赖项

1
2
3
4
5
from typing import Union

from fastapi import Depends, FastAPI

app = FastAPI()

read_itemsread_users 方法依赖 common_parameters
白话就是 read_itemsread_users 都需要 qskiplimit 查询参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def common_parameters(
q: Union[str, None] = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}


@app.get("/items/")
async def read_items(
commons: dict = Depends(common_parameters)
):
return commons


@app.get("/users/")
async def read_users(
commons: dict = Depends(common_parameters)
):
return commons

类作为依赖项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from typing import Union
from fastapi import Depends, FastAPI

app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]

class CommonQueryParams:
def __init__(
self,
q: Union[str, None] = None,
skip: int = 0,
limit: int = 100
):
self.q = q
self.skip = skip
self.limit = limit

read_itemsx 接收一个 commons 参数,类型是 CommonQueryParams
CommonQueryParams 接收三个参数,这三个参数是调用 api 的时候传

1
2
3
4
5
6
7
8
9
10
@app.get("/items/")
async def read_items(
commons: CommonQueryParams = Depends(CommonQueryParams)
):
response = {}
if commons.q:
response.update({"q": commons.q})
items = fake_items_db[commons.skip : commons.skip + commons.limit]
response.update({"items": items})
return response

还可以简写

1
2
3
4
5
6
7
8
9
10
11
@app.get("/items/")
async def read_items(
# 这里的 Depends 没有传参,FastAPI 会自动使用 CommonQueryParams
commons: CommonQueryParams = Depends()
):
response = {}
if commons.q:
response.update({"q": commons.q})
items = fake_items_db[commons.skip : commons.skip + commons.limit]
response.update({"items": items})
return response

子依赖项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from typing import Union
from fastapi import Cookie, Depends, FastAPI

app = FastAPI()

def query_extractor(q: Union[str, None] = None):
return q

def query_or_cookie_extractor(
q: str = Depends(query_extractor),
last_query: Union[str, None] = Cookie(default=None),
):
if not q:
return last_query
return q

# read_query函数依赖query_or_cookie_extractor函数
# query_or_cookie_extractor函数又依赖query_extractor函数
# 就是说依赖项可以依赖其他依赖项,只要你不晕,可以无数次套娃
@app.get("/items/")
async def read_query(
query_or_default: str = Depends(query_or_cookie_extractor)
):
return {"q_or_cookie": query_or_default}

不使用缓存

使用 use_cache = False 参数不使用缓存数据,不使用 use_cache = Falsevaluevalue1 是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
def result_value():
value = randint(1, 99)
return value

def get_value(
value: int = Depends(result_value, use_cache=False),
value1: int = Depends(result_value, use_cache=False)
):
return value, value1

@app.get('/value/')
async def needy_dependency(value: tuple = Depends(get_value)):
return {"value": value}

全局依赖项

1
2
3
4
5
6
7
8
9
10
from fastapi import Depends, FastAPI, Header, HTTPException

async def verify_token(x_token: str = Header()):
if x_token != "fake-super-secret-token":
raise HTTPException(status_code=400, detail="X-Token 标头无效")

async def verify_key(x_key: str = Header()):
if x_key != "fake-super-secret-key":
raise HTTPException(status_code=400, detail="X-Key 标头无效")
return x_key

全局依赖项很有用,后面的安全性就可以使用全局依赖项

1
2
3
4
5
6
7
8
9
10
11
app = FastAPI(
dependencies=[Depends(verify_token), Depends(verify_key)]
)

@app.get("/items/")
async def read_items():
return [{"item": "Portal Gun"}, {"item": "Plumbus"}]

@app.get("/users/")
async def read_users():
return [{"username": "Rick"}, {"username": "Morty"}]

安全性

基于 Token 的认证

1
2
3
4
5
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

使用 OAuth2PasswordBearer 创建一个 token 依赖

1
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

假设这是你的用户数据库

1
2
3
4
5
6
7
8
9
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
}
}

创建一个用户模型

1
2
3
4
5
class User(BaseModel):
username: str
email: str
full_name: str
disabled: bool

创建一个简单的认证函数

1
2
3
4
5
6
7
8
9
10
11
12
def fake_hash_password(password: str):
return "fakehashed" + password

def get_user(db, username: str):
if username in db:
user_dict = db[username]
return User(**user_dict)

def fake_decode_token(token: str):
# 这个函数应该验证 token 并返回用户信息
# 这里我们只是简单地返回了用户名
return get_user(fake_users_db, token)

创建一个依赖,用于从请求中获取 token 并验证用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(fake_users_db, form_data.username)
if not user or user.hashed_password != fake_hash_password(form_data.password):
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user

使用 OAuth2PasswordBearer 来创建一个简单的 token 认证流程。

HTTPS 和证书

1
2
3
from fastapi import FastAPI

app = FastAPI()

在生产环境中,你应该使用一个真正的证书和私钥,你可以从像 Let’s Encrypt 这样的证书颁发机构获得免费的证书,或者使用 OpenSSL 生成自签名证书

1
2
3
@app.get("/https")
async def read_https():
return {"message": "Hello, HTTPS!"}

启动服务器时,使用以下命令来指定证书和私钥:

1
uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile /path/to/your/key.pem --ssl-certfile /path/to/your/cert.pem

FastAPI 默认支持 HTTPS,你只需要提供证书和私钥即可。

待更新

参考