Files
RuoYi-Vue3-FastAPI/ruoyi-fastapi-backend/module_generator/templates/python/service.py.jinja2

212 lines
11 KiB
Django/Jinja
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{# Primary-key helpers shared by the whole template. Column comments may carry a trailing
   annotation that starts with a full-width "（"; only the text before it is used as the label.
   str.find returns -1 when the marker is absent, in which case the whole comment is kept. -#}
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
{% set pkParentheseIndex = pkColumn.column_comment.find("（") %}
{% set pk_field_comment = pkColumn.column_comment[:pkParentheseIndex] if pkParentheseIndex != -1 else pkColumn.column_comment %}
{% if dicts %}
from fastapi import Request
{% endif %}
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
{% if dicts %}
from module_admin.service.dict_service import DictDataService
{% endif %}
from {{ packageName }}.dao.{{ businessName }}_dao import {{ BusinessName }}Dao
from {{ packageName }}.entity.vo.{{ businessName }}_vo import Delete{{ BusinessName }}Model, {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from utils.common_util import CamelCaseUtil
from utils.excel_util import ExcelUtil
class {{ BusinessName }}Service:
    """
    Service layer for the {{ functionName }} module.
    """

    @classmethod
    async def get_{{ businessName }}_list_services(
        cls, query_db: AsyncSession, query_object: {{ BusinessName }}PageQueryModel, is_page: bool = False
    ):
        """
        Fetch the {{ functionName }} list.

        :param query_db: ORM session
        :param query_object: query parameter object
        :param is_page: whether pagination is enabled; forwarded unchanged to the DAO
        :return: the result of {{ BusinessName }}Dao.get_{{ businessName }}_list
        """
        {{ businessName }}_list_result = await {{ BusinessName }}Dao.get_{{ businessName }}_list(query_db, query_object, is_page)

        return {{ businessName }}_list_result

{# In every loop below, `comment` is the column comment truncated at the first full-width "（";
   when the marker is absent (find returns -1) the whole comment is used. -#}
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("（") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
    @classmethod
    async def check_{{ column.python_field | camel_to_snake }}_unique_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Check whether {{ comment }} is unique.

        :param query_db: ORM session
        :param page_object: {{ functionName }} object being added or edited
        :return: CommonConstant.NOT_UNIQUE when a different record already holds the same value,
            otherwise CommonConstant.UNIQUE
        """
        # A record that has no primary key yet gets -1 so it never matches an existing row's key.
        {{ pk_field }} = -1 if page_object.{{ pk_field }} is None else page_object.{{ pk_field }}
        {{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_info(query_db, {{ BusinessName }}Model({{ column.python_field }}=page_object.{{ column.python_field | camel_to_snake }}))
        # A hit on the record itself (same primary key) is not a conflict.
        if {{ businessName }} and {{ businessName }}.{{ pk_field }} != {{ pk_field }}:
            return CommonConstant.NOT_UNIQUE
        return CommonConstant.UNIQUE
{% if not loop.last %}{{ "\n" }}{% endif %}
{% endif %}
{% endfor %}
    @classmethod
    async def add_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Add a {{ functionName }} record.

        :param query_db: ORM session
        :param page_object: {{ functionName }} object to add
        :return: CrudResponseModel reporting success
        :raises ServiceException: when a unique field's value already exists
        """
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("（") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
        if not await cls.check_{{ column.python_field | camel_to_snake }}_unique_services(query_db, page_object):
            raise ServiceException(message=f'新增{{ functionName }}{page_object.{{ column.python_field | camel_to_snake }}}失败,{{ comment }}已存在')
{% endif %}
{% endfor %}
        try:
{% if table.sub %}
            add_{{ businessName }} = await {{ BusinessName }}Dao.add_{{ businessName }}_dao(query_db, page_object)
            # Sub-table rows are inserted only when the DAO returned a truthy result for the main row.
            if add_{{ businessName }}:
                for sub_table in page_object.{{ subclassName }}_list:
                    await {{ BusinessName }}Dao.add_{{ subTable.business_name }}_dao(query_db, sub_table)
{% else %}
            await {{ BusinessName }}Dao.add_{{ businessName }}_dao(query_db, page_object)
{% endif %}
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='新增成功')
        except Exception:
            # Undo the partial write, then re-raise the original exception unchanged.
            await query_db.rollback()
            raise

    @classmethod
    async def edit_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Edit a {{ functionName }} record.

        :param query_db: ORM session
        :param page_object: {{ functionName }} object carrying the edited values
        :return: CrudResponseModel reporting success
        :raises ServiceException: when the record does not exist or a unique field's value
            already exists
        """
        # Only fields explicitly set by the caller are dumped; the excluded names are decided at
        # generation time from the column metadata.
        edit_{{ businessName }} = page_object.model_dump(exclude_unset=True, exclude={% raw %}{{% endraw %}{% if table.sub %}'{{ subclassName }}_list', {% endif %}{% for column in columns %}{% if not column.edit and not column.pk and column.column_name not in column_not_edit_show %}'{{ column.python_field | camel_to_snake }}'{% if not loop.last %}, {% endif %}{% endif %}{% endfor %}{% raw %}}{% endraw %})
        {{ businessName }}_info = await cls.{{ businessName }}_detail_services(query_db, page_object.{{ pk_field }})
        # The detail service returns an empty model for a missing record, so a falsy primary key
        # means there is nothing to edit.
        if not {{ businessName }}_info.{{ pk_field }}:
            raise ServiceException(message='{{ functionName }}不存在')
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("（") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
        if not await cls.check_{{ column.python_field | camel_to_snake }}_unique_services(query_db, page_object):
            raise ServiceException(message=f'修改{{ functionName }}{page_object.{{ column.python_field | camel_to_snake }}}失败,{{ comment }}已存在')
{% endif %}
{% endfor %}
        try:
            await {{ BusinessName }}Dao.edit_{{ businessName }}_dao(query_db, edit_{{ businessName }})
{% if table.sub %}
            # Sub-table rows are replaced wholesale: delete the stored ones, then insert the submitted ones.
            for sub_table in {{ businessName }}_info.{{ subclassName }}_list:
                await {{ BusinessName }}Dao.delete_{{ subTable.business_name }}_dao(query_db, sub_table)
            for sub_table in page_object.{{ subclassName }}_list:
                await {{ BusinessName }}Dao.add_{{ subTable.business_name }}_dao(query_db, sub_table)
{% endif %}
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='更新成功')
        except Exception:
            # Undo the partial write, then re-raise the original exception unchanged.
            await query_db.rollback()
            raise

    @classmethod
    async def delete_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: Delete{{ BusinessName }}Model):
        """
        Delete one or more {{ functionName }} records.

        :param query_db: ORM session
        :param page_object: delete request; its {{ pk_field }}s attribute is a comma-separated string of keys
        :return: CrudResponseModel reporting success
        :raises ServiceException: when {{ pk_field }}s is empty
        """
        if not page_object.{{ pk_field }}s:
            raise ServiceException(message='传入{{ pk_field_comment }}为空')
        {{ pk_field }}_list = page_object.{{ pk_field }}s.split(',')
        try:
            for {{ pk_field }} in {{ pk_field }}_list:
{% if table.sub %}
                # Remove the sub-table rows of this record before removing the record itself.
                {{ businessName }} = await cls.{{ businessName }}_detail_services(query_db, int({{ pk_field }}))
                for sub_table in {{ businessName }}.{{ subclassName }}_list:
                    await {{ BusinessName }}Dao.delete_{{ subTable.business_name }}_dao(query_db, sub_table)
{% endif %}
                await {{ BusinessName }}Dao.delete_{{ businessName }}_dao(query_db, {{ BusinessName }}Model({{ pkField }}={{ pk_field }}))
            # A single commit after the loop: either every listed record is deleted or none is.
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            # Undo the partial write, then re-raise the original exception unchanged.
            await query_db.rollback()
            raise

    @classmethod
    async def {{ businessName }}_detail_services(cls, query_db: AsyncSession, {{ pk_field }}: int):
        """
        Fetch the details of a single {{ functionName }} record.

        :param query_db: ORM session
        :param {{ pk_field }}: {{ pk_field_comment }}
        :return: {{ BusinessName }}Model built from the stored record, or an empty
            {{ BusinessName }}Model when the DAO finds nothing
        """
        {{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_id(query_db, {{ pk_field }}={{ pk_field }})
        if {{ businessName }}:
            result = {{ BusinessName }}Model(**CamelCaseUtil.transform_result({{ businessName }}))
        else:
            result = {{ BusinessName }}Model()

        return result

    @staticmethod
    async def export_{{ businessName }}_list_services({% if dicts %}request: Request, {% endif %}{{ businessName }}_list: List):
        """
        Export {{ functionName }} records to Excel.

{% if dicts %}
        :param request: Request object; request.app.state.redis is used to read cached dictionary data
{% endif %}
        :param {{ businessName }}_list: list of {{ functionName }} records (dict-like items)
        :return: the result of ExcelUtil.export_list2excel for the given records
        """
        # Map each field key to the header text taken from its column comment.
        mapping_dict = {
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("（") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
            '{{ column.python_field }}': '{{ comment }}',
{% endfor %}
        }
{% if dicts %}
{% for dict_type in dicts.split(", ") %}
        {{ dict_type[1:-1] }}_list = await DictDataService.query_dict_data_list_from_cache_services(
            request.app.state.redis, dict_type={{ dict_type }}
        )
        {{ dict_type[1:-1] }}_option = [dict(label=item.get('dictLabel'), value=item.get('dictValue')) for item in {{ dict_type[1:-1] }}_list]
        {{ dict_type[1:-1] }}_option_dict = {item.get('value'): item for item in {{ dict_type[1:-1] }}_option}
{% endfor %}
        # Replace stored dictionary values with their labels; values without a matching entry are left as-is.
        for item in {{ businessName }}_list:
{% for column in columns %}
{% if column.dict_type %}
            if str(item.get('{{ column.python_field }}')) in {{ column.dict_type }}_option_dict:
                item['{{ column.python_field }}'] = {{ column.dict_type }}_option_dict.get(str(item.get('{{ column.python_field }}'))).get('label')
{% endif %}
{% endfor %}
{% endif %}
        binary_data = ExcelUtil.export_list2excel({{ businessName }}_list, mapping_dict)

        return binary_data