【Python教程】FastAPI的高级功能包括:同步与异步路由函数、依赖注入以及后台任务处理。

零 Python教程评论59字数 8280阅读27分36秒阅读模式

引言


在现代Web开发中,高效的性能和简洁的代码结构至关重要。FastAPI作为一个现代且快速的Web框架,以其对异步编程的原生支持和直观的依赖注入系统,迅速成为开发者的首选。本文将深入探讨FastAPI的几个核心概念:同步与异步函数的区别、Depends依赖关系,以及后台任务的处理。

同步异步路由函数

在fastapi中路由函数可以是同步的也可以是异步的,跟随如下例子一探究竟文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

python

复制代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: def_async_def.py
# @Desc: { 同步异步路由函数 }
# @Date: 2024/05/21 11:06
import asyncio
import time

import uvicorn
from fastapi import FastAPI
import requests
import httpx

app = FastAPI(description="同步异步路由函数")


async def async_get(url):
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
    return resp


@app.get("/ping")
async def ping():
    return "pong"


@app.get("/async_route_func_demo")
async def async_route_func():

    resp = await async_get("https://www.baidu.com")
    # resp = requests.get("https://www.baidu.com")
    print("resp", resp)
    return "async_route_func_demo"


@app.get("/sync_route_func_demo")
def sync_route_func():
    resp = requests.get("https://www.baidu.com")
    print("resp", resp)
    return "sync_route_func_demo"


def main():
    uvicorn.run(app)


if __name__ == '__main__':
    main()
  • 同步路由函数中使用 requests 请求 ping 接口
  • 异步路由函数则是使用 httpx 请求 ping 接口

具体有什么区别呢文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

  • 同步函数将会使用线程进行处理
  • 异步函数则是以协程方式在事件循环中处理

FastAPI 框架属于异步(asgi)框架,所以能用异步尽量用异步,没有对应的异步库可以考虑用线程池执行同步io操作,为什么不推荐直接使用同步路由函数借助fastapi内置策略使用线程处理,因为在实际处理时,异步兼容同步,而同步不兼容异步,如果数据库采用的是异步操作,则大部分业务逻辑都是异步函数,为了解决几个同步io,这样在最顶层使用同步路由函数则极其不方便,还是采用异步路由函数,然后在同步io操作中使用线程池进行封装,例如读写文件等。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

这里还有一个非常重要的一点 不要在异步路由函数中做同步io处理,不然性能将大打折扣。这里通过同异步发送http请求来模拟异步路由使用同步io。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

python

复制代码
import uvicorn
from fastapi import FastAPI
import requests
import httpx
import aiohttp

app = FastAPI(description="同步异步路由函数")

aio_client = httpx.AsyncClient()
aio_session: aiohttp.ClientSession = None
req_session = requests.Session()


@app.on_event("startup")
async def startup_event():
    # 应用启动事件
    global aio_session
    aio_session = aiohttp.ClientSession()


async def async_get(url):
    # async with aio_session.get(url) as resp:
    #     return resp

    resp = await aio_client.get(url)
    return resp


@app.get("/ping")
async def ping():
    return "pong"


@app.get("/async_route_func_demo")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = await async_get(url)
    # print("resp", resp)
    return "async_route_func_demo"


@app.get("/async_route_use_sync_io_demo")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    # print("resp", resp)
    return "async_route_func_demo"


@app.get("/sync_route_func_demo")
def sync_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    # print("resp", resp)
    return "sync_route_func_demo"


def main():
    uvicorn.run(app, log_level="warning")


if __name__ == '__main__':
    main()

最后使用wrk来压测下这种情况文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

python

复制代码
# 20线程模拟200并发请求 持续10s
wrk -t20 -d10s -c200 http://127.0.0.1:8000/async_route_func_demo

文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

通过wrk的压测数据可以看出,异步路由函数中使用同步io(requests),并发性能一落千丈,简直惨不忍睹。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

  • 异步路由(aiohttp):10s 总共处理了 7027 个请求,qps:697.17
  • 异步路由(requests):10s 总共处理了 138 个请求,qps:13.66
  • 同步路由(requests):10s 总共处理了 5135 个请求,qps:510.23 (隐式线程池处理)

这是一个非常严重的问题一个同步io会堵塞住整个系统,导致其他接口也访问不了。所以再次说明一遍 不要在异步路由函数中做同步io处理,尤其这些异步web框架 fastapi、tornado、sanic 等很容易会因为技术选型、程序员编码问题导致性能大打折扣。异步框架推荐用异步库,没有支持的异步库使用线程池处理同步IO这是最佳的实践。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

推荐我另外一篇文章: 同步、异步无障碍:Python异步装饰器指南 可以处理没有异步库的情况。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

Depends依赖项

在FastAPI中,Depends 用于处理依赖注入(Dependency Injection)。它的主要作用是让你能够将一些共享的逻辑、组件或者资源(如数据库连接、配置、服务等)注入到路由函数中,而不需要显式地在每个函数中传递这些依赖项。文章源自灵鲨社区-https://www.0s52.com/bcjc/pythonjc/15232.html

基本使用

公共分页参数处理

python

复制代码
import uvicorn
from fastapi import FastAPI, HTTPException, Query
from typing import Union

from fastapi import Depends, FastAPI
from pydantic import BaseModel, Field

app = FastAPI(description="depends 使用")


class PageModel(BaseModel):
    offset: int = Field(0, description="偏移量")
    limit: int = Field(10, description="每页大小")


def page_parameters(
        curr_page: int = 1, page_size: int = 10
):
    offset = (curr_page - 1) * page_size
    if offset < 0 or page_size > 1000:
        raise HTTPException(status_code=400, detail="Limit must be less than 100")

    return PageModel(offset=offset, limit=page_size)


@app.get("/v1/items/")
async def read_items(curr_page: int = Query(1), page_size: int = Query(10)):
    page_model = page_parameters(curr_page, page_size)
    items = ["item1", "item2", "item3"][page_model.offset:page_model.offset + page_model.limit]
    return {"items": items}


@app.get("/v1/users/")
async def read_users(curr_page: int = Query(1), page_size: int = Query(10)):
    page_model = page_parameters(curr_page, page_size)
    users = ["user1", "user2", "user3"][page_model.offset:page_model.offset + page_model.limit]
    return users


@app.get("/v2/items/")
async def read_items(page_model: PageModel = Depends(page_parameters)):
    items = ["item1", "item2", "item3"][page_model.offset:page_model.offset + page_model.limit]
    return {"items": items}


@app.get("/v2/users/")
async def read_users(page_model: PageModel = Depends(page_parameters)):
    users = ["user1", "user2", "user3"][page_model.offset:page_model.offset + page_model.limit]
    return users


def main():
    uvicorn.run(app)


if __name__ == '__main__':
    main()

在这个例子可以看出

  1. 分页参数处理和验证逻辑 被提取到 page_parameters 依赖函数中。
  2. 路由函数 /v2/items 和 /v2/users 通过 Depends 分别注入这些依赖函数的返回值。

不用Depends,在路由函数需要定义分页参数然后传递给 page_parameters 函数处理,而使用Depends在路由定义上简化了然后间接调用处理。整体复用代码逻辑,更简洁了。

还有一些用途示例,大家可以查阅官网:fastapi.tiangolo.com/zh/tutorial…

不知道为啥我对官网的例子没有兴趣,我一般都不想这样代码复用,共享客户端等操作。因为我不想把逻辑代码放到路由函数层处理,而是单独抽离一个逻辑层(service)处理,路由层只做如下操作

  • 请求参数校验(pydantic的BaseModel)
  • 参数透传调用逻辑层处理
  • 响应反参

路径、查询字符串参数转成pydantic的BaseModel

通常我喜欢用Depends把路径参数或者查询字符串参数封装到pydantic的BaseModel 中

python

复制代码
class UserQueryIn(BaseModel):
    user_id: int = Path(description="用户ID")
    name: Optional[str] = Query(default=None, description="姓名")
    age: Optional[int] = Query(default=None, description="年龄")


@app.get("/users/{user_id}/path_query_params2", summary="路径参数+查询字符串参数BaseModel的demo")
def with_path_query_params(req_model: UserQueryIn = Depends(UserQueryIn)):
    # 业务逻辑处理
    # logic_func(req_model)
    return req_model.model_dump()

如果直接用BaseModel 的话 fastapi 会认为是body参数,这明显不合理是错误,因此需要通过 fastapi的Depends函数来解决。

配合 contextvars 存储 Request、BackgroundTasks

这里通过全局依赖存储请求对象与后台任务处理器。到后面逻辑层需要用时可以直接通过上下文取出来。不用一层一层传递。

python

复制代码
import time
from contextvars import ContextVar

import uvicorn
from fastapi import FastAPI, HTTPException, Query, Path, Request, BackgroundTasks
from typing import Union, Optional

from fastapi import Depends, FastAPI
from pydantic import BaseModel, Field

REQ_CTX: ContextVar[Union[Request, None]] = ContextVar("req_ctx", default=None)
BG_TASK_EXECUTOR: ContextVar[Union[BackgroundTasks, None]] = ContextVar("bg_task_executor", default=None)


async def set_req_and_bg_tasks(req: Request, bg_tasks: BackgroundTasks):
    """全局依赖把请求对象与后台任务处理器存储到上下文变量中"""
REQ_CTX.set(req)
    BG_TASK_EXECUTOR.set(bg_tasks)
    return req, bg_tasks


app = FastAPI(description="depends 使用", dependencies=[Depends(set_req_and_bg_tasks)])


async def req_bg_tasks_logic(name: str, sleep_seconds: int = 5):
    req = REQ_CTX.get()
    print(req.url)

    ret = f"name {name} sleep {sleep_seconds}s"

    def bg_task_demo():
        time.sleep(sleep_seconds)
        print(ret)

    # 模拟添加后台任务
    bg_task_executor = BG_TASK_EXECUTOR.get()
    bg_task_executor.add_task(bg_task_demo)
    return ret


@app.get("/req_bg_tasks_demo")
async def req_bg_tasks_demo(name: str, sleep_seconds: int = 5):
    ret = await req_bg_tasks_logic(name, sleep_seconds)
    return ret

测试结果如下

python

复制代码
http://127.0.0.1:8000/req_bg_tasks_demo?name=hui&sleep_seconds=5
INFO:     127.0.0.1:58067 - "GET /req_bg_tasks_demo?name=hui&sleep_seconds=5 HTTP/1.1" 200 OK
bg_task_demo name hui sleep 5s

后台任务

FastAPI 内置了一个后台任务处理器,可以用于处理一些小型的后台任务。不过,不能直接实例化 BackgroundTasks 对象来使用,而是需要通过路由函数中的 BackgroundTasks 对象来添加后台任务。类似于 Request 对象,BackgroundTasks 对象是隐式地放在路由函数中的,只要在参数中声明对应的类型,它就会被自动注入。上面提到的全局依赖也是通过这种方式来进行存储的。

python

复制代码
@app.get("/bg_tasks_demo")
async def bg_task_demo(bg_tasks: BackgroundTasks):
    def sync_bg_task_test(name, sleep_seconds: int = 3):
        print("sync_bg_task_test running")
        time.sleep(sleep_seconds)
        print(f"sync_bg_task_test {name} sleep {sleep_seconds}s end")

    async def async_bg_task_test(name, sleep_seconds: int = 3):
        print("async_bg_task_test running")
        await asyncio.sleep(sleep_seconds)
        print(f"async_bg_task_test {name} sleep {sleep_seconds}s end")

    # 分别添加同步、异步io的后台任务
    bg_tasks.add_task(sync_bg_task_test, name="hui-sync", sleep_seconds=1)
    bg_tasks.add_task(async_bg_task_test, name="hui-async", sleep_seconds=2)

    return "bg_task_demo"

测试结果如下

python

复制代码
INFO:     127.0.0.1:58691 - "GET /bg_tasks_demo HTTP/1.1" 200 OK
sync_bg_task_test running
sync_bg_task_test hui-sync sleep 1s end
async_bg_task_test running
async_bg_task_test hui-async sleep 2s end

在接口请求响应完成,后台顺序调度后台任务,兼容同步、异步io,同步io是通过线程池处理,不会堵塞住系统。

这里先展示部分 BackgroundTasks 的源码,后面单独再分析研究下fastapi的一些经典源码。

python

复制代码
class BackgroundTasks(BackgroundTask):
    def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
        self.tasks = list(tasks) if tasks else []

    def add_task(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

    async def __call__(self) -> None:
        for task in self.tasks:
            await task()

源代码

Github:FastAPI 实战手册

从FastAPI的安装、请求处理、响应返回等基础知识开始,逐步介绍中间件、数据库操作、身份认证等核心组件的使用,再到实战项目的开发和源码分析,目的是让读者逐步深入了解并全面掌握这个框架。

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/pythonjc/15232.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论