# python_web框架-FastAPI
**Repository Path**: parrotli/python-web-framework-fastapi
## Basic Information
- **Project Name**: python_web框架-FastAPI
- **Description**: 跟着b站黑马程序员2025-12-10的课程(https://www.bilibili.com/video/BV1zV2QBtE39)学习python的web框架:FastAPI。
FastAPI框架的官方中文文档:https://fastapi.org.cn/#opinions
- **Primary Language**: Python
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2026-01-21
- **Last Updated**: 2026-01-21
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# python_web框架-FastAPI
#### 介绍
跟着b站黑马程序员2025-12-10的课程(https://www.bilibili.com/video/BV1zV2QBtE39)学习python的web框架:FastAPI。
FastAPI框架的官方中文文档:https://fastapi.org.cn/#opinions
加油干!
#### 软件架构
软件架构说明
#### 安装教程
1. 请根据具体情况修改下面的数据库配置。
2. ```python
# 1. 创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
```
3. 自行在book表添加数据
```sql
insert into book(id, bookname, author, price, publisher, create_time, update_time) values
('1', '红楼梦', '曹雪芹', '22', '黑马', '2026-01-12 22:11:00', '2026-01-12 22:11:00'),
('2', '西楼梦', '吴承恩', '45', '金乌', '2026-01-13 20:11:00', '2026-01-13 20:11:00'),
('3', '西门子', '吴晓恩', '34', '京东', '2026-01-13 20:30:00', '2026-01-13 20:30:00'),
('4', '红楼梦1', '曹雪', '16', '黑马', '2026-01-12 22:11:00', '2026-01-12 22:11:00');
```
1. xxxx
#### 使用说明
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. /assets 文件夹存储README.md中的图片
7. 大模型工程师核心能力:
- Python基础:语言基础
- 服务化能力:通过Python Web框架将模型从本地代码部署为在线可访问、可扩展且稳定运行的服务
- AI大模型开发能力:模型训练、构建、微调、优化等
一个Web项目 + 一个AI项目:掌握独立搭建Web后端项目的能力、掌握AI模型的部署与服务化能力、
形成清晰的编程思维与工程化意识、高效运用AI工具辅助开发能力
#### 02_fastapi框架简介
1. 异步性能高
2. 下载fastapi pip install "fastapi[standard]"
3. 类型提示与验证 Pydantic类型提示与验证,减少手动校验代码
4. 可交互式文档 自动生成可交互式文档,浏览器中直接调用和测试API 127.0.0.1:8000/docs
5. 要在main.py里面运行fastapi才行,运行命令 fastapi dev main.py



#### 03_第一个FastAPI程序
1. 创建虚拟环境:.venv 隔离项目运行环境,避免依赖冲突,保持全局环境的干净和稳定
2. 启动fastapi应用命令: uvicorn main:app --reload
3. uvicorn - ASGI 高性能服务器,用于运行 FastAPI 应用

#### 04_路由
```python
# 路由是URL地址和处理函数之间的映射关系
```

#### 05_参数简介和路径参数
路径参数:
- 位置:URL路径的一部分 /book/{id}
- 作用:指向唯一的、特定的资源
- 方法:GET
查询参数:
- 位置:URL?之后k1=v1&k2=v2
- 作用:对资源集合进行过滤、排序、分页等操作
- 方法:GET
请求体:
- 位置:HTTP请求的消息体{body}中
- 作用:创建、更新资源、携带大量数据,如JSON数据
- 方法:POST、PUT
#### 06_路径参数__Path类型注解
```python
from fastapi import FastAPI,Path,Query
```
```python
# 路径参数 路径参数传到处理函数的形参中 id: int 参数类型注解
@app.get("/book/{id}") #...表示必填,gt大于,lt小于,description描述
async def get_book(id: int = Path(..., gt=0, lte=101, descripion ='书籍id,取值范围【1-101】')):
return {
"id": id,
"title": f"这是第{id}本书"
}
# 需求:查找书籍的作者,路径参数 name,长度范围2-10
@app.get("/author/{name}")
async def get_name(name: str=Path(..., min_length=2, max_length=10)):
return {"msg": f"这是{name}作者的信息"}
```
路径参数是URL路径的一部分 /book/{book_id}
路径参数添加类型注解:Python原生注解和Path注解
#### 07_查询参数__Query类型注解
```python
from fastapi import FastAPI,Path,Query
```

- 查询参数出现在URL?之后,k=v&k=v
- 查询参数添加类型注解:Python原生注解和Query注解
```python
# 需求 查询新闻 -> 分页,skip:跳过的记录数, limit:返回的记录数 10
@app.get("/news/list/news_list")
async def get_news_list(
skip: int = Query(0, description="跳过的记录数", lt=100),
limit: int = Query(10, description="返回的记录数")): # 默认就是查询参数 URL?k=v&k=v
return {
"skip": skip,
"limit": limit
}
```
#### 08_请求体参数
```python
from pydantic import BaseModel,Field
```


```python
# ---------------------------请求体参数---------------------------------
class User(BaseModel):
username: str
password: str
# 需求:注册用户
@app.post("/user/register")
async def register(user: User):
return user
class Book(BaseModel):
name: str # 书名
author: str # 作者
publisher: str # 出版社
sell_price: float # 销售价格
# 需求:新增图书
@app.post("/book/create")
async def create_book(book: Book):
return book
```
#### 09_请求体参数__Field类型注解

```python
class User(BaseModel):
username: str = Field(default="张三", min_length=2, max_length=10, description="用户名,长度要求2-10个字符")
password: str = Field(min_length=3, max_length=20)
# 需求:注册用户
@app.post("/user/register")
async def register(user: User):
return user
# Field(default="") 没有default默认值。...表示必填
class Book(BaseModel):
name: str = Field(..., min_length=2, max_length=20) # 书名
author: str = Field(..., min_length=2, max_length=10) # 作者
publisher: str = Field(default="黑马程序员") # 出版社
sell_price: float = Field(...,gt=0) # 销售价格
# 需求:新增图书
@app.post("/book/create")
async def create_book(book: Book):
return book
```

#### 10_响应类型-JSON格式
默认情况下,**FastAPI会自动将路径操作函数返回的Python对象(字典、列表、Pydantic模型等),经由jsonable_encoder转换为JSON兼容格式,并包装为JSONResponse返回**。这省去了手动序列化的步骤,让开发者能更专注于业务逻辑。
如果需要返回非JSON数据(如HTML、文件流),FastAPI提供了丰富的响应类型来返回不同数据。

#### 11_响应类型-HTML格式:HTMLResponse
```python
from fastapi.responses import HTMLResponse
```

装饰器中指定响应类
```py
# 在装饰器中添加response_class参数,指定响应类型:HTMLResponse
@app.get("/html", response_class=HTMLResponse)
async def get_html():
return """
这是HTML格式的响应
"""
```
#### 12_响应类型-文件格式:FileResponse

```python
# 接口:返回文件
@app.get("/file")
async def get_file():
path = "./assets/img.png"
return FileResponse(path)
```
#### 13_自定义响应数据格式: response_model
**response_model**是路径操作装饰器(如@app.get或@app.post)的关键参数,它通过一个**Pydantic模型来严格定义和约束API端点的输出格式**。这一机制在提供自动数据验证和序列化的同时,更是保障数据安全性的第一道防线。

```python
# 需求:新闻接口 -> 响应数据格式 id、title、content。跟请求体参数一样,少了Field类型注解
class News(BaseModel):
id: int
title: str
content: str
# 路径参数
@app.get("/news/{id}", response_model=News)
async def get_new(id: int):
return {
"id": id,
"title": f"这是第{id}条新闻",
"content": f"这是第{id}条新闻的内容"
}
```
#### 14_异常响应处理
```python
from fastapi import FastAPI,Path,Query,HTTPException
```

```py
# ------------------异常处理 raise HTTPException(status_code=404, detail="报错了,xxx") ----------------------------
# 需求:根据id查询新闻 -> 1-6
@app.get("/news/httpexception/{id}")
async def get_news(id: int):
id_list = [1, 2, 3, 4, 5, 6]
if id not in id_list:
raise HTTPException(status_code=404, detail="你查找的新闻不存在")
return {"id": id, "title": f"这是第{id}条新闻"}
```
#### 15_中间件middleware
使用中间件为每个请求前后添加统一的处理逻辑。例如:日志记录、身份认证、跨域处理、响应头处理、性能监控。


中间件Middleware是一个在每次请求进入FastAPI应用时都会被执行的函数。它在请求到达实际的路径操作(路由处理函数)之前运行,并且在响应返回给客户端之前再运行一次。

语法:
```python
@app.middleware("http")
async def middleware(request, call_next):
print("中间件开始处理 -- start")
response = await call_next(request)
print("中间件处理完成 -- end")
return response
```

```python
# async 代表异步函数。真有异步代码await要带上。
@app.middleware("http")
async def middleware(request, call_next): # request是请求,call_next是传递请求的函数名
print("中间件1开始处理 -- start")
response = await call_next(request) # 这里就有异步代码
print("中间件1处理完成 -- end")
return response
# async 代表异步函数。真有异步代码考艾要搭上
@app.middleware("http")
async def middleware(request, call_next):
print("中间件2开始处理 -- start")
response = await call_next(request)
print("中间件2处理完成 -- end")
return response
# 多个中间件的输出顺序:类似于栈的先进后出
# 中间件2开始处理 -- start
# 中间件1开始处理 -- start
# 中间件1处理完成 -- end
# 中间件2处理完成 -- end
```

#### 16_依赖注入
1. 使用**依赖注入系统**来共享通用逻辑,减少代码重复。依赖项(可重用的组件)来实现。


2. 依赖注入应用场景:处理请求参数、共享业务逻辑、共享数据库连接、安全和认证。

3. 语法:先创建依赖项,然后声明依赖项。Depends(依赖项)

```py
# 需求:分页参数逻辑共用:新闻列表和用户列表 16-1.定义依赖项(请求参数共用)
async def common_parameters(
skip: int = Query(0, ge=0), # skip跳过 >= 0
limit: int = Query(10, le=60) # page_size默认值10,范围0-60
):
return {"skip": skip, "limit": limit}
# 查询参数 URL?k=v&k=v ; 16-2.声明依赖项commons=Depends(common_parameters)
@app.get("/news_16/news_list")
async def get_news_list(commons=Depends(common_parameters)):
return commons
# 查询参数 URL?k=v&k=v ;声明依赖项commons=Depends(common_parameters)
@app.get("user_16/user_list")
async def get_user_list(commons=Depends(common_parameters)):
return commons
```
#### 17_ORM简介和安装

学ORM框架:SQLAlchemy


#### 18_ORM建表


```python
# ------------------------- 18_ORM安装和建表 -----------------------
# pip install sqlalchemy
# pip install aiomysql
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datetime import datetime
from sqlalchemy import func, DateTime, String, Float, select
# 1. 创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选,输出SQL日志
pool_size=10, # 设置连接池活跃的连接数
max_overflow=20, # 允许额外的连接数
)
# 2. 定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,comment="创建时间")
update_time: Mapped[datetime] = mapped_column(DateTime,insert_default=func.now, default=func.now, onupdate=func.now, comment="更新时间",)
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float, comment="价格")
publisher: Mapped[str] = mapped_column(String(255),comment="出版社")
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True, comment="用户id")
username: Mapped[str] = mapped_column(String(255), comment="用户名")
password: Mapped[str] = mapped_column(String(255), comment="密码")
# 3. 建表:定义函数建表 FastAPI启动时候调用建表的函数
async def create_tables():
# 获取异步引擎,创建事务 - 建表 as 别名。。。async with 异步上下文管理器。
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # 继承了Base模型类的元数据(表),都会被创建
# app启动时候,创建表
@app.on_event("startup")
async def startup_event():
await create_tables()
```
#### 19_ORM__在路由中使用ORM
核心:创建依赖项,使用Depends注入到路由处理函数。
创建获取数据库会话依赖项,

```python
#-------------------------- 19_在路由中使用ORM -----------------------
# 需求:查询功能的接口,查询图书 。依赖注入:创建依赖项获取数据库会话 + Depends 注入路由处理函数
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, # 绑定数据库引擎
class_=AsyncSession, # 指定会话类
expire_on_commit=False # 提交后会话不过期,不会重新查询数据库
)
# 依赖项:获取数据库会话
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session # 返回数据库会话给路由处理函数
await session.commit() # 提交事务
except Exception:
await session.rollback() # 有异常,回滚
raise
finally:
await session.close() # 关闭会话
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# 看下依赖有没有注入成功。
# 查询
result = await db.execute(select(Book))
books = result.scalars().all()
return books
```
上述代码与18是一块的。
#### 20_ORM操作数据-查询数据
```py
await db.execute(select(模型类))`
```

```python
# -----------------------20_ORM查询数据------------------------------
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# # 查询书籍列表 看下依赖有没有注入成功。注入数据库会话
# result = await db.execute(select(Book)) # 查询 返回一个ORM对象
# books = result.scalars().all() # 获取所有书籍
# book = result.scalars().first() # 获取第一条数据
book = await db.get(Book, 2) # 根据id主键获取单条数据
return book
```
```python
# 需求:路径参数:数据ID
@app.get("/book/book_detail/{book_id}")
async def get_book_detail(book_id: int, db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.id == book_id))
book_detail = result.scalar_one_or_none()
return book_detail
# 需求:条件查询 价格大于等于30
@app.get("/book/search_book")
async def get_search_book(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.price >= 30))
books = result.scalars().all()
return books
```
#### 21_ORM操作数据--条件查询--比较判断
```py
select(Book).where(条件,条件2,....)
```

例如:

#### 22_ORM查询数据 条件查询-模糊查询&与非&包含

```python
# ---------------22_ORM查询数据 条件查询-模糊查询&与非&包含----------------
# 需求 :作者以 曹开头 %
@app.get("/book/search_book_by_author_like")
async def search_book_by_author_like(db: AsyncSession = Depends(get_database)):
# where(Book.column.like("value%")) 1.模糊查询
# result = await db.execute(select(Book).where(Book.author.like("曹%")))
# result = await db.execute(select(Book).where(Book.author.like("曹_")))
# 2. where( (条件1) & (条件2) ) 与 where( (条件1) | (条件2) ) 或 where( ~(条件) ) 非
# result = await db.execute(select(Book).where(~(Book.author.like("曹%")) | (Book.price >= 20)))
# result = await db.execute(select(Book).where((Book.author.like("曹%")) & (Book.price >= 20)))
# 3. 需求:书籍id列表,数据库里面的id如果在书籍id列表里面,就返回能查询得到的
id_list = [1,2,4,999]
result = await db.execute(select(Book).where(Book.id.in_(id_list)))
books = result.scalars().all()
return books
```
#### 23,ORM操作数据 - 聚合查询

```python
# -------------------23,ORM查询数据,聚合查询 min,max,avg,--------------------------
@app.get("/book/count")
async def get_count(db: AsyncSession = Depends(get_database)):
# result= await db.execute(select(func.max(Book.price)))
# result= await db.execute(select(func.min(Book.price)))
result= await db.execute(select(func.avg(Book.price)))
# 计算多少行
# result = await db.execute(select(func.count(Book.id)))
return result.scalar() # 用来提取一个数据 = > 标量值
```
#### 24,ORM操作数据,分页查询
```python
# --------------------21,ORM数据库操作,ORM分页炒作---------------------
@app.get("/book/get_book_page_list")
async def get_book_page_list(
page: int = 2,
page_size: int = 2,
db: AsyncSession = Depends(get_database)
):
# offset = (page - 1) * page_size 跳过的记录数;limit 每页的记录数
offset = page_size * (page - 1)
stmt = select(Book).offset(offset).limit(page_size)
result = await db.execute(stmt)
books = result.scalars().all()
return books
```
#### 25,ORM查询,总结
select() -> db.execute() - 从ORM对象获取数据 - 响应结果
条件查询 where() 聚合查询func.xxxx 分页查询offset() limit()
比较 count() select().offset().limit()
模糊 max() and min() offset = (current_page-1) * page_size
与或非 avg()
包含 sum()
对ORM对象获取数据的方式
获取所有数据 scalars().all()
获取单条数据
scalars().first() 获取第一个数据
scalar_one_or_none() 提取一个或者NULL
scalar() 提取标量值(配合聚合查询使用)
#### 26,ORM操作数据,添加数据
核心步骤:定义ORM对象 -- 添加对象到事务:add(对象) -> commit提交到数据库
```python
# ----------------- 26,ORM操作数据,添加数据--------------------------
# 需求:用户输入图书信息(id、书名、作者、价格、出版社)。使用HTTP请求体传递用户输入数据
class BookCreate(BaseModel):
id: int
bookname: str
author: str
price: float
publisher: str
@app.post("/book/add_book")
async def add_book(book: BookCreate, db: AsyncSession = Depends(get_database)):
# ORM对象 -- add - commit
# book.__dict__:获取BookCreate实例的所有属性,返回一个包含属性名和值的字典
# ** 操作符:将字典解包为关键字参数传递给Book构造函数
# Book(...):创建SQLAlchemy数据库模型实例
book_obj = Book(**book.__dict__)
db.add(book_obj)
await db.commit()
return book
```
#### 27,ORM操作数据 - 更新数据

```python
# ----------------- 27,ORM操作数据 - 更新数据--------------------------
# 需求: 修改图书信息,先查后改
class BookUpdate(BaseModel):
bookname: str
author: str
price: float
publisher: str
@app.put("/book/update_book/{book_id}")
async def update_book(book_id: int,data: BookUpdate, db: AsyncSession = Depends(get_database)):
# 根据book_id 先查询是否存在
db_book = await db.get(Book, book_id)
# 如果不存在,抛出异常
if db_book is None:
raise HTTPException(status_code=404, detail="Book not found")
# 更新data
db_book.bookname = data.bookname
db_book.author = data.author
db_book.price = data.price
db_book.publisher = data.publisher
# 提交到数据库
await db.commit()
return db_book
```
#### 28,ORM操作数据 - 删除数据
核心步骤:查询get - delete删除 --- commit提交到数据库

```python
# ----------------- 28,ORM操作数据 - 删除数据--------------------------
@app.delete("/book/delete-book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
# 先查再删除,然后提交
db_book = await db.get(Book, book_id)
if db_book is None:
raise HTTPException(status_code=404, detail="Book not found")
await db.delete(db_book)
await db.commit()
return {"msg": "Book deleted successfully"}
```
#### 29,sqlalchemy进行CURD操作

### 30,AI掘金头条新闻系统














#### 31,工程结构
packages:
crud 数据库增删改查逻辑(封装数据库操作)
models 数据库模型(SQLAlchemy ORM)
routers 路由层(按照模块划分)
schemas 数据验证模型(Pydantic数据校验)
utils 工具函数
config 配置相关

#### 32 模块化路由


```python
from fastapi import APIRouter
# 创建APIRouter实例。prefix 路由前缀。tags 分组标签。需要app挂载一下app.include_router(news.router)
router = APIRouter(prefix="/api/news", tags=["news"])
@router.get("/categories")
async def get_categories():
return {"msg": "获取分类成功"}
```
#### 33,数据库和ORM配置


#### 34,获取新闻分类

```py
# 接口实现流程
# 1. 模块化路由 API接口规范文档
# 2. 定义模型类 数据库表结构(数据库设计文档)
# 3. 在curd文件夹里面创建文件,封装操作数据库的方法
# 4. 在路由处理函数里面调用curd封装号的方法,响应结果
```
#### 36,解决跨域问题



```py
# 解决跨域问题
app.add_middleware(CORSMiddleware,
allow_origins=["*"], #允许的源,prod环境建议指定允许的域名列表
allow_credentials=True, # 允许携带cookie
allow_methods=["*"], # 允许的请求方法
allow_headers=["*"], # 允许的请求头
)
```
#### 37-40

更新字段:
```py
# 增加新闻浏览量
async def increase_news_views(db: AsyncSession, news_id: int):
# 浏览量+1
stmt = update(News).where(News.id == news_id).values(views=News.views + 1)
result = await db.execute(stmt)
await db.commit()
# 更新后,检查数据库是否真的命中了数据,命中了返回True
return result.rowcount > 0
```
列表推导式: order_by排序:
```py
# 获取当前新闻的关联新闻
async def get_related_news(db: AsyncSession, news_id: int, category_id: int, limit: int = 5):
# order_by 排序。按照浏览量和发布时间进行排序
stmt = select(News).where(News.category_id == category_id ,News.id != news_id).order_by(News.views.desc(), News.publish_time.desc()).limit(limit)
result = await db.execute(stmt)
# return result.scalars().all()
related_news = result.scalars().all()
# 列表推导式,推导出新闻的核心数据,然后再return
return [{
"id": news.id,
"title": news.title,
"content": news.content,
"image": news.image,
"author": news.author,
"publishTime": news.publish_time.strftime("%Y-%m-%d %H:%M:%S"),
"categoryId": news.category_id,
"views": news.views,
} for news in related_news]
```

#### 41项目总结

