连接数据库
前言
nil
连接关系型数据
使用ORM,比如SQLAlchemy
同步
from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
class Item(BaseModel):
id: int
name: str
description: str
engine = create_engine("mysql+mysqlconnector://user:password@localhost/testdb")
Session = sessionmaker(bind=engine)
session = Session()
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.post("/item/")
def create_item(item: Item):
session.add(item)
session.commit()
return {"item": item}
@app.get("/item/{item_id}")
def read_item(item_id: int, q: str = None):
item = session.query(Item).get(item_id)
return {"item": item}
@app.put("/item/{item_id}")
def update_item(item_id: int, item: Item):
item = session.query(Item).get(item_id)
item.name = item.name
item.description = item.description
session.commit()
return {"item": item}
@app.delete("/item/{item_id}")
def delete_item(item_id: int):
item = session.query(Item).get(item_id)
session.delete(item)
session.commit()
return {"message": "Item deleted"}
异步
nil
依赖注入形式
nil
连接Redis
- Python 3.11
- Redis 7.2
安装依赖
# 不一定要保持一致的版本,不同版本的API可能不兼容
pip install redis==5.0.2 fastapi==0.110.0 uvicorn==0.27.1
示例
为了方便在其它模块中使用redis,示例中将redis客户端放在单独的模块中,并在启动时建立redis连接,停止后关闭redis连接。
目录结构:
├── demo.py
└── pkg
├── __init__.py
└── redis
└── __init__.py
其中pkg/redis/__init__.py
内容如下(__init__.py
脚本会在import
时运行一次)
from redis import ConnectionPool, Redis
pool = ConnectionPool(
host="192.168.1.112",
port=6379,
db=1,
password="123456",
max_connections=10, # 最大连接池大小为10
encoding = "utf-8", # 用于指定返回utf-8编码的字符串
decode_responses=True, # 用于指定返回utf-8编码的字符串
)
redis_client = Redis(connection_pool=pool)
__all__ = ["redis_client"]
主代码文件demo.py
内容如下
from fastapi import FastAPI
from contextlib import asynccontextmanager
import uvicorn
from pkg.redis import redis_client
from datetime import datetime
def start_event():
uptime = datetime.now()
redis_client.set("uptime", uptime.strftime("%Y-%m-%d %H:%M:%S"))
def shutdown_event():
redis_client.close()
@asynccontextmanager
async def applifespan(app: FastAPI):
start_event()
yield
shutdown_event()
app = FastAPI(
lifespan=applifespan,
)
@app.get("/")
async def get_root():
uptime = redis_client.get("uptime")
return {"msg": f"uptime is {uptime}"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
常见操作
- 检测链接是否正常
redis_client.ping()
set
andget
redis_client.set("k1", "v1")
redis_client.get("k1")
redis_client.delete("k1")
redis_client.exists("k1") # 是否存在
redis_client.setex("greeting", 10, "hello world") # ttl为10秒
redis_client.ttl("greeting") # 获取指定键的当前ttl
redis_client.expire("greeting", 100) # 刷新指定键的ttl为100秒
# 设置字典
dict_data = {
"employee_name": "Adam Adams",
"employee_age": 30,
"position": "Software Engineer",
}
redis_client.mset(dict_data)
# 自增
redis_client.set("request_total", 0)
redis_client.get("request_total") # 自增1
redis_client.get("request_total", 10) # 自增10
- 管道操作。常用于批量操作,减少多轮多写导致的性能损耗。
with redis_client.pipeline() as pipe:
for _ in range(10000):
pipe.incr("request_total")
pipe.exe
- 列表。
lpush -> rpop
就是队列操作,先入先出。lpush -> lpop
为栈操作,先入后出。
# 向名为language的列表lpush
redis_client.lpush('language', 'python')
redis_client.lpush('language', 'go')
redis_client.lpush('language', 'rust')
redis_client.rpop('language') # output: python
redis_client.rpop('language') # output: go
redis_client.rpop('language') # output: rust
# 列表长度
redis_client.llen('language')
- 集合
# 向名为warframe的集合添加元素
redis_client.sadd('warframe', 'atlas')
redis_client.sadd('warframe', 'banshee')
# 删除元素
redis_client.srem('warframe', 'atlas')