feat: 初始化项目架构
This commit is contained in:
184
ruoyi-fastapi-backend/utils/common_util.py
Normal file
184
ruoyi-fastapi-backend/utils/common_util.py
Normal file
@@ -0,0 +1,184 @@
|
||||
import pandas as pd
|
||||
import io
|
||||
import os
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.styles import Alignment, PatternFill
|
||||
from openpyxl.utils import get_column_letter
|
||||
from openpyxl.worksheet.datavalidation import DataValidation
|
||||
from typing import List
|
||||
from config.env import CachePathConfig
|
||||
|
||||
|
||||
def worship():
|
||||
print("""
|
||||
////////////////////////////////////////////////////////////////////
|
||||
// _ooOoo_ //
|
||||
// o8888888o //
|
||||
// 88" . "88 //
|
||||
// (| ^_^ |) //
|
||||
// O\ = /O //
|
||||
// ____/`---'\____ //
|
||||
// .' \\| |// `. //
|
||||
// / \\||| : |||// \ //
|
||||
// / _||||| -:- |||||- \ //
|
||||
// | | \\\ - /// | | //
|
||||
// | \_| ''\---/'' | | //
|
||||
// \ .-\__ `-` ___/-. / //
|
||||
// ___`. .' /--.--\ `. . ___ //
|
||||
// ."" '< `.___\_<|>_/___.' >'"". //
|
||||
// | | : `- \`.;`\ _ /`;.`/ - ` : | | //
|
||||
// \ \ `-. \_ __\ /__ _/ .-` / / //
|
||||
// ========`-.____`-.___\_____/___.-`____.-'======== //
|
||||
// `=---=' //
|
||||
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
|
||||
// 佛祖保佑 永不宕机 永无BUG //
|
||||
////////////////////////////////////////////////////////////////////
|
||||
""")
|
||||
|
||||
|
||||
class CamelCaseUtil:
|
||||
"""
|
||||
下划线形式(snake_case)转换为小驼峰形式(camelCase)工具方法
|
||||
"""
|
||||
@classmethod
|
||||
def __to_camel_case(cls, snake_str):
|
||||
"""
|
||||
下划线形式字符串(snake_case)转换为小驼峰形式字符串(camelCase)
|
||||
:param snake_str: 下划线形式字符串
|
||||
:return: 小驼峰形式字符串
|
||||
"""
|
||||
# 分割字符串
|
||||
words = snake_str.split('_')
|
||||
# 小驼峰命名,第一个词首字母小写,其余词首字母大写
|
||||
return words[0] + ''.join(word.capitalize() for word in words[1:])
|
||||
|
||||
@classmethod
|
||||
def transform_result(cls, result):
|
||||
"""
|
||||
针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法
|
||||
:param result: 输入数据
|
||||
:return: 小驼峰形式结果
|
||||
"""
|
||||
if result is None:
|
||||
return result
|
||||
# 如果是字典,直接转换键
|
||||
elif isinstance(result, dict):
|
||||
return {cls.__to_camel_case(k): v for k, v in result.items()}
|
||||
# 如果是一组字典或其他类型的列表,遍历列表进行转换
|
||||
elif isinstance(result, list):
|
||||
return [cls.transform_result(row) if isinstance(row, dict) else cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) for row in result]
|
||||
# 如果是其他类型,如模型实例,先转换为字典
|
||||
else:
|
||||
return cls.transform_result({c.name: getattr(result, c.name) for c in result.__table__.columns})
|
||||
|
||||
|
||||
def bytes2human(n, format_str="%(value).1f%(symbol)s"):
|
||||
"""Used by various scripts. See:
|
||||
http://goo.gl/zeJZl
|
||||
|
||||
>>> bytes2human(10000)
|
||||
'9.8K'
|
||||
>>> bytes2human(100001221)
|
||||
'95.4M'
|
||||
"""
|
||||
symbols = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
|
||||
prefix = {}
|
||||
for i, s in enumerate(symbols[1:]):
|
||||
prefix[s] = 1 << (i + 1) * 10
|
||||
for symbol in reversed(symbols[1:]):
|
||||
if n >= prefix[symbol]:
|
||||
value = float(n) / prefix[symbol]
|
||||
return format_str % locals()
|
||||
return format_str % dict(symbol=symbols[0], value=n)
|
||||
|
||||
|
||||
def bytes2file_response(bytes_info):
|
||||
yield bytes_info
|
||||
|
||||
|
||||
def export_list2excel(list_data: List):
|
||||
"""
|
||||
工具方法:将需要导出的list数据转化为对应excel的二进制数据
|
||||
:param list_data: 数据列表
|
||||
:return: 字典信息对应excel的二进制数据
|
||||
"""
|
||||
df = pd.DataFrame(list_data)
|
||||
binary_data = io.BytesIO()
|
||||
df.to_excel(binary_data, index=False, engine='openpyxl')
|
||||
binary_data = binary_data.getvalue()
|
||||
|
||||
return binary_data
|
||||
|
||||
|
||||
def get_excel_template(header_list: List, selector_header_list: List, option_list: List[dict]):
|
||||
"""
|
||||
工具方法:将需要导出的list数据转化为对应excel的二进制数据
|
||||
:param header_list: 表头数据列表
|
||||
:param selector_header_list: 需要设置为选择器格式的表头数据列表
|
||||
:param option_list: 选择器格式的表头预设的选项列表
|
||||
:return: 模板excel的二进制数据
|
||||
"""
|
||||
# 创建Excel工作簿
|
||||
wb = Workbook()
|
||||
# 选择默认的活动工作表
|
||||
ws = wb.active
|
||||
|
||||
# 设置表头文字
|
||||
headers = header_list
|
||||
|
||||
# 设置表头背景样式为灰色,前景色为白色
|
||||
header_fill = PatternFill(start_color="ababab", end_color="ababab", fill_type="solid")
|
||||
|
||||
# 将表头写入第一行
|
||||
for col_num, header in enumerate(headers, 1):
|
||||
cell = ws.cell(row=1, column=col_num)
|
||||
cell.value = header
|
||||
cell.fill = header_fill
|
||||
# 设置列宽度为16
|
||||
ws.column_dimensions[chr(64 + col_num)].width = 12
|
||||
# 设置水平居中对齐
|
||||
cell.alignment = Alignment(horizontal='center')
|
||||
|
||||
# 设置选择器的预设选项
|
||||
options = option_list
|
||||
|
||||
# 获取selector_header的字母索引
|
||||
for selector_header in selector_header_list:
|
||||
column_selector_header_index = headers.index(selector_header) + 1
|
||||
|
||||
# 创建数据有效性规则
|
||||
header_option = []
|
||||
for option in options:
|
||||
if option.get(selector_header):
|
||||
header_option = option.get(selector_header)
|
||||
dv = DataValidation(type="list", formula1=f'"{",".join(header_option)}"')
|
||||
# 设置数据有效性规则的起始单元格和结束单元格
|
||||
dv.add(
|
||||
f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576')
|
||||
# 添加数据有效性规则到工作表
|
||||
ws.add_data_validation(dv)
|
||||
|
||||
# 保存Excel文件为字节类型的数据
|
||||
file = io.BytesIO()
|
||||
wb.save(file)
|
||||
file.seek(0)
|
||||
|
||||
# 读取字节数据
|
||||
excel_data = file.getvalue()
|
||||
|
||||
return excel_data
|
||||
|
||||
|
||||
def get_filepath_from_url(url: str):
|
||||
"""
|
||||
工具方法:根据请求参数获取文件路径
|
||||
:param url: 请求参数中的url参数
|
||||
:return: 文件路径
|
||||
"""
|
||||
file_info = url.split("?")[1].split("&")
|
||||
task_id = file_info[0].split("=")[1]
|
||||
file_name = file_info[1].split("=")[1]
|
||||
task_path = file_info[2].split("=")[1]
|
||||
filepath = os.path.join(CachePathConfig.PATH, task_path, task_id, file_name)
|
||||
|
||||
return filepath
|
11
ruoyi-fastapi-backend/utils/log_util.py
Normal file
11
ruoyi-fastapi-backend/utils/log_util.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import os
|
||||
import time
|
||||
from loguru import logger
|
||||
|
||||
log_path = os.path.join(os.getcwd(), 'logs')
|
||||
if not os.path.exists(log_path):
|
||||
os.mkdir(log_path)
|
||||
|
||||
log_path_error = os.path.join(log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
|
||||
|
||||
logger.add(log_path_error, rotation="50MB", encoding="utf-8", enqueue=True, compression="zip")
|
5
ruoyi-fastapi-backend/utils/message_util.py
Normal file
5
ruoyi-fastapi-backend/utils/message_util.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from utils.log_util import logger
|
||||
|
||||
|
||||
def message_service(sms_code: str):
|
||||
logger.info(f"短信验证码为{sms_code}")
|
98
ruoyi-fastapi-backend/utils/page_util.py
Normal file
98
ruoyi-fastapi-backend/utils/page_util.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import math
|
||||
from typing import Optional, List
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from pydantic.alias_generators import to_camel
|
||||
|
||||
|
||||
class PageModel(BaseModel):
|
||||
"""
|
||||
分页模型
|
||||
"""
|
||||
offset: int
|
||||
page_num: int
|
||||
page_size: int
|
||||
total: int
|
||||
has_next: bool
|
||||
|
||||
|
||||
class PageObjectResponse(BaseModel):
|
||||
"""
|
||||
用户管理列表分页查询返回模型
|
||||
"""
|
||||
rows: List = []
|
||||
page_num: int
|
||||
page_size: int
|
||||
total: int
|
||||
has_next: bool
|
||||
|
||||
|
||||
class PageResponseModel(BaseModel):
|
||||
"""
|
||||
列表分页查询返回模型
|
||||
"""
|
||||
model_config = ConfigDict(alias_generator=to_camel)
|
||||
|
||||
rows: List = []
|
||||
page_num: Optional[int] = None
|
||||
page_size: Optional[int] = None
|
||||
total: int
|
||||
has_next: Optional[bool] = None
|
||||
|
||||
|
||||
def get_page_info(offset: int, page_num: int, page_size: int, count: int):
|
||||
"""
|
||||
根据分页参数获取分页信息
|
||||
:param offset: 起始数据位置
|
||||
:param page_num: 当前页码
|
||||
:param page_size: 当前页面数据量
|
||||
:param count: 数据总数
|
||||
:return: 分页信息对象
|
||||
"""
|
||||
has_next = False
|
||||
if offset >= count:
|
||||
res_offset_1 = (page_num - 2) * page_size
|
||||
if res_offset_1 < 0:
|
||||
res_offset = 0
|
||||
res_page_num = 1
|
||||
else:
|
||||
res_offset = res_offset_1
|
||||
res_page_num = page_num - 1
|
||||
else:
|
||||
res_offset = offset
|
||||
if (res_offset + page_size) < count:
|
||||
has_next = True
|
||||
res_page_num = page_num
|
||||
|
||||
result = dict(offset=res_offset, page_num=res_page_num, page_size=page_size, total=count, has_next=has_next)
|
||||
|
||||
return PageModel(**result)
|
||||
|
||||
|
||||
def get_page_obj(data_list: List, page_num: int, page_size: int):
|
||||
"""
|
||||
输入数据列表data_list和分页信息,返回分页数据列表结果
|
||||
:param data_list: 原始数据列表
|
||||
:param page_num: 当前页码
|
||||
:param page_size: 当前页面数据量
|
||||
:return: 分页数据对象
|
||||
"""
|
||||
# 计算起始索引和结束索引
|
||||
start = (page_num - 1) * page_size
|
||||
end = page_num * page_size
|
||||
|
||||
# 根据计算得到的起始索引和结束索引对数据列表进行切片
|
||||
paginated_data = data_list[start:end]
|
||||
has_next = True if math.ceil(len(data_list) / page_size) > page_num else False
|
||||
|
||||
result = PageResponseModel(
|
||||
rows=paginated_data,
|
||||
pageNum=page_num,
|
||||
pageSize=page_size,
|
||||
total=len(data_list),
|
||||
hasNext=has_next
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
28
ruoyi-fastapi-backend/utils/pwd_util.py
Normal file
28
ruoyi-fastapi-backend/utils/pwd_util.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from passlib.context import CryptContext
|
||||
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
|
||||
class PwdUtil:
|
||||
"""
|
||||
密码工具类
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def verify_password(cls, plain_password, hashed_password):
|
||||
"""
|
||||
工具方法:校验当前输入的密码与数据库存储的密码是否一致
|
||||
:param plain_password: 当前输入的密码
|
||||
:param hashed_password: 数据库存储的密码
|
||||
:return: 校验结果
|
||||
"""
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
|
||||
@classmethod
|
||||
def get_password_hash(cls, input_password):
|
||||
"""
|
||||
工具方法:对当前输入的密码进行加密
|
||||
:param input_password: 输入的密码
|
||||
:return: 加密成功的密码
|
||||
"""
|
||||
return pwd_context.hash(input_password)
|
301
ruoyi-fastapi-backend/utils/response_util.py
Normal file
301
ruoyi-fastapi-backend/utils/response_util.py
Normal file
@@ -0,0 +1,301 @@
|
||||
from fastapi import status
|
||||
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from typing import Any, Dict, Optional
|
||||
from pydantic import BaseModel
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class ResponseUtil:
|
||||
"""
|
||||
响应工具类
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def success(cls, msg: str = '操作成功', data: Optional[Any] = None, rows: Optional[Any] = None,
|
||||
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
|
||||
"""
|
||||
成功响应方法
|
||||
:param msg: 可选,自定义成功响应信息
|
||||
:param data: 可选,成功响应结果中属性为data的值
|
||||
:param rows: 可选,成功响应结果中属性为rows的值
|
||||
:param dict_content: 可选,dict类型,成功响应结果中自定义属性的值
|
||||
:param model_content: 可选,BaseModel类型,成功响应结果中自定义属性的值
|
||||
:return: 成功响应结果
|
||||
"""
|
||||
result = {
|
||||
'code': 200,
|
||||
'msg': msg
|
||||
}
|
||||
|
||||
if data is not None:
|
||||
result['data'] = data
|
||||
if rows is not None:
|
||||
result['rows'] = rows
|
||||
if dict_content is not None:
|
||||
result.update(dict_content)
|
||||
if model_content is not None:
|
||||
result.update(model_content.model_dump(by_alias=True))
|
||||
|
||||
result.update({'success': True, 'time': datetime.now()})
|
||||
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(result)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def failure(cls, msg: str = '操作失败', data: Optional[Any] = None, rows: Optional[Any] = None,
|
||||
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
|
||||
"""
|
||||
失败响应方法
|
||||
:param msg: 可选,自定义失败响应信息
|
||||
:param data: 可选,失败响应结果中属性为data的值
|
||||
:param rows: 可选,失败响应结果中属性为rows的值
|
||||
:param dict_content: 可选,dict类型,失败响应结果中自定义属性的值
|
||||
:param model_content: 可选,BaseModel类型,失败响应结果中自定义属性的值
|
||||
:return: 失败响应结果
|
||||
"""
|
||||
result = {
|
||||
'code': 601,
|
||||
'msg': msg
|
||||
}
|
||||
|
||||
if data is not None:
|
||||
result['data'] = data
|
||||
if rows is not None:
|
||||
result['rows'] = rows
|
||||
if dict_content is not None:
|
||||
result.update(dict_content)
|
||||
if model_content is not None:
|
||||
result.update(model_content.model_dump(by_alias=True))
|
||||
|
||||
result.update({'success': False, 'time': datetime.now()})
|
||||
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(result)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unauthorized(cls, msg: str = '登录信息已过期,访问系统资源失败', data: Optional[Any] = None, rows: Optional[Any] = None,
|
||||
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
|
||||
"""
|
||||
未认证响应方法
|
||||
:param msg: 可选,自定义未认证响应信息
|
||||
:param data: 可选,未认证响应结果中属性为data的值
|
||||
:param rows: 可选,未认证响应结果中属性为rows的值
|
||||
:param dict_content: 可选,dict类型,未认证响应结果中自定义属性的值
|
||||
:param model_content: 可选,BaseModel类型,未认证响应结果中自定义属性的值
|
||||
:return: 未认证响应结果
|
||||
"""
|
||||
result = {
|
||||
'code': 401,
|
||||
'msg': msg
|
||||
}
|
||||
|
||||
if data is not None:
|
||||
result['data'] = data
|
||||
if rows is not None:
|
||||
result['rows'] = rows
|
||||
if dict_content is not None:
|
||||
result.update(dict_content)
|
||||
if model_content is not None:
|
||||
result.update(model_content.model_dump(by_alias=True))
|
||||
|
||||
result.update({'success': False, 'time': datetime.now()})
|
||||
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(result)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def forbidden(cls, msg: str = '该用户无此接口权限', data: Optional[Any] = None, rows: Optional[Any] = None,
|
||||
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
|
||||
"""
|
||||
未认证响应方法
|
||||
:param msg: 可选,自定义未认证响应信息
|
||||
:param data: 可选,未认证响应结果中属性为data的值
|
||||
:param rows: 可选,未认证响应结果中属性为rows的值
|
||||
:param dict_content: 可选,dict类型,未认证响应结果中自定义属性的值
|
||||
:param model_content: 可选,BaseModel类型,未认证响应结果中自定义属性的值
|
||||
:return: 未认证响应结果
|
||||
"""
|
||||
result = {
|
||||
'code': 403,
|
||||
'msg': msg
|
||||
}
|
||||
|
||||
if data is not None:
|
||||
result['data'] = data
|
||||
if rows is not None:
|
||||
result['rows'] = rows
|
||||
if dict_content is not None:
|
||||
result.update(dict_content)
|
||||
if model_content is not None:
|
||||
result.update(model_content.model_dump(by_alias=True))
|
||||
|
||||
result.update({'success': False, 'time': datetime.now()})
|
||||
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(result)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def error(cls, msg: str = '接口异常', data: Optional[Any] = None, rows: Optional[Any] = None,
|
||||
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
|
||||
"""
|
||||
错误响应方法
|
||||
:param msg: 可选,自定义错误响应信息
|
||||
:param data: 可选,错误响应结果中属性为data的值
|
||||
:param rows: 可选,错误响应结果中属性为rows的值
|
||||
:param dict_content: 可选,dict类型,错误响应结果中自定义属性的值
|
||||
:param model_content: 可选,BaseModel类型,错误响应结果中自定义属性的值
|
||||
:return: 错误响应结果
|
||||
"""
|
||||
result = {
|
||||
'code': 500,
|
||||
'msg': msg
|
||||
}
|
||||
|
||||
if data is not None:
|
||||
result['data'] = data
|
||||
if rows is not None:
|
||||
result['rows'] = rows
|
||||
if dict_content is not None:
|
||||
result.update(dict_content)
|
||||
if model_content is not None:
|
||||
result.update(model_content.model_dump(by_alias=True))
|
||||
|
||||
result.update({'success': False, 'time': datetime.now()})
|
||||
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(result)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def streaming(cls, *, data: Any = None):
|
||||
"""
|
||||
流式响应方法
|
||||
:param data: 流式传输的内容
|
||||
:return: 流式响应结果
|
||||
"""
|
||||
return StreamingResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=data
|
||||
)
|
||||
|
||||
|
||||
def response_200(*, data: Any = None, message="获取成功") -> Response:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=jsonable_encoder(
|
||||
{
|
||||
'code': 200,
|
||||
'message': message,
|
||||
'data': data,
|
||||
'success': 'true',
|
||||
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def response_400(*, data: Any = None, message: str = "获取失败") -> Response:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
content=jsonable_encoder(
|
||||
{
|
||||
'code': 400,
|
||||
'message': message,
|
||||
'data': data,
|
||||
'success': 'false',
|
||||
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def response_401(*, data: Any = None, message: str = "获取失败") -> Response:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
content=jsonable_encoder(
|
||||
{
|
||||
'code': 401,
|
||||
'message': message,
|
||||
'data': data,
|
||||
'success': 'false',
|
||||
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def response_403(*, data: Any = None, message: str = "获取失败") -> Response:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
content=jsonable_encoder(
|
||||
{
|
||||
'code': 403,
|
||||
'message': message,
|
||||
'data': data,
|
||||
'success': 'false',
|
||||
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def response_500(*, data: Any = None, message: str = "接口异常") -> Response:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content=jsonable_encoder(
|
||||
{
|
||||
'code': 500,
|
||||
'message': message,
|
||||
'data': data,
|
||||
'success': 'false',
|
||||
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def streaming_response_200(*, data: Any = None):
|
||||
return StreamingResponse(
|
||||
status_code=status.HTTP_200_OK,
|
||||
content=data,
|
||||
)
|
||||
|
||||
|
||||
class AuthException(Exception):
|
||||
"""
|
||||
自定义令牌异常AuthException
|
||||
"""
|
||||
|
||||
def __init__(self, data: str = None, message: str = None):
|
||||
self.data = data
|
||||
self.message = message
|
||||
|
||||
|
||||
class PermissionException(Exception):
|
||||
"""
|
||||
自定义权限异常PermissionException
|
||||
"""
|
||||
|
||||
def __init__(self, data: str = None, message: str = None):
|
||||
self.data = data
|
||||
self.message = message
|
||||
|
||||
|
||||
class LoginException(Exception):
|
||||
"""
|
||||
自定义登录异常LoginException
|
||||
"""
|
||||
|
||||
def __init__(self, data: str = None, message: str = None):
|
||||
self.data = data
|
||||
self.message = message
|
51
ruoyi-fastapi-backend/utils/time_format_util.py
Normal file
51
ruoyi-fastapi-backend/utils/time_format_util.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import datetime
|
||||
|
||||
|
||||
def object_format_datetime(obj):
|
||||
"""
|
||||
:param obj: 输入一个对象
|
||||
:return:对目标对象所有datetime类型的属性格式化
|
||||
"""
|
||||
for attr in dir(obj):
|
||||
value = getattr(obj, attr)
|
||||
if isinstance(value, datetime.datetime):
|
||||
setattr(obj, attr, value.strftime('%Y-%m-%d %H:%M:%S'))
|
||||
return obj
|
||||
|
||||
|
||||
def list_format_datetime(lst):
|
||||
"""
|
||||
:param lst: 输入一个嵌套对象的列表
|
||||
:return: 对目标列表中所有对象的datetime类型的属性格式化
|
||||
"""
|
||||
for obj in lst:
|
||||
for attr in dir(obj):
|
||||
value = getattr(obj, attr)
|
||||
if isinstance(value, datetime.datetime):
|
||||
setattr(obj, attr, value.strftime('%Y-%m-%d %H:%M:%S'))
|
||||
return lst
|
||||
|
||||
|
||||
def format_datetime_dict_list(dicts):
|
||||
"""
|
||||
递归遍历嵌套字典,并将 datetime 值转换为字符串格式
|
||||
:param dicts: 输入一个嵌套字典的列表
|
||||
:return: 对目标列表中所有字典的datetime类型的属性格式化
|
||||
"""
|
||||
result = []
|
||||
|
||||
for item in dicts:
|
||||
new_item = {}
|
||||
for k, v in item.items():
|
||||
if isinstance(v, dict):
|
||||
# 递归遍历子字典
|
||||
new_item[k] = format_datetime_dict_list([v])[0]
|
||||
elif isinstance(v, datetime.datetime):
|
||||
# 如果值是 datetime 类型,则格式化为字符串
|
||||
new_item[k] = v.strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
# 否则保留原始值
|
||||
new_item[k] = v
|
||||
result.append(new_item)
|
||||
|
||||
return result
|
Reference in New Issue
Block a user