Files
RuoYi-Vue3-FastAPI/ruoyi-fastapi-backend/module_generator/templates/python/dao.py.jinja2
2025-02-17 17:43:29 +08:00

209 lines
8.5 KiB
Django/Jinja
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
{# Use only the part of the primary-key column comment that precedes a full-width "（" as the short label in the generated docstrings. #}
{# The search string must not be empty: find("") always returns 0, which would truncate the label to an empty string. #}
{% set pkParentheseIndex = pkColumn.column_comment.find("（") %}
{% set pk_field_comment = pkColumn.column_comment[:pkParentheseIndex] if pkParentheseIndex != -1 else pkColumn.column_comment %}
{# datetime/time are referenced only by the BETWEEN range filter in the list query, so emit the import exactly when at least one queried column uses that filter. #}
{% if columns | selectattr("query") | selectattr("query_type", "equalto", "BETWEEN") | list %}
from datetime import datetime, time
{% endif %}
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
{% if table.sub %}
from sqlalchemy.orm import selectinload
{% endif %}
from {{ packageName }}.entity.do.{{ businessName }}_do import {{ ClassName }}
from {{ packageName }}.entity.vo.{{ businessName }}_vo import {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from utils.page_util import PageUtil
class {{ BusinessName }}Dao:
    """
    Database access layer for the {{ functionName }} module.
    """

    @classmethod
    async def get_{{ businessName }}_detail_by_id(cls, db: AsyncSession, {{ pk_field }}: int):
        """
        Fetch a single {{ functionName }} record by {{ pk_field_comment }}.

        :param db: ORM session
        :param {{ pk_field }}: {{ pk_field_comment }}
        :return: first matching {{ functionName }} record
        """
        statement = (
            select({{ ClassName }})
            {% if table.sub %}
            .options(selectinload({{ ClassName }}.{{ subclassName }}_list))
            {% endif %}
            .where(
                {{ ClassName }}.{{ pk_field }} == {{ pk_field }}
            )
        )
        result = await db.execute(statement)

        return result.scalars().first()

    @classmethod
    async def get_{{ businessName }}_detail_by_info(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
        """
        Fetch a single {{ functionName }} record whose required columns equal the values carried by the given model.

        :param db: ORM session
        :param {{ businessName }}: {{ functionName }} model supplying the values to match
        :return: first matching {{ functionName }} record
        """
        statement = select({{ ClassName }}).where(
            {% for column in columns if column.required %}
            {% set field = column.python_field | camel_to_snake %}
            {{ ClassName }}.{{ field }} == {{ businessName }}.{{ field }},
            {% endfor %}
        )
        result = await db.execute(statement)

        return result.scalars().first()

    @classmethod
    async def get_{{ businessName }}_list(cls, db: AsyncSession, query_object: {{ BusinessName }}PageQueryModel, is_page: bool = False):
        """
        Build the filtered {{ functionName }} list query and hand it to PageUtil.paginate.

        A filter is applied only when the matching query_object value is truthy;
        otherwise the condition collapses to True.

        :param db: ORM session
        :param query_object: query parameters (filter values, page_num, page_size)
        :param is_page: paging switch forwarded to PageUtil.paginate
        :return: whatever PageUtil.paginate returns for the {{ functionName }} query
        """
        list_statement = (
            select({{ ClassName }})
            {% if table.sub %}
            .options(selectinload({{ ClassName }}.{{ subclassName }}_list))
            {% endif %}
            .where(
                {% set comparison_ops = {"EQ": "==", "NE": "!=", "GT": ">", "GTE": ">=", "LT": "<", "LTE": "<="} %}
                {% for column in columns if column.query %}
                {% set field = column.python_field | camel_to_snake %}
                {% if column.query_type in comparison_ops %}
                {{ ClassName }}.{{ field }} {{ comparison_ops[column.query_type] | safe }} query_object.{{ field }} if query_object.{{ field }} else True,
                {% elif column.query_type == "LIKE" %}
                {{ ClassName }}.{{ field }}.like(f'%{% raw %}{{% endraw %}query_object.{{ field }}{% raw %}}{% endraw %}%') if query_object.{{ field }} else True,
                {% elif column.query_type == "BETWEEN" %}
                {{ ClassName }}.{{ field }}.between(
                    datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
                    datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
                )
                if query_object.begin_time and query_object.end_time
                else True,
                {% endif %}
                {% endfor %}
            )
            .order_by({{ ClassName }}.{{ pk_field }})
            .distinct()
        )

        return await PageUtil.paginate(db, list_statement, query_object.page_num, query_object.page_size, is_page)

    @classmethod
    async def add_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
        """
        Insert a new {{ functionName }} record.

        The record is added to the session and flushed; no commit is issued here.

        :param db: ORM session
        :param {{ businessName }}: {{ functionName }} model to persist
        :return: the newly added ORM object
        """
        {{ businessName }}_fields = {{ businessName }}.model_dump({% if table.sub %}exclude={'{{ subclassName }}_list'}{% endif %})
        orm_{{ businessName }} = {{ ClassName }}(**{{ businessName }}_fields)
        db.add(orm_{{ businessName }})
        await db.flush()

        return orm_{{ businessName }}

    @classmethod
    async def edit_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: dict):
        """
        Update a {{ functionName }} record.

        The dict is passed to db.execute as a one-element parameter list next to
        update({{ ClassName }}).
        NOTE(review): presumably the dict must contain the primary key so the row can be located - confirm.

        :param db: ORM session
        :param {{ businessName }}: dict of {{ functionName }} values to update
        :return:
        """
        update_rows = [{{ businessName }}]
        await db.execute(update({{ ClassName }}), update_rows)

    @classmethod
    async def delete_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
        """
        Delete the {{ functionName }} record whose primary key matches the given model.

        :param db: ORM session
        :param {{ businessName }}: {{ functionName }} model carrying the primary key to delete
        :return:
        """
        target_ids = [{{ businessName }}.{{ pk_field }}]
        await db.execute(delete({{ ClassName }}).where({{ ClassName }}.{{ pk_field }}.in_(target_ids)))

{% if table.sub %}
{# NOTE(review): subClassName and the sub-table Model referenced below are not imported by this template's import header - confirm where they are generated. #}
{% set sub_name = subTable.business_name %}
{% set sub_model = (subTable.business_name | capitalize) ~ "Model" %}
{% set sub_pk = subTable.pk_column.python_field | camel_to_snake %}
    @classmethod
    async def add_{{ sub_name }}_dao(cls, db: AsyncSession, {{ sub_name }}: {{ sub_model }}):
        """
        Insert a new {{ subTable.function_name }} record.

        The record is added to the session and flushed; no commit is issued here.

        :param db: ORM session
        :param {{ sub_name }}: {{ subTable.function_name }} model to persist
        :return: the newly added ORM object
        """
        {{ sub_name }}_fields = {{ sub_name }}.model_dump()
        orm_{{ sub_name }} = {{ subClassName }}(**{{ sub_name }}_fields)
        db.add(orm_{{ sub_name }})
        await db.flush()

        return orm_{{ sub_name }}

    @classmethod
    async def edit_{{ sub_name }}_dao(cls, db: AsyncSession, {{ sub_name }}: dict):
        """
        Update a {{ subTable.function_name }} record.

        The dict is passed to db.execute as a one-element parameter list next to
        update({{ subClassName }}).
        NOTE(review): presumably the dict must contain the primary key so the row can be located - confirm.

        :param db: ORM session
        :param {{ sub_name }}: dict of {{ subTable.function_name }} values to update
        :return:
        """
        update_rows = [{{ sub_name }}]
        await db.execute(update({{ subClassName }}), update_rows)

    @classmethod
    async def delete_{{ sub_name }}_dao(cls, db: AsyncSession, {{ sub_name }}: {{ sub_model }}):
        """
        Delete the {{ subTable.function_name }} record whose primary key matches the given model.

        :param db: ORM session
        :param {{ sub_name }}: {{ subTable.function_name }} model carrying the primary key to delete
        :return:
        """
        target_ids = [{{ sub_name }}.{{ sub_pk }}]
        await db.execute(delete({{ subClassName }}).where({{ subClassName }}.{{ sub_pk }}.in_(target_ids)))
{% endif %}