FastAPI Best Practice¶
๐จ๐จ๐จ๐จ๐จ์์ ์ค...๐จ๐จ๐จ๐จ๐จ¶
1. Project Structure. Consistent & predictable¶
์ผ๊ด์ฑ ์๊ณ ์์ธก ๊ฐ๋ฅํ ํ๋ก์ ํธ ๊ตฌ์กฐ
fastapi-project
โโโ alembic/
โโโ src
โ โโโ auth
โ โ โโโ router.py
โ โ โโโ schemas.py # pydantic models
โ โ โโโ models.py # db models
โ โ โโโ dependencies.py
โ โ โโโ config.py # local configs
โ โ โโโ constants.py
โ โ โโโ exceptions.py
โ โ โโโ service.py
โ โ โโโ utils.py
โ โโโ aws
โ โ โโโ client.py # client model for external service communication
โ โ โโโ schemas.py
โ โ โโโ config.py
โ โ โโโ constants.py
โ โ โโโ exceptions.py
โ โ โโโ utils.py
โ โโโ posts
โ โ โโโ router.py
โ โ โโโ schemas.py
โ โ โโโ models.py
โ โ โโโ dependencies.py
โ โ โโโ constants.py
โ โ โโโ exceptions.py
โ โ โโโ service.py
โ โ โโโ utils.py
โ โโโ config.py # global configs
โ โโโ models.py # global models
โ โโโ exceptions.py # global exceptions
โ โโโ pagination.py # global module e.g. pagination
โ โโโ database.py # db connection related stuff
โ โโโ main.py
โโโ tests/
โ โโโ auth
โ โโโ aws
โ โโโ posts
โโโ templates/
โ โโโ index.html
โโโ requirements
โ โโโ base.txt
โ โโโ dev.txt
โ โโโ prod.txt
โโโ .env
โโโ .gitignore
โโโ logging.ini
โโโ alembic.ini
-
src
Folder : ๋ชจ๋ ๋๋ฉ์ธ ๋๋ ํฐ๋ฆฌ ์ ์ฅsrc/
: highest level of an app, contains common models, configs, and constants, etc.src/main.py
: root of the project, which inits the FastAPI app
-
๊ฐ๊ฐ์ package๋ router, schema, model ๋ฑ์ผ๋ก ๊ตฌ์ฑ
router.py
: Is a core of each module with all the endpointsschemas.py
: For pydantic modelsmodels.py
: For DB modelsservice.py
: Module specific business logicdependencies.py
: Router dependenciesconstants.py
: Module specific constants and error codesconfig.py
: e.g. Env varsutils.py
: Non-business logic functions (response normalization, data enrichment, ...)exceptions
: Module specific exceptions (PostNotFound, InvalidUserData, ...)
-
ํจํค์ง์ "๋ค๋ฅธ ํจํค์ง service/dependency/constant"๊ฐ ํ์ํ ๊ฒฝ์ฐ๋ผ๋ฉด, Import them with an explicit module name
2. Excessively use Pydantic for data validation¶
Pydantic์ ๋ฐ์ดํฐ๋ฅผ ๊ฒ์ฆํ๊ณ ๋ณํํ ์ ์๋ ํ๋ถํ ๊ธฐ๋ฅ์ ์ง์ํ๋ค
- Regular features like required & non-required fields with default values
- Comprehensive data processing tools like regex
- Enums for limited allowed options
- Length validation
- Email validation
- etc ...
Example
from enum import Enum from pydantic import AnyUrl, BaseModel, EmailStr, Field, constr class MusicBand(str, Enum): AEROSMITH = "AEROSMITH" QUEEN = "QUEEN" ACDC = "AC/DC" class UserBase(BaseModel): first_name: str = Field(min_length=1, max_length=128) username: constr(regex="^[A-Za-z0-9-_]+$", to_lower=True, strip_whitespace=True) email: EmailStr age: int = Field(ge=18, default=None) # must be greater or equal to 18 favorite_band: MusicBand = None # only "AEROSMITH", "QUEEN", "AC/DC" values are allowed to be inputted website: AnyUrl = None
3. Use dependencies for data validation vs DB¶
Pydantic์ Client input๋ง ๊ฒ์ฆํ ์ ์๋ค
๋ฐ๋ผ์, ์ ์ ๋ฉ์ผ์ด ์ด๋ฏธ ์กด์ฌํ๊ฑฐ๋ ์ฌ์ฉ์๋ฅผ ์ฐพ์ ์ ์๋ ๋ฑ๋ฑ..
๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ์ฝ์กฐ๊ฑด์ ๋ํ ๋ฐ์ดํฐ ์ ํจ์ฑ ๊ฒ์ฆ์ Dependency๋ฅผ ์ฌ์ฉํ๋ค
# dependencies.py
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
# router.py
@router.get("/posts/{post_id}", response_model=PostResponse)
async def get_post_by_id(post: Mapping = Depends(valid_post_id)):
return post
@router.put("/posts/{post_id}", response_model=PostResponse)
async def update_post(
update_data: PostUpdate,
post: Mapping = Depends(valid_post_id),
):
updated_post: Mapping = await service.update(id=post["id"], data=update_data)
return updated_post
@router.get("/posts/{post_id}/reviews", response_model=list[ReviewsResponse])
async def get_post_reviews(post: Mapping = Depends(valid_post_id)):
post_reviews: list[Mapping] = await reviews_service.get_by_post_id(post["id"])
return post_reviews
4. Chain dependencies¶
Dependency๋ ๋ค๋ฅธ dependency๋ฅผ ์ฌ์ฉํ ์ ์๊ณ , ์ ์ฌ logic์ ๋ํ ์ฝ๋ ๋ฐ๋ณต์ ํผํ ์ ์๋ค
# dependencies.py
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
async def parse_jwt_data(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="/auth/token"))
) -> dict:
try:
payload = jwt.decode(token, "JWT_SECRET", algorithms=["HS256"])
except JWTError:
raise InvalidCredentials()
return {"user_id": payload["id"]}
async def valid_owned_post(
post: Mapping = Depends(valid_post_id),
token_data: dict = Depends(parse_jwt_data),
) -> Mapping:
if post["creator_id"] != token_data["user_id"]:
raise UserNotOwner()
return post
# router.py
@router.get("/users/{user_id}/posts/{post_id}", response_model=PostResponse)
async def get_user_post(post: Mapping = Depends(valid_owned_post)):
return post
5. Decouple & Reuse dependencies. Dependency calls are cached¶
Dependency๋ ์ฌ๋ฌ ๋ฒ ์ฌ์ฌ์ฉํ ์ ์์ง๋ง, ์ฌ๊ฒํ ๋์ง๋ ์๋๋ค
FastAPI๋ request ๋ฒ์ ๋ด์์ dependency ๊ฒฐ๊ณผ๋ฅผ cacheํ ํ๋ ๊ฒ์ด default ์ค์ ์ด๊ธฐ ๋๋ฌธ์ด๋ค
๋ง์ฝ, Service-get_post_by_id
๋ฅผ ํธ์ถํ๋ dependency์ ๊ฒฝ์ฐ, ํด๋น dependency๋ฅผ ํธ์ถํ ๋๋ง๋ค DB๋ฅผ ๊ฒ์ํ์ง ์๋๋ค
์ฒซ๋ฒ์งธ ํธ์ถ์์๋ง DB๋ฅผ ๊ฒ์ํ๋ค
์ด๋ฅผ ์๋ฉด, Multiple smaller function์ ๋ํ dependency๋ฅผ ์ฝ๊ฒ ๋ถ๋ฆฌํ ์ ์๋ค
valid_owned_post
valid_active_creator
get_user_post
butparse_jwt_data
is called only once, in the very first call
# dependencies.py
from fastapi import BackgroundTasks
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
async def parse_jwt_data(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="/auth/token"))
) -> dict:
try:
payload = jwt.decode(token, "JWT_SECRET", algorithms=["HS256"])
except JWTError:
raise InvalidCredentials()
return {"user_id": payload["id"]}
async def valid_owned_post(
post: Mapping = Depends(valid_post_id),
token_data: dict = Depends(parse_jwt_data),
) -> Mapping:
if post["creator_id"] != token_data["user_id"]:
raise UserNotOwner()
return post
async def valid_active_creator(
token_data: dict = Depends(parse_jwt_data),
):
user = await users_service.get_by_id(token_data["user_id"])
if not user["is_active"]:
raise UserIsBanned()
if not user["is_creator"]:
raise UserNotCreator()
return user
# router.py
@router.get("/users/{user_id}/posts/{post_id}", response_model=PostResponse)
async def get_user_post(
worker: BackgroundTasks,
post: Mapping = Depends(valid_owned_post),
user: Mapping = Depends(valid_active_creator),
):
"""Get post that belong the active user."""
worker.add_task(notifications_service.send_email, user["id"])
return post
6. Follow the REST¶
Restful API๋ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ๋ก์์ dependency๋ฅผ ์ฝ๊ฒ ์ฌ์ฌ์ฉํ ์ ์๋ค
GET /courses/:course_id
GET /courses/:course_id/chapters/:chapter_id/lessons
GET /chapters/:chapter_id
๋จ, ๊ฒฝ๋ก์ ๋์ผํ ๋ณ์ ์ด๋ฆ์ ์ฌ์ฉํด์ผ ํ๋ค
# src.profiles.dependencies
async def valid_profile_id(profile_id: UUID4) -> Mapping:
profile = await service.get_by_id(post_id)
if not profile:
raise ProfileNotFound()
return profile
# src.creators.dependencies
async def valid_creator_id(profile: Mapping = Depends(valid_profile_id)) -> Mapping:
if not profile["is_creator"]:
raise ProfileNotCreator()
return profile
# src.profiles.router.py
@router.get("/profiles/{profile_id}", response_model=ProfileResponse)
async def get_user_profile_by_id(profile: Mapping = Depends(valid_profile_id)):
"""Get profile by id."""
return profile
# src.creators.router.py
@router.get("/creators/{profile_id}", response_model=ProfileResponse)
async def get_user_profile_by_id(
creator_profile: Mapping = Depends(valid_creator_id)
):
"""Get creator's profile by id."""
return creator_profile
Use /me
endpoints for users resources (GET /profiles/me
,GET /users/me/posts
)
- ์ฌ์ฉ์ ID์ ์กด์ฌ ์ฌ๋ถ๋ฅผ ๊ฒ์ฆํ ํ์ ์์ - ์ธ์ฆ๋ฐฉ๋ฒ์ ํตํด ์ด๋ฏธ ํ์ธ
- ์ฌ์ฉ์ID๊ฐ ์์ฒญ์์ ๊ฒ์ธ์ง ํ์ธํ ํ์ ์์
7. Don't make your routes async, if you have only blocking I/O operations¶
Under the hood, FastAPI๋ async&sync I/O ์์ ์ ๋ชจ๋ ํจ๊ณผ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋ค.
- FastAPI runs
sync
routes in the threadpool and blocking I/O operations won't stop the event loop from executing the tasks. - Otherwise, if the route is defined
async
then it's called regularly viaawait
and FastAPI trusts you to do only non-blocking I/O operations.
The caveat is if you fail that trust and execute blocking operations within async routes, the event loop will not be able to run the next tasks until that blocking operation is done.
import asyncio
import time
@router.get("/terrible-ping")
async def terrible_catastrophic_ping():
time.sleep(10) # I/O blocking operation for 10 seconds
pong = service.get_pong() # I/O blocking operation to get pong from DB
return {"pong": pong}
@router.get("/good-ping")
def good_ping():
time.sleep(10) # I/O blocking operation for 10 seconds, but in another thread
pong = service.get_pong() # I/O blocking operation to get pong from DB, but in another thread
return {"pong": pong}
@router.get("/perfect-ping")
async def perfect_ping():
await asyncio.sleep(10) # non-blocking I/O operation
pong = await service.async_get_pong() # non-blocking I/O db call
return {"pong": pong}
What happens when we call:
GET /terrible-ping
- FastAPI ์๋ฒ๊ฐ request๋ฅผ ์์ ํ๊ณ , ์ฒ๋ฆฌ๋ฅผ ์์ํ๋ค
- ์๋ฒ์ event loop์ queue์ ๋ชจ๋ ์์
์ด
time.sleep()
์ด ๋๋ ๋๊น์ง ๋๊ธฐํ๋ค- ์๋ฒ๋
time.sleep()
์ด I/O ์์ ์ด ์๋๋ผ ์๊ฐํ๋ฏ๋ก, ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค - ์๋ฒ๋ ๋๊ธฐํ๋ ๋์ ์๋ก์ด request๋ฅผ ์๋ฝํ์ง ์๋๋ค
- ์๋ฒ๋
- event loop์ queue์ ๋ชจ๋ ์์
์ด
service.get_pong()
์ด ์๋ฃ๋ ๋๊น์ง ๋๊ธฐํ๋ค- ์๋ฒ๋
service.get_pong()
์ด I/O ์์ ์ด ์๋๋ผ ์๊ฐํ๋ฏ๋ก, ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค - ์๋ฒ๋ ๋๊ธฐํ๋ ๋์ ์๋ก์ด request๋ฅผ ์๋ฝํ์ง ์๋๋ค
- ์๋ฒ๋
- Server returns the response
- After a response, server starts accepting new requests
GET /good-ping
- FastAPI server receives a request and starts handling it
- FastAPI sends the whole route
good_ping
to the threadpool, where a worker thread will run the function good_ping
์ด ์คํ๋๋ ๋์ event loop๋ queue์์ ๋ค์ ์์ ์ ์ ํํ๊ณ ์์ ํ๋ค (ex: ์ ์์ฒญ ์๋ฝ, db ํธ์ถ ๋ฑ)- Main thread์ ๊ด๊ณ์์ด worker thread๋
time.sleep
๊ฐ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ณ ,service.get_pong
์ ์๋ฃํ๋ค - Sync operation blocks only the side thread, not the main one
- Main thread์ ๊ด๊ณ์์ด worker thread๋
- When
good_ping
finishes its work, server returns a response to the client
GET /perfect-ping
- FastAPI server receives a request and starts handling it
- FastAPI awaits
asyncio.sleep(10)
- Event loop selects next tasks from the queue and works on them (e.g. accept new request, call db)
- When
asyncio.sleep(10)
is done, servers goes to the next lines and awaitsservice.async_get_pong
- Event loop selects next tasks from the queue and works on them (e.g. accept new request, call db)
- When
service.async_get_pong
is done, server returns a response to the client
Non-blocking awaitables|thread pool๋ก ์ ์ก๋๋ ์์ ์ I/O intensive task์ด์ด์ผ ํ๋ค (open file, db call, external api call ...)
- CPU-intensive task(heavy calculations, data processing, video transcoding)๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ๊ฒ์ ๊ฐ์น๊ฐ ์๋ค. CPU๊ฐ ์์ ์ ์๋ฃํ๊ธฐ ์ํด ์๋ํ๊ธฐ ๋๋ฌธ์ด๋ค. ๋ฐ๋ฉด I/O ์์ ์ ์ธ๋ถ ์์ ์ด๋ฏ๋ก ์๋ฒ๋ ํด๋น ์์ ์ด ์๋ฃ๋ ๋๊น์ง ์๋ฌด๊ฒ๋ ํ์ง ์์๋ ๋๋ฏ๋ก ๋ค์ ์์ ์ผ๋ก ๋์ด๊ฐ ์ ์๋ค
- GIL๋ก ์ธํด ํ ๋ฒ์ ํ๋์ thread๋ง ์๋ํ ์ ์๋ค
- CPU intensive task๋ฅผ ์ต์ ํํ๊ธฐ ์ํด์๋, multi-process๋ก ๊ฐ์ผํ๋ค
8. Custom base model from day 0¶
์ ์ด ๊ฐ๋ฅํ global base model์ ์ฌ์ฉํ๋ฉด app๋ด์ ๋ชจ๋ model์ ์ปค์คํฐ๋ง์ด์งํ ์ ์๋ค.
์์๋ก ํ์ค datetime ํ์์ ๊ฐ๊ฑฐ๋, base model ์ ๋ชจ๋ ํ์ํด๋์ค์ ๋ํด super method๋ฅผ ์ถ๊ฐํ ์ ์๋ค.
from datetime import datetime
from zoneinfo import ZoneInfo
import orjson
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, root_validator
def orjson_dumps(v, *, default):
# orjson.dumps returns bytes, to match standard json.dumps we need to decode
return orjson.dumps(v, default=default).decode()
def convert_datetime_to_gmt(dt: datetime) -> str:
if not dt.tzinfo:
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
return dt.strftime("%Y-%m-%dT%H:%M:%S%z")
class ORJSONModel(BaseModel):
class Config:
json_loads = orjson.loads
json_dumps = orjson_dumps
json_encoders = {datetime: convert_datetime_to_gmt} # method for customer JSON encoding of datetime fields
@root_validator()
def set_null_microseconds(cls, data: dict) -> dict:
"""Drops microseconds in all the datetime field values."""
datetime_fields = {
k: v.replace(microsecond=0)
for k, v in data.items()
if isinstance(k, datetime)
}
return {**data, **datetime_fields}
def serializable_dict(self, **kwargs):
"""Return a dict which contains only serializable fields."""
default_dict = super().dict(**kwargs)
return jsonable_encoder(default_dict)
In the example above we have decided to make a global base model which:
- Uses orjson to serialize data
- Drops microseconds to 0 in all date formats
- Serializes all datetime fields to standard format with explicit timezone
9. Docs¶
API๊ฐ ๊ณต๊ฐ๋์ง ์์ ๊ฒฝ์ฐ์๋ Docs ๋ํ ๊ณต๊ฐํ์ง ์๋๋ค.
from fastapi import FastAPI
from starlette.config import Config
config = Config(".env") # parse .env file for env variables
ENVIRONMENT = config("ENVIRONMENT") # get current env name
SHOW_DOCS_ENVIRONMENT = ("local", "staging") # explicit list of allowed envs
app_configs = {"title": "My Cool API"}
if ENVIRONMENT not in SHOW_DOCS_ENVIRONMENT:
app_configs["openapi_url"] = None # set url for docs as null
app = FastAPI(**app_configs)
Help FastAPI to generate an easy-to-understand docs
- Set
response_model
,status_code
,description
, ... - If models and statuses vary, use
responses
route attribute to add docs for different responses
from fastapi import APIRouter, status
router = APIRouter()
@router.post(
"/endpoints",
response_model=DefaultResponseModel, # default response pydantic model
status_code=status.HTTP_201_CREATED, # default status code
description="Description of the well documented endpoint",
tags=["Endpoint Category"],
summary="Summary of the Endpoint",
responses={
status.HTTP_200_OK: {
"model": OkResponse, # custom pydantic model for 200 response
"description": "Ok Response",
},
status.HTTP_201_CREATED: {
"model": CreatedResponse, # custom pydantic model for 201 response
"description": "Creates something from user request ",
},
status.HTTP_202_ACCEPTED: {
"model": AcceptedResponse, # custom pydantic model for 202 response
"description": "Accepts request and handles it later",
},
},
)
async def documented_route():
pass
10. Use Pydantic's BaseSettings for configs¶
Pydantic์ ํ๊ฒฝ ๋ณ์๋ฅผ ๋ถ์ํ๊ณ , validators๋ก ์ฒ๋ฆฌํ ์ ์๋ ๋๊ตฌ๋ฅผ ์ ๊ณตํ๋ค.
from pydantic import AnyUrl, BaseSettings, PostgresDsn
class AppSettings(BaseSettings):
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
env_prefix = "app_"
DATABASE_URL: PostgresDsn
IS_GOOD_ENV: bool = True
ALLOWED_CORS_ORIGINS: set[AnyUrl]
11. SQLAlchemy: Set DB keys naming convention¶
๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ท์น์ ๋ฐ๋ผ ์ธ๋ฑ์ค ์ด๋ฆ์ ์ค์ ํ๋ ๊ฒ์ด SQLalchemy๋ณด๋ค ๋ ์ข๋ค
from sqlalchemy import MetaData
POSTGRES_INDEXES_NAMING_CONVENTION = {
"ix": "%(column_0_label)s_idx",
"uq": "%(table_name)s_%(column_0_name)s_key",
"ck": "%(table_name)s_%(constraint_name)s_check",
"fk": "%(table_name)s_%(column_0_name)s_fkey",
"pk": "%(table_name)s_pkey",
}
metadata = MetaData(naming_convention=POSTGRES_INDEXES_NAMING_CONVENTION)
12. Migrations. Alembic.¶
- Migration์ staticํ๊ณ revertableํ์ฌ์ผ ํ๋ค
- ๋ง์ฝ migration์ด ๋์ ์ผ๋ก ์์ฑ๋ ๋ฐ์ดํฐ์ ์์กดํ๋ ๊ฒฝ์ฐ, ๋์ ์ธ ๊ฒ์ด ๊ตฌ์กฐ๊ฐ ์๋๋ผ ๋ฐ์ดํฐ ์์ฒด์ธ์ง ํ์ธ
- descriptive names&slugs๋ฅผ ์ฌ์ฉํ์ฌ migration์ ์์ฑํ๋ค. Slug๊ฐ ํ์ํ๋ฉฐ ๋ณ๊ฒฝ ์ฌํญ์ ์ค๋ช ํด์ผ ํ๋ค
- ์๋ก์ด migration์ ๋ํด ์ฌ์ฉ์๊ฐ ์ฝ์ ์ ์๋ ํ์ผ ํ
ํ๋ฆฟ์ ์ค์ ํ๋ค
*date*_*slug*.py
(2022-08-24_post_content_idx.py)
13. Set DB naming convention¶
- lower_case_snake
- singular form (
post
,post_like
,user_playlist
) - group similar tables with module prefix (
payment_account
,payment_bill
,post
,post_like
) - stay consistent across tables, but concrete namings are ok
- use
profile_id
in all tables, but if some of them need only profiles that are creators, usecreator_id
- use
post_id
for all abstract tables likepost_like
,post_view
, but use concrete naming in relevant modules likecourse_id
inchapters.course_id
- use
_at
suffix for datetime_date
suffix for date
14. Set tests client async from day 0¶
DB์์ integration tests๋ฅผ ์์ฑํ๋ฉด, ํฅํ event loop error๊ฐ ๋ฐ์ํ ๊ฐ๋ฅ์ฑ์ด ๋๋ค. ๋ฐ๋ผ์ async test client๋ฅผ ์ฆ์ ์ค์ ํด์ผํ๋ค (async-asgi-testclient OR httpx)
import pytest
from async_asgi_testclient import TestClient
from src.main import app # inited FastAPI app
@pytest.fixture
async def client():
host, port = "127.0.0.1", "5555"
scope = {"client": (host, port)}
async with TestClient(
app, scope=scope, headers={"X-User-Fingerprint": "Test"}
) as client:
yield client
@pytest.mark.asyncio
async def test_create_post(client: TestClient):
resp = await client.post("/posts")
assert resp.status_code == 201
15. BackgroundTasks > asyncio.create_task¶
BackgroundTasks can effectively run both blocking and non-blocking I/O operations the same way FastAPI handles blocking routes
(sync
tasks are run in a threadpool, while async
tasks are awaited later)
- Don't lie to the worker and don't mark blocking I/O operations as
async
- Don't use it for heavy CPU intensive tasks
from fastapi import APIRouter, BackgroundTasks
from pydantic import UUID4
from src.notifications import service as notifications_service
router = APIRouter()
@router.post("/users/{user_id}/email")
async def send_user_email(worker: BackgroundTasks, user_id: UUID4):
"""Send email to user"""
worker.add_task(notifications_service.send_email, user_id) # send email after responding client
return {"status": "ok"}
16. Typing is important¶
17. Save files in chunks¶
Don't hope your clients will send small files
import aiofiles
from fastapi import UploadFile
DEFAULT_CHUNK_SIZE = 1024 * 1024 * 50 # 50 megabytes
async def save_video(video_file: UploadFile):
async with aiofiles.open("/file/path/name.mp4", "wb") as f:
while chunk := await video_file.read(DEFAULT_CHUNK_SIZE):
await f.write(chunk)
18. Be careful with dynamic pydantic fields¶
If you have a pydantic field that can accept a union of types, be sure the validator explicitly knows the difference between those types
from pydantic import BaseModel
class Article(BaseModel):
text: str | None
extra: str | None
class Video(BaseModel):
video_id: int
text: str | None
extra: str | None
class Post(BaseModel):
content: Article | Video
post = Post(content={"video_id": 1, "text": "text"})
print(type(post.content))
# OUTPUT: Article
# Article is very inclusive and all fields are optional, allowing any dict to become validSolutions:
-
Input์ด ํ์ฉ๋ ์ ํจํ ํ๋๋ง ํ์ฉํ๋์ง ํ์ธํ๊ณ , ์์์๋ ํ๋๊ฐ ์ ๊ณต๋ ๊ฒฝ์ฐ ์ค๋ฅ๋ฅผ ๋ฐ์์ํจ๋ค
-
Use Pydantic's Smart Union (>v1.9) if fields are simple
It's a good solution if the fields are simple likeint
orbool
, but it doesn't work for complex fields like classesWithout Smart Unionfrom pydantic import BaseModel class Post(BaseModel): field_1: bool | int field_2: int | str content: Article | Video p = Post(field_1=1, field_2="1", content={"video_id": 1}) print(p.field_1) # OUTPUT: True print(type(p.field_2)) # OUTPUT: int print(type(p.content)) # OUTPUT: Article
With Smart Unionclass Post(BaseModel): field_1: bool | int field_2: int | str content: Article | Video class Config: smart_union = True p = Post(field_1=1, field_2="1", content={"video_id": 1}) print(p.field_1) # OUTPUT: 1 print(type(p.field_2)) # OUTPUT: str print(type(p.content)) # OUTPUT: Article, because smart_union doesn't work for complex fields like classes
-
Fast Workaround Order field types properly: from the most strict ones to loose ones
19. SQL-first, Pydantic-second¶
- ์ผ๋ฐ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ CPython ๋ณด๋ค ํจ์ฌ ๋น ๋ฅด๊ณ ๊ฐ๋ํ๊ฒ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ค
- ๋ณต์กํ join๊ณผ ๊ฐ๋จํ ๋ฐ์ดํฐ ์กฐ์ ๋ชจ๋ SQL์ ์ฌ์ฉํ์ฌ ์ํํ๋ ๊ฒ์ด ์ข๋ค
- ์ค์ฒฉ๋ ๊ฐ์ฒด๊ฐ ์๋ ์๋ต์ ๋ํด DB์์ json์ ์ง๊ณํ๋ ๊ฒ์ด ์ข๋ค
# src.posts.service
from typing import Mapping
from pydantic import UUID4
from sqlalchemy import desc, func, select, text
from sqlalchemy.sql.functions import coalesce
from src.database import databse, posts, profiles, post_review, products
async def get_posts(
creator_id: UUID4, *, limit: int = 10, offset: int = 0
) -> list[Mapping]:
select_query = (
select(
(
posts.c.id,
posts.c.type,
posts.c.slug,
posts.c.title,
func.json_build_object(
text("'id', profiles.id"),
text("'first_name', profiles.first_name"),
text("'last_name', profiles.last_name"),
text("'username', profiles.username"),
).label("creator"),
)
)
.select_from(posts.join(profiles, posts.c.owner_id == profiles.c.id))
.where(posts.c.owner_id == creator_id)
.limit(limit)
.offset(offset)
.group_by(
posts.c.id,
posts.c.type,
posts.c.slug,
posts.c.title,
profiles.c.id,
profiles.c.first_name,
profiles.c.last_name,
profiles.c.username,
profiles.c.avatar,
)
.order_by(
desc(coalesce(posts.c.updated_at, posts.c.published_at, posts.c.created_at))
)
)
return await database.fetch_all(select_query)
# src.posts.schemas
import orjson
from enum import Enum
from pydantic import BaseModel, UUID4, validator
class PostType(str, Enum):
ARTICLE = "ARTICLE"
COURSE = "COURSE"
class Creator(BaseModel):
id: UUID4
first_name: str
last_name: str
username: str
class Post(BaseModel):
id: UUID4
type: PostType
slug: str
title: str
creator: Creator
@validator("creator", pre=True) # before default validation
def parse_json(cls, creator: str | dict | Creator) -> dict | Creator:
if isinstance(creator, str): # i.e. json
return orjson.loads(creator)
return creator
# src.posts.router
from fastapi import APIRouter, Depends
router = APIRouter()
@router.get("/creators/{creator_id}/posts", response_model=list[Post])
async def get_creator_posts(creator: Mapping = Depends(valid_creator_id)):
posts = await service.get_posts(creator["id"])
return posts
If an aggregated data form DB is a simple JSON, then take a look at Pydantic'sย Jsonfield type, which will load raw JSON first.
from pydantic import BaseModel, Json
class A(BaseModel):
numbers: Json[list[int]]
dicts: Json[dict[str, int]]
valid_a = A(numbers="[1, 2, 3]", dicts='{"key": 1000}') # becomes A(numbers=[1,2,3], dicts={"key": 1000})
invalid_a = A(numbers='["a", "b", "c"]', dicts='{"key": "str instead of int"}') # raises ValueError
20. Validate hosts, if users can send publicly available URLs¶
์๋ฅผ ๋ค์ด ๋ค์๊ณผ ๊ฐ์ ํน์ endpoint๊ฐ ์๋ค:
- ์ฌ์ฉ์๋ก๋ถํฐ ๋ฏธ๋์ด ํ์ผ ์์
- ํด๋น ํ์ผ์ ๋ํด ๊ณ ์ ํ url ์์ฑ
- ์ฌ์ฉ์์๊ฒ url ๋ฐํ
PUT /profiles/me
POST/posts
๊ฐ์ ๋ค๋ฅธ endpoint์์ ์ฌ์ฉ- ์ด๋ฌํ endpoint๋ whitelisted host์ ํ์ผ๋ง ํ์ฉ
- ์ด ์ด๋ฆ๊ณผ ์ผ์นํ๋ url์ ์ฌ์ฉํ์ฌ ํ์ผ์ AWS์ ์ ๋ก๋
- url ํธ์คํธ๋ฅผ whitelist์ ์ถ๊ฐํ์ง ์์ผ๋ฉด, ์๋ชป๋ ์ฌ์ฉ์๊ฐ ์ํํ ๋งํฌ๋ฅผ ์ ๋ก๋ํ ์ ์์
from pydantic import AnyUrl, BaseModel
ALLOWED_MEDIA_URLS = {"mysite.com", "mysite.org"}
class CompanyMediaUrl(AnyUrl):
@classmethod
def validate_host(cls, parts: dict) -> tuple[str, str, str, bool]:
"""Extend pydantic's AnyUrl validation to whitelist URL hosts."""
host, tld, host_type, rebuild = super().validate_host(parts)
if host not in ALLOWED_MEDIA_URLS:
raise ValueError(
"Forbidden host url. Upload files only to internal services."
)
return host, tld, host_type, rebuild
class Profile(BaseModel):
avatar_url: CompanyMediaUrl # only whitelisted urls for avatar
21. Raise a ValueError in custom pydantic validators, if schema directly faces the client¶
It wil return a nice detailed response to users
# src.profiles.schemas
from pydantic import BaseModel, validator
class ProfileCreate(BaseModel):
username: str
@validator("username")
def validate_bad_words(cls, username: str):
if username == "me":
raise ValueError("bad username, choose another")
return username
# src.profiles.routes
from fastapi import APIRouter
router = APIRouter()
@router.post("/profiles")
async def get_creator_posts(profile_data: ProfileCreate):
pass
22. Don't forget FastAPI converts Response Pydantic Object to Dict then to an instance of ResponseModel then to Dict then to JSON¶
from fastapi import FastAPI
from pydantic import BaseModel, root_validator
app = FastAPI()
class ProfileResponse(BaseModel):
@root_validator
def debug_usage(cls, data: dict):
print("created pydantic model")
return data
def dict(self, *args, **kwargs):
print("called dict")
return super().dict(*args, **kwargs)
@app.get("/", response_model=ProfileResponse)
async def root():
return ProfileResponse()
23. If you must use sync SDK, then run it in a thread pool.¶
- If you must use a library to interact with external services, and it's not
async
, then make the HTTP calls in an external worker thread - For a simple example, we could use our well-known
run_in_threadpool
from starlette
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from my_sync_library import SyncAPIClient
app = FastAPI()
@app.get("/")
async def call_my_sync_library():
my_data = await service.get_my_data()
client = SyncAPIClient()
await run_in_threadpool(client.make_request, data=my_data)
24. Use linters (black, isort, autoflake)¶
- With linters, you can forget about formatting the code and focus on writing the business logic
- Black is the uncompromising code formatter that eliminates so many small decisions you have to make during development. Other linters help you write cleaner code and follow the PEP8
- It's a popular good practice to use pre-commit hooks, but just using the script was ok for us
#!/bin/sh -e
set -x
autoflake --remove-all-unused-imports --recursive --remove-unused-variables --in-place src tests --exclude=__init__.py
isort src tests --profile black
black src tests
Quote
- Best-Practice Github zhanymkanov
- Pydantic Pydantic-Docs