跳到主要内容

实现基本功能

前言

可以使用app.config方式配置一些基础功能,但本文还是以单独模块的形式实现,以实现更灵活的功能。

公共功能模块个人习惯于放在pkg目录下,比如配置模块放在pkg/config,日志模块放在pkg/log

读取配置

配置模块个人一般放在pkg/config/config.py,然后在pkg/config/__init__.py中实例化一个单例对象。

以下为pkg/config/config.py的示例代码,只包含读取配置的部分,实际项目中一般只会读取一种类型的配置文件,而且会把具体参数写成方法和属性以便于其他模块使用。

from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Optional
import configparser
import os
import json
import tomllib
import yaml

class Configer(metaclass=ABCMeta):
def __init__(self):
self.config: dict = {}
@abstractmethod
def read_config(self):
pass

def is_exists(self, config_file: str) -> bool:
return os.path.exists(config_file)

class JsonConfiger(Configer):
"""json配置文件解析类"""
def __init__(self, config_file: str):
if not self.is_exists(config_file):
raise FileNotFoundError(f"{config_file} is not exists")
self.config_file = config_file
self.config = self.read_config()

def read_config(self) -> Optional[dict]:
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
return config
except Exception as e:
raise Exception(f"Failed to read config file: {e}") from e

class IniConfiger(Configer):
"""ini配置文件解析类"""
def __int__(self, config_file: str):
if not os.path.exists(config_file):
raise Exception(f"Config file {config_file} not exists")
self.config_file = config_file
self.config = self.read_config()

def read_config(self) -> Optional[dict]:
config = configparser.ConfigParser()
try:
config.read(self.config_file, encoding="utf-8")
except Exception as e:
raise Exception(f"Read config file {self.config_file} error: {e}")
return config

class YamlConfiger(Configer):
"""yaml配置文件解析类"""
def __init__(self, config_file: str):
if not os.path.exists(config_file):
raise FileNotFoundError(f"Config file {config_file} not found")
self.config_file = config_file
self.config = self.read_config()

def read_config(self):
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f.read())
return config
except Exception as e:
raise Exception(f"Read config file {self.config_file} error: {e}")

class TomlConfiger(Configer):
def __init__(self, config_file):
if not os.path.exists(config_file):
raise Exception(f"Config file {config_file} does not exist")
self.config_file = config_file
self.config = self.read_config()

def read_config(self):
try:
with open(self.config_file, "rb") as f:
config = tomllib.load(f)
return config
except Exception as e:
raise Exception(f"Read config file {self.config_file} error:{e}")


def new_configer(config_file: str = "") -> Configer:
p = Path(config_file)
if p.suffix.lower() == ".toml":
return TomlConfiger(config_file)
elif p.suffix.lower() == ".json":
return JsonConfiger(config_file)
elif p.suffix.lower() == ".yaml":
return YamlConfiger(config_file)
elif p.suffix.lower() == ".yml":
return YamlConfiger(config_file)
elif p.suffix.lower() == ".ini":
return IniConfiger(config_file)
else:
raise ValueError("Unsupported config file type: {}".format(p.suffix))

pkg/config/__init__.py中实例化,这样其他模块可以直接使用这个单例。

from .config import new_configer

config_file = "conf/app.yaml"

configer = new_configer(config_file)

日志

flask默认会在控制台输出非结构化的请求日志,如果要输出json格式的日志,并且要把请求日志写到单独的文件中,可以通过先禁用默认请求日志,然后在钩子函数中自行记录请求的方式来实现。

下面代码定义了两个JSON日志格式化器,JsonFormatter 的日志格式是给普通代码内使用的,会记录调用函数、调用文件等信息,AccessLogFormatter的日志格式用于记录请求日志,记录请求路径、响应状态码、响应时间等信息。

FlaskLogger通过继承logging.Logger来实现一些自定义功能,比如指定格式化器、创建日志目录等。

代码文件:pkg/log/log.py

import logging
import sys
from logging.handlers import TimedRotatingFileHandler
import json
from pathlib import Path


class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord):
log_record = {
"@timestamp": self.formatTime(
record, "%Y-%m-%dT%H:%M:%S%z"
), # format iso 8601
"level": record.levelname,
"name": record.name,
"file": record.filename,
"lineno": record.lineno,
"func": record.funcName,
"message": record.getMessage(),
}
return json.dumps(log_record)


class AccessLogFormatter(logging.Formatter):
def format(self, record: logging.LogRecord):
log_record = {
"@timestamp": self.formatTime(
record, "%Y-%m-%dT%H:%M:%S%z"
), # format iso 8601
"remote_addr": getattr(record, "remote_addr", ""),
"scheme": getattr(record, "scheme", ""),
"method": getattr(record, "method", ""),
"host": getattr(record, "host", ""),
"path": getattr(record, "path", ""),
"status": getattr(record, "status", ""),
"response_length": getattr(record, "response_length", ""),
"response_time": getattr(record, "response_time", 0),
}
return json.dumps(log_record)


class FlaskLogger(logging.Logger):
"""自定义日志类, 设置请求日志和普通日志两个不同的日志器

Args:
name: str, 日志器名称, 默认为 __name__
level: int, 日志级别, 默认为 DEBUG
logfile: str, 日志文件名, 默认为 app.log
logdir: str, 日志文件目录, 默认为当前目录
access_log: bool, 是否用于记录访问日志, 默认为 False
console: bool, 是否输出到控制台, 默认为 True
json_log: bool, 是否使用json格式的日志, 默认为 True
"""

def __init__(
self,
name: str = __name__,
level: int = logging.DEBUG,
logfile: str = "app.log",
logdir: str = "",
access_log: bool = False,
console: bool = True,
json_log: bool = True,
):
super().__init__(name, level)
self.logfile = logfile
self.logdir = logdir
self.access_log = access_log
self.console = console
self.json_log = json_log
self.setup_logpath()
self.setup_handler()

def setup_logpath(self):
"""设置日志文件路径, 如果创建日志器时未指定日志目录, 则使用当前目录"""
if not self.logdir:
return

p = Path(self.logdir)
if not p.exists():
try:
p.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Failed to create log directory: {e}")
sys.exit(1)

self.logfile = p / self.logfile

def setup_handler(self):
if self.json_log:
formatter = self.set_json_formatter()
else:
formatter = self.set_plain_formatter()
handler_file = self.set_handler_file(formatter)
handler_stdout = self.set_handler_stdout(formatter)
self.addHandler(handler_file)
if self.console:
self.addHandler(handler_stdout)

def set_plain_formatter(self):
fmt = "%(asctime)s | %(levelname)s | %(name)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
datefmt = "%Y-%m-%dT%H:%M:%S%z"
return logging.Formatter(fmt, datefmt=datefmt)

def set_json_formatter(self):
"""设置json格式的日志"""
if self.access_log:
return AccessLogFormatter()
return JsonFormatter()

def set_handler_stdout(self, formatter: logging.Formatter):
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
return handler

def set_handler_file(self, formatter: logging.Formatter):
handler = TimedRotatingFileHandler(
filename=self.logfile,
when="midnight",
interval=1,
backupCount=7,
encoding="utf-8",
)
handler.setFormatter(formatter)
return handler

pkg/log/__init__.py中实例化

from .log import FlaskLogger

access_logger = FlaskLogger("access", logdir="logs", access_log=True, logfile="access.log")
logger = FlaskLogger(logdir="logs")

__all__ = ["access_logger", "logger"]

编写请求日志中间件 pkg/middlewares/log.py

import time
from pkg.log import access_logger

class AccessLogMiddleware:
def __init__(self, app):
self.app = app
self.white_list = frozenset(
[
"/metrics",
"/health",
]
)

def __call__(self, environ, start_response):
log_entry = {
"remote_addr": environ.get("REMOTE_ADDR", "NaN"),
"method": environ.get("REQUEST_METHOD", "NaN"),
"scheme": environ.get("wsgi.url_scheme", "NaN"),
"host": environ.get("HTTP_HOST", "NaN"),
"path": environ.get("PATH_INFO", "NaN"),
"status":None,
"response_length": None,
"response_time": None,
}
resp_status_code = None
resp_length = None
resp_time = None

def catching_start_response(status, headers, exc_info=None):
nonlocal resp_status_code, resp_length, resp_time
resp_status_code = status.split(" ")[0]
for i in headers:
if i[0] == "Content-Length":
resp_length = int(i[1])
break
return start_response(status, headers, exc_info)

start_time = time.time()
response = self.app(environ, catching_start_response)
response_time = round(time.time() - start_time, 4)
log_entry["status"] = int(resp_status_code)
log_entry["response_length"] = resp_length
log_entry["response_time"] = response_time

if log_entry["path"] not in self.white_list:
access_logger.info("", extra=log_entry)

return response

pkg/middlewares/__init__.py中导出

from .log import AccessLogMiddleware

__all__ = [
"AccessLogMiddleware",
]

flask实例注册中间件

from pkg.middlerwares import AccessLogMiddleware
from werkzeug.middleware.dispatcher import DispatcherMiddleware

app = Flask(__name__)
app.wsgi_app = AccessLogMiddleware(app.wsgi_app)

监控

在服务监控中,大致可分为日志监控和指标监控。日志监控一般由类似ELK这样的日志系统去收集分析,而指标监控一般是由Prometheus收集服务的一些可量化的指标数据,比如服务响应码、响应时间。这里主要是要封装指标监控,一般会暴露HTTP API请求量,请求响应时间等

安装SDK:python -m pip install prometheus-client

编写指标监控模块:pkg/metrics/metrics.py

from prometheus_client import Counter, Info, Histogram
import socket
from functools import lru_cache
import sys
from pkg import Singleton

@Singleton
class Metrics:
app_info = Info("app_info", "Application info")
request_counter = Counter(
"http_requests_total",
"Total HTTP Requests",
["method", "path", "status"],
)
response_time = Histogram(
"http_response_time_seconds",
"HTTP Response Time",
["method", "path"],
)

def __init__(self):
self.app_info.info(
{
"version": self._get_version(),
"host": self.localip(),
"python_version": self.get_pyversion(),
}
)

def _get_version(self) -> str:
return "0.1.0"

@lru_cache(maxsize=128)
def localip(self):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 53))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"

@lru_cache(maxsize=128)
def get_pyversion(self):
return sys.version.split(" ")[0]

用到了单例模式,单例模式的代码如下

class Singleton:  
def __init__(self, cls):
self._cls = cls
self._instance = {}

def __call__(self):
if self._cls not in self._instance:
self._instance[self._cls] = self._cls()
return self._instance[self._cls]

pkg/metrics/__init_.py中实例化。其实实现类用单例模式装饰了,要不要实例化都行

from .metrics import Metrics

collector = Metrics()

编写指标监控中间件:pkg/middlewares/metrics.py

from pkg.metrics import collector
import time

class MetricsMiddleware:
def __init__(self, app):
self.app = app
self.white_list = frozenset(
[
"/metrics",
"/health",
]
)

def __call__(self, environ, start_response):
method = environ.get("REQUEST_METHOD", "NaN")
path = environ.get("PATH_INFO", "NaN")
resp_status_code = None

def catching_start_response(status, headers, exc_info=None):
nonlocal resp_status_code
resp_status_code = status.split(" ")[0]
return start_response(status, headers, exc_info)

start_time = time.time()
response = self.app(environ, catching_start_response)
response_time = round(time.time() - start_time, 4)

if path not in self.white_list:
collector.request_counter.labels(method, path, resp_status_code).inc()
collector.response_time.labels(method, path).observe(response_time)

return response

pkg/middlewares/__init__.py中导出

from .metrics import MetricsMiddleware

__all__ = [
"MetricsMiddleware",
]

在flask实例中注册指标监控中间件

from pkg.middlerwares import MetricsMiddleware
from prometheus_client import make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware

app = Flask(__name__)
app.wsgi_app = DispatcherMiddleware(
app.wsgi_app, {
'/metrics': make_wsgi_app()
}
)
app.wsgi_app = MetricsMiddleware(app.wsgi_app)

连接数据库

关系型数据库

使用sqlalchemy

Redis

  • Flask-Cache