feat: 新增代码生成功能

This commit is contained in:
insistence
2025-02-14 17:42:36 +08:00
parent 7cca5c88f3
commit 1d36c0c56e
30 changed files with 4400 additions and 1032 deletions

View File

@@ -150,3 +150,208 @@ class MenuConstant:
LAYOUT = 'Layout'
PARENT_VIEW = 'ParentView'
INNER_LINK = 'InnerLink'
class GenConstant:
"""
代码生成常量
"""
"""单表(增删改查)"""
TPL_CRUD = 'crud'
"""树表(增删改查)"""
TPL_TREE = 'tree'
"""主子表(增删改查)"""
TPL_SUB = 'sub'
"""树编码字段"""
TREE_CODE = 'treeCode'
"""树父编码字段"""
TREE_PARENT_CODE = 'treeParentCode'
"""树名称字段"""
TREE_NAME = 'treeName'
"""上级菜单ID字段"""
PARENT_MENU_ID = 'parentMenuId'
"""上级菜单名称字段"""
PARENT_MENU_NAME = 'parentMenuName'
"""数据库字符串类型"""
COLUMNTYPE_STR = ['char', 'varchar', 'nvarchar', 'varchar2']
"""数据库文本类型"""
COLUMNTYPE_TEXT = ['tinytext', 'text', 'mediumtext', 'longtext']
"""数据库时间类型"""
COLUMNTYPE_TIME = ['datetime', 'time', 'date', 'timestamp']
"""数据库数字类型"""
COLUMNTYPE_NUMBER = [
'tinyint',
'smallint',
'mediumint',
'int',
'number',
'integer',
'bit',
'bigint',
'float',
'double',
'decimal',
]
"""页面不需要编辑字段"""
COLUMNNAME_NOT_EDIT = ['id', 'create_by', 'create_time', 'del_flag']
"""页面不需要显示的列表字段"""
COLUMNNAME_NOT_LIST = ['id', 'create_by', 'create_time', 'del_flag', 'update_by', 'update_time']
"""页面不需要查询字段"""
COLUMNNAME_NOT_QUERY = ['id', 'create_by', 'create_time', 'del_flag', 'update_by', 'update_time', 'remark']
"""Entity基类字段"""
BASE_ENTITY = ['createBy', 'createTime', 'updateBy', 'updateTime', 'remark']
"""Tree基类字段"""
TREE_ENTITY = ['parentName', 'parentId', 'orderNum', 'ancestors', 'children']
"""文本框"""
HTML_INPUT = 'input'
"""文本域"""
HTML_TEXTAREA = 'textarea'
"""下拉框"""
HTML_SELECT = 'select'
"""单选框"""
HTML_RADIO = 'radio'
"""复选框"""
HTML_CHECKBOX = 'checkbox'
"""日期控件"""
HTML_DATETIME = 'datetime'
"""图片上传控件"""
HTML_IMAGE_UPLOAD = 'imageUpload'
"""文件上传控件"""
HTML_FILE_UPLOAD = 'fileUpload'
"""富文本控件"""
HTML_EDITOR = 'editor'
"""高精度计算类型"""
TYPE_DECIMAL = 'Decimal'
"""时间类型"""
TYPE_DATE = ['date', 'time', 'datetime']
"""模糊查询"""
QUERY_LIKE = 'LIKE'
"""相等查询"""
QUERY_EQ = 'EQ'
"""需要"""
REQUIRE = '1'
MYSQL_TO_SQLALCHEMY_TYPE_MAPPING = {
# 数值类型
'TINYINT': 'SmallInteger',
'SMALLINT': 'SmallInteger',
'MEDIUMINT': 'Integer',
'INT': 'Integer',
'INTEGER': 'Integer',
'BIGINT': 'BigInteger',
'FLOAT': 'Float',
'DOUBLE': 'Float',
'DECIMAL': 'DECIMAL',
'BIT': 'Integer',
# 日期和时间类型
'DATE': 'Date',
'TIME': 'Time',
'DATETIME': 'DateTime',
'TIMESTAMP': 'TIMESTAMP',
'YEAR': 'Integer',
# 字符串类型
'CHAR': 'CHAR',
'VARCHAR': 'String',
'TINYTEXT': 'Text',
'TEXT': 'Text',
'MEDIUMTEXT': 'Text',
'LONGTEXT': 'Text',
'BINARY': 'BINARY',
'VARBINARY': 'VARBINARY',
'TINYBLOB': 'LargeBinary',
'BLOB': 'LargeBinary',
'MEDIUMBLOB': 'LargeBinary',
'LONGBLOB': 'LargeBinary',
# 枚举和集合类型
'ENUM': 'Enum',
'SET': 'String',
# JSON 类型
'JSON': 'JSON',
# 空间数据类型(需要扩展支持,如 GeoAlchemy2
'GEOMETRY': 'geoalchemy2.Geometry', # 需要安装 geoalchemy2
'POINT': 'geoalchemy2.Geometry',
'LINESTRING': 'geoalchemy2.Geometry',
'POLYGON': 'geoalchemy2.Geometry',
'MULTIPOINT': 'geoalchemy2.Geometry',
'MULTILINESTRING': 'geoalchemy2.Geometry',
'MULTIPOLYGON': 'geoalchemy2.Geometry',
'GEOMETRYCOLLECTION': 'geoalchemy2.Geometry',
}
MYSQL_TO_PYTHON_TYPE_MAPPING = {
# 数值类型
'TINYINT': 'int',
'SMALLINT': 'int',
'MEDIUMINT': 'int',
'INT': 'int',
'INTEGER': 'int',
'BIGINT': 'int',
'FLOAT': 'float',
'DOUBLE': 'float',
'DECIMAL': 'Decimal',
'BIT': 'int',
# 日期和时间类型
'DATE': 'date',
'TIME': 'time',
'DATETIME': 'datetime',
'TIMESTAMP': 'datetime',
'YEAR': 'int',
# 字符串类型
'CHAR': 'str',
'VARCHAR': 'str',
'TINYTEXT': 'str',
'TEXT': 'str',
'MEDIUMTEXT': 'str',
'LONGTEXT': 'str',
'BINARY': 'bytes',
'VARBINARY': 'bytes',
'TINYBLOB': 'bytes',
'BLOB': 'bytes',
'MEDIUMBLOB': 'bytes',
'LONGBLOB': 'bytes',
# 枚举和集合类型
'ENUM': 'str',
'SET': 'str',
# JSON 类型
'JSON': 'dict',
# 空间数据类型(通常需要特殊处理)
'GEOMETRY': 'bytes',
'POINT': 'bytes',
'LINESTRING': 'bytes',
'POLYGON': 'bytes',
'MULTIPOINT': 'bytes',
'MULTILINESTRING': 'bytes',
'MULTIPOLYGON': 'bytes',
'GEOMETRYCOLLECTION': 'bytes',
}

View File

@@ -64,6 +64,24 @@ class RedisSettings(BaseSettings):
redis_database: int = 2
class GenSettings:
"""
代码生成配置
"""
author = 'insistence'
package_name = 'module_admin.system'
auto_remove_pre = False
table_prefix = 'sys_'
allow_overwrite = True
GEN_PATH = 'vf_admin/gen_path'
def __init__(self):
if not os.path.exists(self.GEN_PATH):
os.makedirs(self.GEN_PATH)
class UploadSettings:
"""
上传配置
@@ -159,6 +177,14 @@ class GetConfig:
# 实例化Redis配置模型
return RedisSettings()
@lru_cache()
def get_gen_config(self):
"""
获取代码生成配置
"""
# 实例化代码生成配置
return GenSettings()
@lru_cache()
def get_upload_config(self):
"""
@@ -204,5 +230,7 @@ JwtConfig = get_config.get_jwt_config()
DataBaseConfig = get_config.get_database_config()
# Redis配置
RedisConfig = get_config.get_redis_config()
# 代码生成配置
GenConfig = get_config.get_gen_config()
# 上传配置
UploadConfig = get_config.get_upload_config()

View File

@@ -0,0 +1,158 @@
from datetime import datetime
from fastapi import APIRouter, Depends, Query, Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession
from config.enums import BusinessType
from config.env import GenConfig
from config.get_db import get_db
from module_admin.annotation.log_annotation import Log
from module_admin.aspect.interface_auth import CheckRoleInterfaceAuth, CheckUserInterfaceAuth
from module_admin.service.login_service import LoginService
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_generator.entity.vo.gen_vo import DeleteGenTableModel, EditGenTableModel, GenTablePageQueryModel
from module_generator.service.gen_service import GenTableColumnService, GenTableService
from utils.common_util import bytes2file_response
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
genController = APIRouter(prefix='/tool/gen', dependencies=[Depends(LoginService.get_current_user)])
@genController.get(
'/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:list'))]
)
async def get_gen_table_list(
request: Request,
gen_page_query: GenTablePageQueryModel = Depends(GenTablePageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
# 获取分页数据
gen_page_query_result = await GenTableService.get_gen_table_list_services(query_db, gen_page_query, is_page=True)
logger.info('获取成功')
return ResponseUtil.success(model_content=gen_page_query_result)
@genController.get(
'/db/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:list'))]
)
async def get_gen_db_table_list(
request: Request,
gen_page_query: GenTablePageQueryModel = Depends(GenTablePageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
# 获取分页数据
gen_page_query_result = await GenTableService.get_gen_db_table_list_services(query_db, gen_page_query, is_page=True)
logger.info('获取成功')
return ResponseUtil.success(model_content=gen_page_query_result)
@genController.post('/importTable', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:import'))])
@Log(title='代码生成', business_type=BusinessType.IMPORT)
async def import_gen_table(
request: Request,
tables: str = Query(),
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
table_names = tables.split(',') if tables else []
add_gen_table_list = await GenTableService.get_gen_db_table_list_by_name_services(query_db, table_names)
add_gen_table_result = await GenTableService.import_gen_table_services(query_db, add_gen_table_list, current_user)
logger.info(add_gen_table_result.message)
return ResponseUtil.success(msg=add_gen_table_result.message)
@genController.put('', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:edit'))])
@ValidateFields(validate_model='edit_post')
@Log(title='代码生成', business_type=BusinessType.UPDATE)
async def edit_gen_table(
request: Request,
edit_gen_table: EditGenTableModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_gen_table.update_by = current_user.user.user_name
edit_gen_table.update_time = datetime.now()
await GenTableService.validate_edit(edit_gen_table)
edit_gen_result = await GenTableService.edit_gen_table_services(query_db, edit_gen_table)
logger.info(edit_gen_result.message)
return ResponseUtil.success(msg=edit_gen_result.message)
@genController.delete('/{table_ids}', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:remove'))])
@Log(title='代码生成', business_type=BusinessType.DELETE)
async def delete_gen_table(request: Request, table_ids: str, query_db: AsyncSession = Depends(get_db)):
delete_gen_table = DeleteGenTableModel(tableIds=table_ids)
delete_gen_table_result = await GenTableService.delete_gen_table_services(query_db, delete_gen_table)
logger.info(delete_gen_table_result.message)
return ResponseUtil.success(msg=delete_gen_table_result.message)
@genController.post('/createTable', dependencies=[Depends(CheckRoleInterfaceAuth('admin'))])
@Log(title='创建表', business_type=BusinessType.OTHER)
async def create_table(
request: Request,
sql: str = Query(),
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
create_table_result = await GenTableService.create_table_services(query_db, sql, current_user)
logger.info(create_table_result.message)
return ResponseUtil.success(msg=create_table_result.message)
@genController.get('/batchGenCode', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:code'))])
@Log(title='代码生成', business_type=BusinessType.GENCODE)
async def batch_gen_code(request: Request, tables: str = Query(), query_db: AsyncSession = Depends(get_db)):
table_names = tables.split(',') if tables else []
batch_gen_code_result = await GenTableService.batch_gen_code_services(query_db, table_names)
logger.info('生成代码成功')
return ResponseUtil.streaming(data=bytes2file_response(batch_gen_code_result))
@genController.get('/genCode/{table_name}', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:code'))])
@Log(title='代码生成', business_type=BusinessType.GENCODE)
async def gen_code_local(request: Request, table_name: str, query_db: AsyncSession = Depends(get_db)):
if not GenConfig.allow_overwrite:
logger.error('【系统预设】不允许生成文件覆盖到本地')
return ResponseUtil.error('【系统预设】不允许生成文件覆盖到本地')
gen_code_local_result = await GenTableService.generate_code_services(query_db, table_name)
logger.info(gen_code_local_result.message)
return ResponseUtil.success(msg=gen_code_local_result.message)
@genController.get('/{table_id}', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:query'))])
async def query_detail_gen_table(request: Request, table_id: int, query_db: AsyncSession = Depends(get_db)):
gen_table = await GenTableService.get_gen_table_by_id_services(query_db, table_id)
gen_tables = await GenTableService.get_gen_table_all_services(query_db)
gen_columns = await GenTableColumnService.get_gen_table_column_list_by_table_id_services(query_db, table_id)
gen_table_detail_result = dict(info=gen_table, rows=gen_columns, tables=gen_tables)
logger.info(f'获取table_id为{table_id}的信息成功')
return ResponseUtil.success(data=gen_table_detail_result)
@genController.get('/preview/{table_id}', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:preview'))])
async def preview_code(request: Request, table_id: int, query_db: AsyncSession = Depends(get_db)):
preview_code_result = await GenTableService.preview_code_services(query_db, table_id)
logger.info('获取预览代码成功')
return ResponseUtil.success(data=preview_code_result)
@genController.get('/synchDb/{table_name}', dependencies=[Depends(CheckUserInterfaceAuth('tool:gen:edit'))])
@Log(title='代码生成', business_type=BusinessType.UPDATE)
async def sync_db(request: Request, table_name: str, query_db: AsyncSession = Depends(get_db)):
sync_db_result = await GenTableService.sync_db_services(query_db, table_name)
logger.info(sync_db_result.message)
return ResponseUtil.success(data=sync_db_result.message)

View File

@@ -0,0 +1,340 @@
from datetime import datetime, time
from sqlalchemy import delete, func, select, text, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from typing import List
from module_generator.entity.do.gen_do import GenTable, GenTableColumn
from module_generator.entity.vo.gen_vo import (
GenTableBaseModel,
GenTableColumnBaseModel,
GenTableColumnModel,
GenTableModel,
GenTablePageQueryModel,
)
from utils.page_util import PageUtil
class GenTableDao:
"""
代码生成表模块数据库操作层
"""
@classmethod
async def get_gen_table_by_id(cls, db: AsyncSession, table_id: int):
"""
根据表格id获取需要生成的表格信息
:param db: orm对象
:param table_id: 岗位id
:return: 需要生成的表格信息对象
"""
gen_table_info = (
(
await db.execute(
select(GenTable).options(selectinload(GenTable.columns)).where(GenTable.table_id == table_id)
)
)
.scalars()
.first()
)
return gen_table_info
@classmethod
async def get_gen_table_by_name(cls, db: AsyncSession, table_name: str):
"""
根据表格名称获取需要生成的表格信息
:param db: orm对象
:param table_name: 表格名称
:return: 需要生成的表格信息对象
"""
gen_table_info = (
(
await db.execute(
select(GenTable).options(selectinload(GenTable.columns)).where(GenTable.table_name == table_name)
)
)
.scalars()
.first()
)
return gen_table_info
@classmethod
async def get_gen_table_all(cls, db: AsyncSession):
"""
根据表格id获取需要生成的表格详细信息
:param db: orm对象
:return: 需要生成的表格信息对象
"""
gen_table_all = (await db.execute(select(GenTable).options(selectinload(GenTable.columns)))).scalars().all()
return gen_table_all
@classmethod
async def create_table_by_sql_dao(cls, db: AsyncSession, sql: str):
"""
根据sql语句创建表结构
:param db: orm对象
:param sql: sql语句
:return:
"""
await db.execute(text(sql))
@classmethod
async def get_gen_table_list(cls, db: AsyncSession, query_object: GenTablePageQueryModel, is_page: bool = False):
"""
根据查询参数获取代码生成列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 代码生成列表信息对象
"""
query = (
select(GenTable)
.options(selectinload(GenTable.columns))
.where(
func.lower(GenTable.table_name).like(f'%{query_object.table_name.lower()}%')
if query_object.table_name
else True,
func.lower(GenTable.table_comment).like(f'%{query_object.table_comment.lower()}%')
if query_object.table_comment
else True,
GenTable.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time
else True,
)
.distinct()
)
gen_table_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return gen_table_list
@classmethod
async def get_gen_db_table_list(cls, db: AsyncSession, query_object: GenTablePageQueryModel, is_page: bool = False):
"""
根据查询参数获取数据库列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据库列表信息对象
"""
query_sql = """
table_name as table_name,
table_comment as table_comment,
create_time as create_time,
update_time as update_time
from
information_schema.tables
where
table_schema = (select database())
and table_name not like 'apscheduler\_%'
and table_name not like 'gen\_%'
and table_name not in (select table_name from gen_table)
"""
if query_object.table_name:
query_sql += """and lower(table_name) like lower(concat('%', :table_name, '%'))"""
if query_object.table_comment:
query_sql += """and lower(table_comment) like lower(concat('%', :table_comment, '%'))"""
if query_object.begin_time:
query_sql += """and date_format(create_time, '%Y%m%d') >= date_format(:begin_time, '%Y%m%d')"""
if query_object.end_time:
query_sql += """and date_format(create_time, '%Y%m%d') >= date_format(:end_time, '%Y%m%d')"""
query_sql += """order by create_time desc"""
query = select(
text(query_sql).bindparams(
**{
k: v
for k, v in query_object.model_dump(exclude_none=True, exclude={'page_num', 'page_size'}).items()
}
)
)
gen_db_table_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return gen_db_table_list
@classmethod
async def get_gen_db_table_list_by_names(cls, db: AsyncSession, table_names: List[str]):
"""
根据查询参数获取数据库列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据库列表信息对象
"""
query_sql = """
select
table_name as table_name,
table_comment as table_comment,
create_time as create_time,
update_time as update_time
from
information_schema.tables
where
table_name not like 'qrtz\_%'
and table_name not like 'gen\_%'
and table_schema = (select database())
and table_name in :table_names
"""
query = text(query_sql).bindparams(table_names=tuple(table_names))
gen_db_table_list = (await db.execute(query)).fetchall()
return gen_db_table_list
@classmethod
async def add_gen_table_dao(cls, db: AsyncSession, gen_table: GenTableModel):
"""
新增岗位数据库操作
:param db: orm对象
:param post: 岗位对象
:return:
"""
db_gen_table = GenTable(**GenTableBaseModel(**gen_table.model_dump(by_alias=True)).model_dump())
db.add(db_gen_table)
await db.flush()
return db_gen_table
@classmethod
async def edit_gen_table_dao(cls, db: AsyncSession, gen_table: dict):
"""
编辑岗位数据库操作
:param db: orm对象
:param post: 需要更新的岗位字典
:return:
"""
await db.execute(update(GenTable), [GenTableBaseModel(**gen_table).model_dump()])
@classmethod
async def delete_gen_table_dao(cls, db: AsyncSession, gen_table: GenTableModel):
"""
删除岗位数据库操作
:param db: orm对象
:param post: 岗位对象
:return:
"""
await db.execute(delete(GenTable).where(GenTable.table_id.in_([gen_table.table_id])))
class GenTableColumnDao:
"""
代码生成列模块数据库操作层
"""
@classmethod
async def get_gen_table_column_list_by_table_id(cls, db: AsyncSession, table_id: int):
"""
根据表格id获取需要生成的列列表信息
:param db: orm对象
:param table_id: 表格id
:return: 需要生成的列列表信息对象
"""
gen_table_column_list = (
(await db.execute(select(GenTableColumn).where(GenTableColumn.table_id == table_id))).scalars().all()
)
return gen_table_column_list
@classmethod
async def get_gen_db_table_columns_by_name(cls, db: AsyncSession, table_name: str):
"""
根据查询参数获取数据库列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据库列表信息对象
"""
query_sql = """
select
column_name as column_name,
case
when is_nullable = 'no' and column_key != 'PRI' then '1'
else '0'
end as is_required,
case
when column_key = 'PRI' then '1'
else '0'
end as is_pk,
ordinal_position as sort,
column_comment as column_comment,
case
when extra = 'auto_increment' then '1'
else '0'
end as is_increment,
column_type as column_type
from
information_schema.columns
where
table_schema = (select database())
and table_name = :table_name
order by
ordinal_position
"""
query = text(query_sql).bindparams(table_name=table_name)
gen_db_table_columns = (await db.execute(query)).fetchall()
return gen_db_table_columns
@classmethod
async def add_gen_table_column_dao(cls, db: AsyncSession, gen_table_column: GenTableColumnModel):
"""
新增岗位数据库操作
:param db: orm对象
:param post: 岗位对象
:return:
"""
db_gen_table_column = GenTableColumn(
**GenTableColumnBaseModel(**gen_table_column.model_dump(by_alias=True)).model_dump()
)
db.add(db_gen_table_column)
await db.flush()
return db_gen_table_column
@classmethod
async def edit_gen_table_column_dao(cls, db: AsyncSession, gen_table_column: dict):
"""
编辑岗位数据库操作
:param db: orm对象
:param post: 需要更新的岗位字典
:return:
"""
await db.execute(update(GenTableColumn), [GenTableColumnBaseModel(**gen_table_column).model_dump()])
@classmethod
async def delete_gen_table_column_by_table_id_dao(cls, db: AsyncSession, gen_table_column: GenTableColumnModel):
"""
删除岗位数据库操作
:param db: orm对象
:param post: 岗位对象
:return:
"""
await db.execute(delete(GenTableColumn).where(GenTableColumn.table_id.in_([gen_table_column.table_id])))
@classmethod
async def delete_gen_table_column_by_column_id_dao(cls, db: AsyncSession, gen_table_column: GenTableColumnModel):
"""
删除岗位数据库操作
:param db: orm对象
:param post: 岗位对象
:return:
"""
await db.execute(delete(GenTableColumn).where(GenTableColumn.column_id.in_([gen_table_column.column_id])))

View File

@@ -0,0 +1,73 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from config.database import Base
class GenTable(Base):
"""
代码生成业务表
"""
__tablename__ = 'gen_table'
table_id = Column(Integer, primary_key=True, autoincrement=True, comment='编号')
table_name = Column(String(200), nullable=True, default='', comment='表名称')
table_comment = Column(String(500), nullable=True, default='', comment='表描述')
sub_table_name = Column(String(64), nullable=True, comment='关联子表的表名')
sub_table_fk_name = Column(String(64), nullable=True, comment='子表关联的外键名')
class_name = Column(String(100), nullable=True, default='', comment='实体类名称')
tpl_category = Column(String(200), nullable=True, default='crud', comment='使用的模板crud单表操作 tree树表操作')
tpl_web_type = Column(
String(30), nullable=True, default='', comment='前端模板类型element-ui模版 element-plus模版'
)
package_name = Column(String(100), nullable=True, comment='生成包路径')
module_name = Column(String(30), nullable=True, comment='生成模块名')
business_name = Column(String(30), nullable=True, comment='生成业务名')
function_name = Column(String(100), nullable=True, comment='生成功能名')
function_author = Column(String(100), nullable=True, comment='生成功能作者')
gen_type = Column(String(1), nullable=True, default='0', comment='生成代码方式0zip压缩包 1自定义路径')
gen_path = Column(String(200), nullable=True, default='/', comment='生成路径(不填默认项目路径)')
options = Column(String(1000), nullable=True, comment='其它生成选项')
create_by = Column(String(64), default='', comment='创建者')
create_time = Column(DateTime, nullable=True, default=datetime.now(), comment='创建时间')
update_by = Column(String(64), default='', comment='更新者')
update_time = Column(DateTime, nullable=True, default=datetime.now(), comment='更新时间')
remark = Column(String(500), nullable=True, default=None, comment='备注')
columns = relationship('GenTableColumn', order_by='GenTableColumn.sort', back_populates='tables')
class GenTableColumn(Base):
"""
代码生成业务表字段
"""
__tablename__ = 'gen_table_column'
column_id = Column(Integer, primary_key=True, autoincrement=True, comment='编号')
table_id = Column(Integer, ForeignKey('gen_table.table_id'), nullable=True, comment='归属表编号')
column_name = Column(String(200), nullable=True, comment='列名称')
column_comment = Column(String(500), nullable=True, comment='列描述')
column_type = Column(String(100), nullable=True, comment='列类型')
python_type = Column(String(500), nullable=True, comment='PYTHON类型')
python_field = Column(String(200), nullable=True, comment='PYTHON字段名')
is_pk = Column(String(1), nullable=True, comment='是否主键1是')
is_increment = Column(String(1), nullable=True, comment='是否自增1是')
is_required = Column(String(1), nullable=True, comment='是否必填1是')
is_insert = Column(String(1), nullable=True, comment='是否为插入字段1是')
is_edit = Column(String(1), nullable=True, comment='是否编辑字段1是')
is_list = Column(String(1), nullable=True, comment='是否列表字段1是')
is_query = Column(String(1), nullable=True, comment='是否查询字段1是')
query_type = Column(String(200), nullable=True, default='EQ', comment='查询方式(等于、不等于、大于、小于、范围)')
html_type = Column(
String(200), nullable=True, comment='显示类型(文本框、文本域、下拉框、复选框、单选框、日期控件)'
)
dict_type = Column(String(200), nullable=True, default='', comment='字典类型')
sort = Column(Integer, nullable=True, comment='排序')
create_by = Column(String(64), default='', comment='创建者')
create_time = Column(DateTime, nullable=True, default=datetime.now(), comment='创建时间')
update_by = Column(String(64), default='', comment='更新者')
update_time = Column(DateTime, nullable=True, default=datetime.now(), comment='更新时间')
tables = relationship('GenTable', back_populates='columns')

View File

@@ -0,0 +1,266 @@
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic.alias_generators import to_camel
from pydantic_validation_decorator import NotBlank
from typing import List, Literal, Optional
from config.constant import GenConstant
from module_admin.annotation.pydantic_annotation import as_query
class GenTableBaseModel(BaseModel):
"""
代码生成业务表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
table_id: Optional[int] = Field(default=None, description='编号')
table_name: Optional[str] = Field(default=None, description='表名称')
table_comment: Optional[str] = Field(default=None, description='表描述')
sub_table_name: Optional[str] = Field(default=None, description='关联子表的表名')
sub_table_fk_name: Optional[str] = Field(default=None, description='子表关联的外键名')
class_name: Optional[str] = Field(default=None, description='实体类名称')
tpl_category: Optional[str] = Field(default=None, description='使用的模板crud单表操作 tree树表操作')
tpl_web_type: Optional[str] = Field(default=None, description='前端模板类型element-ui模版 element-plus模版')
package_name: Optional[str] = Field(default=None, description='生成包路径')
module_name: Optional[str] = Field(default=None, description='生成模块名')
business_name: Optional[str] = Field(default=None, description='生成业务名')
function_name: Optional[str] = Field(default=None, description='生成功能名')
function_author: Optional[str] = Field(default=None, description='生成功能作者')
gen_type: Optional[Literal['0', '1']] = Field(default=None, description='生成代码方式0zip压缩包 1自定义路径')
gen_path: Optional[str] = Field(default=None, description='生成路径(不填默认项目路径)')
options: Optional[str] = Field(default=None, description='其它生成选项')
create_by: Optional[str] = Field(default=None, description='创建者')
create_time: Optional[datetime] = Field(default=None, description='创建时间')
update_by: Optional[str] = Field(default=None, description='更新者')
update_time: Optional[datetime] = Field(default=None, description='更新时间')
remark: Optional[str] = Field(default=None, description='备注')
@NotBlank(field_name='table_name', message='表名称不能为空')
def get_table_name(self):
return self.table_name
@NotBlank(field_name='table_comment', message='表描述不能为空')
def get_table_comment(self):
return self.table_comment
@NotBlank(field_name='class_name', message='实体类名称不能为空')
def get_class_name(self):
return self.class_name
@NotBlank(field_name='package_name', message='生成包路径不能为空')
def get_package_name(self):
return self.package_name
@NotBlank(field_name='module_name', message='生成模块名不能为空')
def get_module_name(self):
return self.module_name
@NotBlank(field_name='business_name', message='生成业务名不能为空')
def get_business_name(self):
return self.business_name
@NotBlank(field_name='function_name', message='生成功能名不能为空')
def get_function_name(self):
return self.function_name
@NotBlank(field_name='function_author', message='生成功能作者不能为空')
def get_function_author(self):
return self.function_author
def validate_fields(self):
self.get_table_name()
self.get_table_comment()
self.get_class_name()
self.get_package_name()
self.get_module_name()
self.get_business_name()
self.get_function_name()
self.get_function_author()
class GenTableModel(GenTableBaseModel):
"""
代码生成业务表模型
"""
pk_column: Optional['GenTableColumnModel'] = Field(default=None, description='主键信息')
sub_table: Optional['GenTableModel'] = Field(default=None, description='子表信息')
columns: Optional[List['GenTableColumnModel']] = Field(default=None, description='表列信息')
tree_code: Optional[str] = Field(default=None, description='树编码字段')
tree_parent_code: Optional[str] = Field(default=None, description='树父编码字段')
tree_name: Optional[str] = Field(default=None, description='树名称字段')
parent_menu_id: Optional[int] = Field(default=None, description='上级菜单ID字段')
parent_menu_name: Optional[str] = Field(default=None, description='上级菜单名称字段')
sub: Optional[bool] = Field(default=None, description='是否为子表')
tree: Optional[bool] = Field(default=None, description='是否为树表')
crud: Optional[bool] = Field(default=None, description='是否为单表')
@model_validator(mode='after')
def check_some_is(self) -> 'GenTableModel':
self.sub = True if self.tpl_category and self.tpl_category == GenConstant.TPL_SUB else False
self.tree = True if self.tpl_category and self.tpl_category == GenConstant.TPL_TREE else False
self.crud = True if self.tpl_category and self.tpl_category == GenConstant.TPL_CRUD else False
return self
class EditGenTableModel(GenTableModel):
"""
修改代码生成业务表模型
"""
params: Optional['GenTableParamsModel'] = Field(default=None, description='业务表参数')
class GenTableParamsModel(BaseModel):
"""
代码生成业务表参数模型
"""
model_config = ConfigDict(alias_generator=to_camel)
tree_code: Optional[str] = Field(default=None, description='树编码字段')
tree_parent_code: Optional[str] = Field(default=None, description='树父编码字段')
tree_name: Optional[str] = Field(default=None, description='树名称字段')
parent_menu_id: Optional[int] = Field(default=None, description='上级菜单ID字段')
class GenTableQueryModel(GenTableBaseModel):
"""
代码生成业务表不分页查询模型
"""
begin_time: Optional[str] = Field(default=None, description='开始时间')
end_time: Optional[str] = Field(default=None, description='结束时间')
@as_query
class GenTablePageQueryModel(GenTableQueryModel):
"""
代码生成业务表分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class DeleteGenTableModel(BaseModel):
"""
删除代码生成业务表模型
"""
model_config = ConfigDict(alias_generator=to_camel)
table_ids: str = Field(description='需要删除的代码生成业务表ID')
class GenTableColumnBaseModel(BaseModel):
"""
代码生成业务表字段对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
column_id: Optional[int] = Field(default=None, description='编号')
table_id: Optional[int] = Field(default=None, description='归属表编号')
column_name: Optional[str] = Field(default=None, description='列名称')
column_comment: Optional[str] = Field(default=None, description='列描述')
column_type: Optional[str] = Field(default=None, description='列类型')
python_type: Optional[str] = Field(default=None, description='PYTHON类型')
python_field: Optional[str] = Field(default=None, description='PYTHON字段名')
is_pk: Optional[str] = Field(default=None, description='是否主键1是')
is_increment: Optional[str] = Field(default=None, description='是否自增1是')
is_required: Optional[str] = Field(default=None, description='是否必填1是')
is_insert: Optional[str] = Field(default=None, description='是否为插入字段1是')
is_edit: Optional[str] = Field(default=None, description='是否编辑字段1是')
is_list: Optional[str] = Field(default=None, description='是否列表字段1是')
is_query: Optional[str] = Field(default=None, description='是否查询字段1是')
query_type: Optional[str] = Field(default=None, description='查询方式(等于、不等于、大于、小于、范围)')
html_type: Optional[str] = Field(
default=None, description='显示类型(文本框、文本域、下拉框、复选框、单选框、日期控件)'
)
dict_type: Optional[str] = Field(default=None, description='字典类型')
sort: Optional[int] = Field(default=None, description='排序')
create_by: Optional[str] = Field(default=None, description='创建者')
create_time: Optional[datetime] = Field(default=None, description='创建时间')
@NotBlank(field_name='python_field', message='Python属性不能为空')
def get_python_field(self):
return self.python_field
def validate_fields(self):
self.get_python_field()
class GenTableColumnModel(GenTableColumnBaseModel):
"""
代码生成业务表字段模型
"""
cap_python_field: Optional[str] = Field(default=None, description='字段大写形式')
pk: Optional[bool] = Field(default=None, description='是否为子表')
increment: Optional[bool] = Field(default=None, description='是否为树表')
required: Optional[bool] = Field(default=None, description='是否为必填字段')
insert: Optional[bool] = Field(default=None, description='是否为必填字段')
edit: Optional[bool] = Field(default=None, description='是否为必填字段')
list: Optional[bool] = Field(default=None, description='是否为必填字段')
query: Optional[bool] = Field(default=None, description='是否为必填字段')
super_column: Optional[bool] = Field(default=None, description='是否为基类字段')
usable_column: Optional[bool] = Field(default=None, description='是否为基类字段')
@model_validator(mode='after')
def check_some_is(self) -> 'GenTableModel':
self.cap_python_field = self.python_field[0].upper() + self.python_field[1:] if self.python_field else None
self.pk = True if self.is_pk and self.is_pk == '1' else False
self.increment = True if self.is_increment and self.is_increment == '1' else False
self.required = True if self.is_required and self.is_required == '1' else False
self.insert = True if self.is_insert and self.is_insert == '1' else False
self.edit = True if self.is_edit and self.is_edit == '1' else False
self.list = True if self.is_list and self.is_list == '1' else False
self.query = True if self.is_query and self.is_query == '1' else False
self.super_column = (
True
if any(
self.python_field and self.python_field.lower() == field.lower()
for field in GenConstant.TREE_ENTITY + GenConstant.BASE_ENTITY
)
else False
)
self.usable_column = (
True
if any(
self.python_field and self.python_field.lower() == field.lower()
for field in ['parentId', 'orderNum', 'remark']
)
else False
)
return self
class GenTableColumnQueryModel(GenTableColumnBaseModel):
"""
代码生成业务表字段不分页查询模型
"""
begin_time: Optional[str] = Field(default=None, description='开始时间')
end_time: Optional[str] = Field(default=None, description='结束时间')
@as_query
class GenTableColumnPageQueryModel(GenTableColumnQueryModel):
"""
代码生成业务表字段分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class DeleteGenTableColumnModel(BaseModel):
"""
删除代码生成业务表字段模型
"""
model_config = ConfigDict(alias_generator=to_camel)
column_ids: str = Field(description='需要删除的代码生成业务表字段ID')

View File

@@ -0,0 +1,491 @@
import io
import json
import os
import re
import zipfile
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import GenConstant
from config.env import GenConfig
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_generator.entity.vo.gen_vo import (
DeleteGenTableModel,
EditGenTableModel,
GenTableColumnModel,
GenTableModel,
GenTablePageQueryModel,
)
from module_generator.dao.gen_dao import GenTableColumnDao, GenTableDao
from utils.common_util import CamelCaseUtil
from utils.excel_util import ExcelUtil
from utils.gen_util import GenUtils
from utils.template_util import TemplateInitializer, TemplateUtils
class GenTableService:
"""
岗位管理模块服务层
"""
@classmethod
async def get_gen_table_list_services(
cls, query_db: AsyncSession, query_object: GenTablePageQueryModel, is_page: bool = False
):
"""
获取代码生成列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 代码生成列表信息对象
"""
gen_table_list_result = await GenTableDao.get_gen_table_list(query_db, query_object, is_page)
return gen_table_list_result
@classmethod
async def get_gen_db_table_list_services(
cls, query_db: AsyncSession, query_object: GenTablePageQueryModel, is_page: bool = False
):
"""
获取数据库列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据库列表信息对象
"""
gen_db_table_list_result = await GenTableDao.get_gen_db_table_list(query_db, query_object, is_page)
return gen_db_table_list_result
@classmethod
async def get_gen_db_table_list_by_name_services(cls, query_db: AsyncSession, table_names: List[str]):
"""
获取数据库列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据库列表信息对象
"""
gen_db_table_list_result = await GenTableDao.get_gen_db_table_list_by_names(query_db, table_names)
return [GenTableModel(**gen_table) for gen_table in CamelCaseUtil.transform_result(gen_db_table_list_result)]
@classmethod
async def import_gen_table_services(
cls, query_db: AsyncSession, gen_table_list: List[GenTableModel], current_user: CurrentUserModel
):
try:
for table in gen_table_list:
table_name = table.table_name
GenUtils.init_table(table, current_user.user.user_name)
add_gen_table = await GenTableDao.add_gen_table_dao(query_db, table)
if add_gen_table:
table.table_id = add_gen_table.table_id
gen_table_columns = await GenTableColumnDao.get_gen_db_table_columns_by_name(query_db, table_name)
for column in [
GenTableColumnModel(**gen_table_column)
for gen_table_column in CamelCaseUtil.transform_result(gen_table_columns)
]:
GenUtils.init_column_field(column, table)
await GenTableColumnDao.add_gen_table_column_dao(query_db, column)
await query_db.commit()
return CrudResponseModel(is_success=True, message='导入成功')
except Exception as e:
await query_db.rollback()
raise ServiceException(message=f'导入失败, {str(e)}')
@classmethod
async def edit_gen_table_services(cls, query_db: AsyncSession, page_object: EditGenTableModel):
"""
编辑岗位信息service
:param query_db: orm对象
:param page_object: 编辑岗位对象
:return: 编辑岗位校验结果
"""
edit_gen_table = page_object.model_dump(exclude_unset=True, by_alias=True)
gen_table_info = await cls.get_gen_table_by_id_services(query_db, page_object.table_id)
if gen_table_info.table_id:
try:
edit_gen_table['options'] = json.dumps(edit_gen_table.get('params'))
await GenTableDao.edit_gen_table_dao(query_db, edit_gen_table)
for gen_table_column in page_object.columns:
await GenTableColumnDao.edit_gen_table_column_dao(
query_db, gen_table_column.model_dump(by_alias=True)
)
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='业务表不存在')
@classmethod
async def delete_gen_table_services(cls, query_db: AsyncSession, page_object: DeleteGenTableModel):
"""
删除岗位信息service
:param query_db: orm对象
:param page_object: 删除岗位对象
:return: 删除岗位校验结果
"""
if page_object.table_ids:
table_id_list = page_object.table_ids.split(',')
try:
for table_id in table_id_list:
await GenTableDao.delete_gen_table_dao(query_db, GenTableModel(tableId=table_id))
await GenTableColumnDao.delete_gen_table_column_by_table_id_dao(
query_db, GenTableColumnModel(tableId=table_id)
)
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入业务表id为空')
@classmethod
async def get_gen_table_by_id_services(cls, query_db: AsyncSession, table_id: int):
"""
获取需要生成的表格详细信息service
:param query_db: orm对象
:param table_id: 需要生成的表格id
:return: 需要生成的表格id对应的信息
"""
gen_table = await GenTableDao.get_gen_table_by_id(query_db, table_id)
result = await cls.set_table_from_options(GenTableModel(**CamelCaseUtil.transform_result(gen_table)))
return result
@classmethod
async def get_gen_table_all_services(cls, query_db: AsyncSession):
"""
获取需要生成的表格详细信息service
:param query_db: orm对象
:param table_id: 需要生成的表格id
:return: 需要生成的表格id对应的信息
"""
gen_table_all = await GenTableDao.get_gen_table_all(query_db)
result = [GenTableModel(**gen_table) for gen_table in CamelCaseUtil.transform_result(gen_table_all)]
return result
@classmethod
async def create_table_services(cls, query_db: AsyncSession, sql: str, current_user: CurrentUserModel):
"""
创建表结构service
:param query_db: orm对象
:param sql: 建表语句
:param current_user: 当前用户信息对象
:return: 创建表结构结果
"""
if cls.__is_valid_create_table(sql):
try:
table_names = re.findall(r'create\s+table\s+(\w+)', sql, re.IGNORECASE)
await GenTableDao.create_table_by_sql_dao(query_db, sql)
gen_table_list = await cls.get_gen_db_table_list_by_name_services(query_db, table_names)
await cls.import_gen_table_services(query_db, gen_table_list, current_user)
return CrudResponseModel(is_success=True, message='创建表结构成功')
except Exception as e:
raise ServiceException(message=f'创建表结构异常,详细错误信息:{str(e)}')
else:
raise ServiceException(message='建表语句不合法')
@classmethod
def __is_valid_create_table(cls, sql: str):
"""
校验sql语句是否为合法的建表语句
:param sql: sql语句
:return: 校验结果
"""
create_table_pattern = r'^\s*CREATE\s+TABLE\s+'
if not re.search(create_table_pattern, sql, re.IGNORECASE):
return False
forbidden_keywords = ['INSERT', 'UPDATE', 'DELETE', 'DROP', 'ALTER', 'TRUNCATE']
for keyword in forbidden_keywords:
if re.search(rf'\b{keyword}\b', sql, re.IGNORECASE):
return False
return True
@classmethod
async def preview_code_services(cls, query_db: AsyncSession, table_id: int):
"""
预览代码service
:param query_db: orm对象
:param table_id: 表格id
:return: 预览数据列表
"""
gen_table = GenTableModel(
**CamelCaseUtil.transform_result(await GenTableDao.get_gen_table_by_id(query_db, table_id))
)
await cls.set_sub_table(query_db, gen_table)
await cls.set_pk_column(gen_table)
env = TemplateInitializer.init_jinja2()
context = TemplateUtils.prepare_context(gen_table)
template_list = TemplateUtils.get_template_list(gen_table.tpl_category, gen_table.tpl_web_type)
preview_code_result = {}
for template in template_list:
render_content = env.get_template(template).render(**context)
preview_code_result[template] = render_content
return preview_code_result
@classmethod
async def generate_code_services(cls, query_db: AsyncSession, table_name: str):
"""
生成代码至指定路径service
:param query_db: orm对象
:param table_name: 表格名称
:return: 生成代码结果
"""
env = TemplateInitializer.init_jinja2()
render_info = await cls.__get_gen_render_info(query_db, table_name)
for template in render_info[0]:
try:
render_content = env.get_template(template).render(**render_info[2])
gen_path = cls.__get_gen_path(render_info[3], template)
os.makedirs(os.path.dirname(gen_path), exist_ok=True)
with open(gen_path, 'w', encoding='utf-8') as f:
f.write(render_content)
except Exception as e:
raise ServiceException(
message=f'渲染模板失败,表名:{render_info[3].table_name},详细错误信息:{str(e)}'
)
return CrudResponseModel(is_success=True, message='生成代码成功')
@classmethod
async def batch_gen_code_services(cls, query_db: AsyncSession, table_names: List[str]):
"""
批量生成代码service
:param query_db: orm对象
:param table_names: 表格名称
:return: 下载代码结果
"""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for table_name in table_names:
env = TemplateInitializer.init_jinja2()
render_info = await cls.__get_gen_render_info(query_db, table_name)
for template_file, output_file in zip(render_info[0], render_info[1]):
render_content = env.get_template(template_file).render(**render_info[2])
zip_file.writestr(output_file, render_content)
zip_data = zip_buffer.getvalue()
zip_buffer.close()
return zip_data
@classmethod
async def __get_gen_render_info(cls, query_db: AsyncSession, table_name: str):
"""
获取生成代码渲染模板相关信息
:param query_db: orm对象
:param table_name: 表格名称
:return: 生成代码渲染模板相关信息
"""
gen_table = GenTableModel(
**CamelCaseUtil.transform_result(await GenTableDao.get_gen_table_by_name(query_db, table_name))
)
await cls.set_sub_table(query_db, gen_table)
await cls.set_pk_column(gen_table)
context = TemplateUtils.prepare_context(gen_table)
template_list = TemplateUtils.get_template_list(gen_table.tpl_category, gen_table.tpl_web_type)
output_files = [TemplateUtils.get_file_name(template, gen_table) for template in template_list]
return [template_list, output_files, context, gen_table]
@classmethod
def __get_gen_path(cls, gen_table: GenTableModel, template: str):
"""
根据GenTableModel对象和模板名称生成路径
:param gen_table: GenTableModel对象
:param template: 模板名称
:return: 生成的路径
"""
gen_path = gen_table.gen_path
if gen_path == '/':
return os.path.join(os.getcwd(), GenConfig.GEN_PATH, TemplateUtils.get_file_name(template, gen_table))
else:
return os.path.join(gen_path, TemplateUtils.get_file_name(template, gen_table))
@classmethod
async def sync_db_services(cls, query_db: AsyncSession, table_name: str):
"""
获取需要生成的表格详细信息service
:param query_db: orm对象
:param table_name: 表格名称
:return: 需要生成的表格名称对应的信息
"""
gen_table = await GenTableDao.get_gen_table_by_name(query_db, table_name)
table = GenTableModel(**CamelCaseUtil.transform_result(gen_table))
table_columns = table.columns
table_column_map = {column.column_name: column for column in table_columns}
query_db_table_columns = await GenTableColumnDao.get_gen_db_table_columns_by_name(query_db, table_name)
db_table_columns = [
GenTableColumnModel(**column) for column in CamelCaseUtil.transform_result(query_db_table_columns)
]
if not db_table_columns:
raise ServiceException('同步数据失败,原表结构不存在')
db_table_column_names = [column.column_name for column in db_table_columns]
try:
for column in db_table_columns:
GenUtils.init_column_field(column, table)
if column.column_name in table_column_map:
prev_column = table_column_map[column.column_name]
column.column_id = prev_column.column_id
if column.list:
column.dict_type = prev_column.dict_type
column.query_type = prev_column.query_type
if (
prev_column.is_required != ''
and not column.pk
and (column.insert or column.edit)
and (column.usable_column or column.super_column)
):
column.is_required = prev_column.is_required
column.html_type = prev_column.html_type
await GenTableColumnDao.edit_gen_table_column_dao(query_db, column.model_dump(by_alias=True))
else:
await GenTableColumnDao.add_gen_table_column_dao(query_db, column)
del_columns = [column for column in table_columns if column.column_name not in db_table_column_names]
if del_columns:
for column in del_columns:
await GenTableColumnDao.delete_gen_table_column_by_column_id_dao(query_db, column)
await query_db.commit()
return CrudResponseModel(is_success=True, message='同步成功')
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def set_sub_table(cls, query_db: AsyncSession, gen_table: GenTableModel):
"""
设置主子表信息
:param query_db: orm对象
:param gen_table: 业务表信息
"""
if gen_table.sub_table_name:
sub_table = await GenTableDao.get_gen_table_by_name(query_db, gen_table.sub_table_name)
gen_table.sub_table = GenTableModel(**CamelCaseUtil.transform_result(sub_table))
@classmethod
async def set_pk_column(cls, gen_table: GenTableModel):
"""
设置主键列信息
:param gen_table: 业务表信息
"""
for column in gen_table.columns:
if column.pk:
gen_table.pk_column = column
break
if gen_table.pk_column is None:
gen_table.pk_column = gen_table.columns[0]
if gen_table.tpl_category == GenConstant.TPL_SUB:
for column in gen_table.sub_table.columns:
if column.pk:
gen_table.sub_table.pk_column = column
break
if gen_table.sub_table.columns is None:
gen_table.sub_table.pk_column = gen_table.sub_table.columns[0]
@staticmethod
async def export_post_list_services(post_list: List):
"""
导出岗位信息service
:param post_list: 岗位信息列表
:return: 岗位信息对应excel的二进制数据
"""
# 创建一个映射字典,将英文键映射到中文键
mapping_dict = {
'postId': '岗位编号',
'postCode': '岗位编码',
'postName': '岗位名称',
'postSort': '显示顺序',
'status': '状态',
'createBy': '创建者',
'createTime': '创建时间',
'updateBy': '更新者',
'updateTime': '更新时间',
'remark': '备注',
}
for item in post_list:
if item.get('status') == '0':
item['status'] = '正常'
else:
item['status'] = '停用'
binary_data = ExcelUtil.export_list2excel(post_list, mapping_dict)
return binary_data
@classmethod
async def set_table_from_options(cls, gen_table: GenTableModel):
"""
设置代码生成其他选项值
:param gen_table: 设置后的生成对象
"""
params_obj = json.loads(gen_table.options) if gen_table.options else None
if params_obj:
gen_table.tree_code = params_obj.get(GenConstant.TREE_CODE)
gen_table.tree_parent_code = params_obj.get(GenConstant.TREE_PARENT_CODE)
gen_table.tree_name = params_obj.get(GenConstant.TREE_NAME)
gen_table.parent_menu_id = params_obj.get(GenConstant.PARENT_MENU_ID)
gen_table.parent_menu_name = params_obj.get(GenConstant.PARENT_MENU_NAME)
return gen_table
@classmethod
async def validate_edit(cls, edit_gen_table: EditGenTableModel):
if edit_gen_table.tpl_category == GenConstant.TPL_TREE:
params_obj = edit_gen_table.params
if not getattr(params_obj, GenConstant.TREE_CODE):
raise ServiceException('树编码字段不能为空')
elif not getattr(params_obj, GenConstant.TREE_PARENT_CODE):
raise ServiceException('树父编码字段不能为空')
elif not getattr(params_obj, GenConstant.TREE_NAME):
raise ServiceException('树名称字段不能为空')
elif edit_gen_table.tpl_category == GenConstant.TPL_SUB:
if not edit_gen_table.sub_table_name:
raise ServiceException('关联子表的表名不能为空')
elif not edit_gen_table.sub_table_fk_name:
raise ServiceException('子表关联的外键名不能为空')
class GenTableColumnService:
@classmethod
async def get_gen_table_column_list_by_table_id_services(cls, query_db: AsyncSession, table_id: int):
"""
获取代码生成列列表信息service
:param query_db: orm对象
:param table_id: 表格id
:return: 代码生成列列表信息对象
"""
gen_table_column_list_result = await GenTableColumnDao.get_gen_table_column_list_by_table_id(query_db, table_id)
return [
GenTableColumnModel(**gen_table_column)
for gen_table_column in CamelCaseUtil.transform_result(gen_table_column_list_result)
]

View File

@@ -0,0 +1,44 @@
import request from '@/utils/request'
// 查询{{ functionName }}列表
export function list{{ BusinessName }}(query) {
return request({
url: '/{{ moduleName }}/{{ businessName }}/list',
method: 'get',
params: query
})
}
// 查询{{ functionName }}详细
export function get{{ BusinessName }}({{ pkColumn.python_field }}) {
return request({
url: '/{{ moduleName }}/{{ businessName }}/' + {{ pkColumn.python_field }},
method: 'get'
})
}
// 新增{{ functionName }}
export function add{{ BusinessName }}(data) {
return request({
url: '/{{ moduleName }}/{{ businessName }}',
method: 'post',
data: data
})
}
// 修改{{ functionName }}
export function update{{ BusinessName }}(data) {
return request({
url: '/{{ moduleName }}/{{ businessName }}',
method: 'put',
data: data
})
}
// 删除{{ functionName }}
export function del{{ BusinessName }}({{ pkColumn.python_field }}) {
return request({
url: '/{{ moduleName }}/{{ businessName }}/' + {{ pkColumn.python_field }},
method: 'delete'
})
}

View File

@@ -0,0 +1,121 @@
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
from datetime import datetime
from fastapi import APIRouter, Depends, Form, Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession
from config.enums import BusinessType
from config.get_db import get_db
from module_admin.annotation.log_annotation import Log
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.service.login_service import LoginService
from {{ packageName }}.service.{{ businessName }}_service import {{ BusinessName }}Service
from {{ packageName }}.entity.vo.{{ businessName }}_vo import Delete{{ BusinessName }}Model, {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from {{ packageName }}.entity.vo.user_vo import CurrentUserModel
from utils.common_util import bytes2file_response
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
{{ businessName }}Controller = APIRouter(prefix='/{{ moduleName }}/{{ businessName }}', dependencies=[Depends(LoginService.get_current_user)])
@{{ businessName }}Controller.get(
'/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:list'))]
)
async def get_{{ moduleName }}_{{ businessName }}_list(
request: Request,
{{ businessName }}_page_query: {{ BusinessName }}PageQueryModel = Depends({{ BusinessName }}PageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
{% if table.crud or table.sub %}
# 获取分页数据
{{ businessName }}_page_query_result = await {{ BusinessName }}Service.get_{{ businessName }}_list_services(query_db, {{ businessName }}_page_query, is_page=True)
logger.info('获取成功')
return ResponseUtil.success(model_content={{ businessName }}_page_query_result)
{% elif table.tree %}
{{ businessName }}_query_result = await {{ BusinessName }}Service.get_{{ businessName }}_list_services(query_db, {{ businessName }}_query)
logger.info('获取成功')
return ResponseUtil.success(data={{ businessName }}_query_result)
{% endif %}
@{{ businessName }}Controller.post('', dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:add'))])
@ValidateFields(validate_model='add_{{ businessName }}')
@Log(title='{{ functionName }}', business_type=BusinessType.INSERT)
async def add_{{ moduleName }}_{{ businessName }}(
request: Request,
add_{{ businessName }}: {{ BusinessName }}Model,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
{% for column in columns %}
{% if column.python_field == "createBy" %}
add_{{ businessName }}.create_by = current_user.user.user_name
{% elif column.python_field == "createTime" %}
add_{{ businessName }}.create_time = datetime.now()
{% elif column.python_field == "updateBy" %}
add_{{ businessName }}.update_by = current_user.user.user_name
{% elif column.python_field == "updateTime" %}
add_{{ businessName }}.update_time = datetime.now()
{% endif %}
{% endfor %}
add_{{ businessName }}_result = await {{ BusinessName }}Service.add_{{ businessName }}_services(query_db, add_{{ businessName }})
logger.info(add_{{ businessName }}_result.message)
return ResponseUtil.success(msg=add_{{ businessName }}_result.message)
@{{ businessName }}Controller.put('', dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:edit'))])
@ValidateFields(validate_model='edit_{{ businessName }}')
@Log(title='{{ functionName }}', business_type=BusinessType.UPDATE)
async def edit_{{ moduleName }}_{{ businessName }}(
request: Request,
edit_{{ businessName }}: {{ BusinessName }}Model,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_{{ businessName }}.update_by = current_user.user.user_name
edit_{{ businessName }}.update_time = datetime.now()
edit_{{ businessName }}_result = await {{ BusinessName }}Service.edit_{{ businessName }}_services(query_db, edit_{{ businessName }})
logger.info(edit_{{ businessName }}_result.message)
return ResponseUtil.success(msg=edit_{{ businessName }}_result.message)
@{{ businessName }}Controller.delete('/{% raw %}{{% endraw %}{{ pk_field }}s{% raw %}}{% endraw %}', dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:remove'))])
@Log(title='{{ functionName }}', business_type=BusinessType.DELETE)
async def delete_{{ moduleName }}_{{ businessName }}(request: Request, {{ pk_field }}s: str, query_db: AsyncSession = Depends(get_db)):
delete_{{ businessName }} = Delete{{ BusinessName }}Model({{ pkField }}s={{ pk_field }}s)
delete_{{ businessName }}_result = await {{ BusinessName }}Service.delete_{{ businessName }}_services(query_db, delete_{{ businessName }})
logger.info(delete_{{ businessName }}_result.message)
return ResponseUtil.success(msg=delete_{{ businessName }}_result.message)
@{{ businessName }}Controller.get(
'/{% raw %}{{% endraw %}{{ pk_field }}{% raw %}}{% endraw %}', response_model={{ BusinessName }}Model, dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:query'))]
)
async def query_detail_{{ moduleName }}_{{ businessName }}(request: Request, {{ pk_field }}: int, query_db: AsyncSession = Depends(get_db)):
{{ businessName }}_detail_result = await {{ BusinessName }}Service.{{ businessName }}_detail_services(query_db, {{ pk_field }})
logger.info(f'获取{{ pk_field }}为{% raw %}{{% endraw %}{{ pk_field }}{% raw %}}{% endraw %}的信息成功')
return ResponseUtil.success(data={{ businessName }}_detail_result)
@{{ businessName }}Controller.post('/export', dependencies=[Depends(CheckUserInterfaceAuth('{{ permissionPrefix }}:export'))])
@Log(title='{{ functionName }}', business_type=BusinessType.EXPORT)
async def export_{{ moduleName }}_{{ businessName }}_list(
request: Request,
{{ businessName }}_page_query: {{ BusinessName }}PageQueryModel = Form(),
query_db: AsyncSession = Depends(get_db),
):
# 获取全量数据
{{ businessName }}_query_result = await {{ BusinessName }}Service.get_{{ businessName }}_list_services(query_db, {{ businessName }}_page_query, is_page=False)
{{ businessName }}_export_result = await {{ BusinessName }}Service.export_{{ businessName }}_list_services({{ businessName }}_query_result)
logger.info('导出成功')
return ResponseUtil.streaming(data=bytes2file_response({{ businessName }}_export_result))

View File

@@ -0,0 +1,139 @@
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
from datetime import datetime, time
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from {{ packageName }}.entity.do.{{ businessName }}_do import {{ ClassName }}
from {{ packageName }}.entity.vo.{{ businessName }}_vo import {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from utils.page_util import PageUtil
class {{ BusinessName }}Dao:
"""
{{ functionName }}模块数据库操作层
"""
@classmethod
async def get_{{ businessName }}_detail_by_id(cls, db: AsyncSession, {{ pk_field }}: int):
"""
根据{{ functionName }}id获取{{ functionName }}详细信息
:param db: orm对象
:param {{ pk_field }}: 通知公告id
:return: {{ functionName }}信息对象
"""
{{ businessName }}_info = (await db.execute(select({{ ClassName }}).where({{ ClassName }}.{{ pk_field }} == {{ pk_field }}))).scalars().first()
return {{ businessName }}_info
@classmethod
async def get_{{ businessName }}_detail_by_info(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
"""
根据{{ functionName }}参数获取{{ functionName }}信息
:param db: orm对象
:param {{ businessName }}: {{ functionName }}参数对象
:return: {{ functionName }}信息对象
"""
{{ businessName }}_info = (
(
await db.execute(
select({{ ClassName }}).where(
{% for column in columns %}
{% if column.required %}
{{ ClassName }}.{{ column.python_field | camel_to_snake }} == {{ businessName }}.{{ column.python_field | camel_to_snake }},
{% endif %}
{% endfor %}
)
)
)
.scalars()
.first()
)
return {{ businessName }}_info
@classmethod
async def get_{{ businessName }}_list(cls, db: AsyncSession, query_object: {{ BusinessName }}PageQueryModel, is_page: bool = False):
"""
根据查询参数获取{{ functionName }}列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: {{ functionName }}列表信息对象
"""
query = (
select({{ ClassName }})
.where(
{% for column in columns %}
{% set field = column.python_field | camel_to_snake %}
{% if column.query %}
{% if column.query_type == "EQ" %}
{{ ClassName }}.{{ field }} == query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "NE" %}
{{ ClassName }}.{{ field }} != query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "GT" %}
{{ ClassName }}.{{ field }} > query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "GTE" %}
{{ ClassName }}.{{ field }} >= query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "LT" %}
{{ ClassName }}.{{ field }} < query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "LTE" %}
{{ ClassName }}.{{ field }} <= query_object.{{ field }} if query_object.{{ field }} else True,
{% elif column.query_type == "LIKE" %}
{{ ClassName }}.{{ field }}.like(f'%{% raw %}{{% endraw %}query_object.{{ field }}{% raw %}}{% endraw %}%') if query_object.{{ field }} else True,
{% elif column.query_type == "BETWEEN" %}
{{ ClassName }}.{{ field }}.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time
else True,
{% endif %}
{% endif %}
{% endfor %}
)
.order_by({{ ClassName }}.{{ pk_field }})
.distinct()
)
{{ businessName }}_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return {{ businessName }}_list
@classmethod
async def add_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
"""
新增{{ functionName }}数据库操作
:param db: orm对象
:param notice: {{ functionName }}对象
:return:
"""
db_{{ businessName }} = {{ ClassName }}(**{{ businessName }}.model_dump())
db.add(db_{{ businessName }})
await db.flush()
return db_{{ businessName }}
@classmethod
async def edit_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: dict):
"""
编辑{{ functionName }}数据库操作
:param db: orm对象
:param notice: 需要更新的{{ functionName }}字典
:return:
"""
await db.execute(update({{ ClassName }}), [{{ businessName }}])
@classmethod
async def delete_{{ businessName }}_dao(cls, db: AsyncSession, {{ businessName }}: {{ BusinessName }}Model):
"""
删除{{ functionName }}数据库操作
:param db: orm对象
:param notice: {{ functionName }}对象
:return:
"""
await db.execute(delete({{ ClassName }}).where({{ ClassName }}.{{ pk_field }}.in_([{{ businessName }}.{{ pk_field }}])))

View File

@@ -0,0 +1,40 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, Integer, String
{% if table.sub %}
from sqlalchemy.orm import relationship
{% endif %}
from config.database import Base
class {{ ClassName }}(Base):
"""
{{ functionName }}表
"""
__tablename__ = '{{ tableName }}'
{% for column in columns %}
{{ column.column_name }} = Column({{ column.column_type | get_sqlalchemy_type }}, {% if column.pk %}primary_key=True, {% endif %}{% if column.increment %}autoincrement=True, {% endif %}{% if column.required %}nullable=True{% else %}nullable=False{% endif %}, comment='{{ column.column_comment }}')
{% endfor %}
{% if table.sub %}
{{ table.sub_table.business_name }} = relationship('{{ table.sub_table.class_name }}', back_populates='{{ businessName }}')
{% endif %}
{% if table.sub %}
class {{ table.sub_table.class_name }}(Base):
"""
{{ table.sub_table.function_name }}表
"""
__tablename__ = '{{ table.sub_table.table_name }}'
{% for column in table.sub_table.columns %}
{{ column.column_name }} = Column({{ column.column_type | get_sqlalchemy_type }}, {% if column.pk %}primary_key=True, {% endif %}{% if column.increment %}autoincrement=True, {% endif %}{% if column.required %}nullable=True{% else %}nullable=False{% endif %}, comment='{{ column.column_comment }}')
{% endfor %}
{% if table.sub %}
{{ businessName }} = relationship('{{ ClassName }}', back_populates='{{ table.sub_table.business_name }}')
{% endif %}
{% endif %}

View File

@@ -0,0 +1,171 @@
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
{% set pk_field_comment = pkColumn.comment %}
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from {{ packageName }}.dao.{{ businessName }}_dao import {{ BusinessName }}Dao
from module_admin.entity.vo.common_vo import CrudResponseModel
from {{ packageName }}.entity.vo.{{ businessName }}_vo import Delete{{ BusinessName }}Model, {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from utils.common_util import CamelCaseUtil
from utils.excel_util import ExcelUtil
class {{ BusinessName }}Service:
"""
{{ functionName }}模块服务层
"""
@classmethod
async def get_{{ businessName }}_list_services(
cls, query_db: AsyncSession, query_object: {{ BusinessName }}PageQueryModel, is_page: bool = False
):
"""
获取{{ functionName }}列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: {{ functionName }}列表信息对象
"""
{{ businessName }}_list_result = await {{ BusinessName }}Dao.get_{{ businessName }}_list(query_db, query_object, is_page)
return {{ businessName }}_list_result
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.required %}
@classmethod
async def check_{{ column.python_field | camel_to_snake }}_unique_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
"""
检查{{ comment }}是否唯一service
:param query_db: orm对象
:param page_object: {{ functionName }}对象
:return: 校验结果
"""
{{ pk_field }} = -1 if page_object.{{ pk_field }} is None else page_object.{{ pk_field }}
{{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_info(query_db, {{ BusinessName }}Model({{ column.python_field }}=page_object.{{ column.python_field | camel_to_snake }}))
if {{ businessName }} and {{ businessName }}.{{ pk_field }} != {{ pk_field }}:
return CommonConstant.NOT_UNIQUE
return CommonConstant.UNIQUE
{% if not loop.last %}{{ "\n" }}{% endif %}
{% endif %}
{% endfor %}
@classmethod
async def add_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
"""
新增{{ functionName }}信息service
:param query_db: orm对象
:param page_object: 新增{{ functionName }}对象
:return: 新增{{ functionName }}校验结果
"""
if not await cls.check_post_name_unique_services(query_db, page_object):
raise ServiceException(message=f'新增岗位{page_object.post_name}失败,岗位名称已存在')
elif not await cls.check_post_code_unique_services(query_db, page_object):
raise ServiceException(message=f'新增岗位{page_object.post_name}失败,岗位编码已存在')
else:
try:
await {{ BusinessName }}Dao.add_{{ businessName }}_dao(query_db, page_object)
await query_db.commit()
return CrudResponseModel(is_success=True, message='新增成功')
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def edit_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
"""
编辑{{ functionName }}信息service
:param query_db: orm对象
:param page_object: 编辑{{ functionName }}对象
:return: 编辑{{ functionName }}校验结果
"""
edit_{{ businessName }} = page_object.model_dump(exclude_unset=True)
{{ businessName }}_info = await cls.{{ businessName }}_detail_services(query_db, page_object.post_id)
if {{ businessName }}_info.post_id:
if not await cls.check_post_name_unique_services(query_db, page_object):
raise ServiceException(message=f'修改岗位{page_object.post_name}失败,岗位名称已存在')
elif not await cls.check_post_code_unique_services(query_db, page_object):
raise ServiceException(message=f'修改岗位{page_object.post_name}失败,岗位编码已存在')
else:
try:
await {{ BusinessName }}Dao.edit_{{ businessName }}_dao(query_db, edit_{{ businessName }})
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='{{ functionName }}不存在')
@classmethod
async def delete_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: Delete{{ BusinessName }}Model):
"""
删除{{ functionName }}信息service
:param query_db: orm对象
:param page_object: 删除{{ functionName }}对象
:return: 删除{{ functionName }}校验结果
"""
if page_object.{{ pk_field }}s:
{{ pk_field }}_list = page_object.{{ pk_field }}s.split(',')
try:
for {{ pk_field }} in {{ pk_field }}_list:
{{ businessName }} = await cls.{{ businessName }}_detail_services(query_db, int({{ pk_field }}))
await {{ BusinessName }}Dao.delete_{{ businessName }}_dao(query_db, {{ BusinessName }}Model({{ pkField }}={{ pk_field }}))
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入{{ pk_field_comment }}为空')
@classmethod
async def {{ businessName }}_detail_services(cls, query_db: AsyncSession, {{ pk_field }}: int):
"""
获取{{ functionName }}详细信息service
:param query_db: orm对象
:param post_id: 岗位id
:return: 岗位id对应的信息
"""
{{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_id(query_db, {{ pk_field }}={{ pk_field }})
if {{ businessName }}:
result = {{ BusinessName }}Model(**CamelCaseUtil.transform_result({{ businessName }}))
else:
result = {{ BusinessName }}Model(**dict())
return result
@staticmethod
async def export_{{ businessName }}_list_services({{ businessName }}_list: List):
"""
导出{{ functionName }}信息service
:param {{ businessName }}_list: {{ functionName }}信息列表
:return: {{ functionName }}信息对应excel的二进制数据
"""
# 创建一个映射字典,将英文键映射到中文键
mapping_dict = {
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
'{{ column.python_field }}': '{{ comment }}',
{% endfor %}
}
for item in {{ businessName }}_list:
if item.get('status') == '0':
item['status'] = '正常'
else:
item['status'] = '停用'
binary_data = ExcelUtil.export_list2excel({{ businessName }}_list, mapping_dict)
return binary_data

View File

@@ -0,0 +1,72 @@
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
{% set pk_field_comment = pkColumn.comment %}
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
from pydantic_validation_decorator import NotBlank
from typing import Literal, Optional
from module_admin.annotation.pydantic_annotation import as_query
class {{ BusinessName }}Model(BaseModel):
"""
{{ functionName }}表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
{% for column in columns %}
{{ column.column_name }}: Optional[{{ column.python_type }}] = Field(default=None, description='{{ column.column_comment }}')
{% endfor %}
{% for column in columns %}
{% if column.required %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
@NotBlank(field_name='{{ column.column_name }}', message='{{ comment }}不能为空')
def get_{{ column.column_name }}(self):
return self.{{ column.column_name }}
{% endif %}
{% endfor %}
def validate_fields(self):
{% set vo_field_required = namespace(has_required=False) %}
{% for column in columns %}
{% if column.required %}
self.get_{{ column.column_name }}()
{% set vo_field_required.has_required = True %}
{% endif %}
{% endfor %}
{% if not vo_field_required.has_required %}
pass
{% endif %}
class {{ BusinessName }}QueryModel({{ BusinessName }}Model):
"""
{{ functionName }}不分页查询模型
"""
begin_time: Optional[str] = Field(default=None, description='开始时间')
end_time: Optional[str] = Field(default=None, description='结束时间')
@as_query
class {{ BusinessName }}PageQueryModel({{ BusinessName }}QueryModel):
"""
{{ functionName }}分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class Delete{{ BusinessName }}Model(BaseModel):
"""
删除{{ functionName }}模型
"""
model_config = ConfigDict(alias_generator=to_camel)
{{ pk_field }}s: str = Field(description='需要删除的{{ pk_field_comment }}')

View File

@@ -0,0 +1,22 @@
-- 菜单 SQL
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}', '{{ parentMenuId }}', '1', '{{ businessName }}', '{{ moduleName }}/{{ businessName }}/index', 1, 0, 'C', '0', '0', '{{ permissionPrefix }}:list', '#', 'admin', sysdate(), '', null, '{{ functionName }}菜单');
-- 按钮父菜单ID
SELECT @parentId := LAST_INSERT_ID();
-- 按钮 SQL
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}查询', @parentId, '1', '#', '', 1, 0, 'F', '0', '0', '{{ permissionPrefix }}:query', '#', 'admin', sysdate(), '', null, '');
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}新增', @parentId, '2', '#', '', 1, 0, 'F', '0', '0', '{{ permissionPrefix }}:add', '#', 'admin', sysdate(), '', null, '');
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}修改', @parentId, '3', '#', '', 1, 0, 'F', '0', '0', '{{ permissionPrefix }}:edit', '#', 'admin', sysdate(), '', null, '');
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}删除', @parentId, '4', '#', '', 1, 0, 'F', '0', '0', '{{ permissionPrefix }}:remove', '#', 'admin', sysdate(), '', null, '');
insert into sys_menu (menu_name, parent_id, order_num, path, component, is_frame, is_cache, menu_type, visible, status, perms, icon, create_by, create_time, update_by, update_time, remark)
values('{{ functionName }}导出', @parentId, '5', '#', '', 1, 0, 'F', '0', '0', '{{ permissionPrefix }}:export', '#', 'admin', sysdate(), '', null, '');

View File

@@ -0,0 +1,572 @@
<template>
<div class="app-container">
<el-form :model="queryParams" ref="queryRef" :inline="true" v-show="showSearch" label-width="68px">
{% for column in columns %}
{% if column.query %}
{% set dictType = column.dict_type %}
{% set AttrName = column.python_field[0] | upper + column.python_field[1:] %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.html_type == "input" %}
<el-form-item label="{{ comment }}" prop="{{ column.python_field }}">
<el-input
v-model="queryParams.{{ column.python_field }}"
placeholder="请输入{{ comment }}"
clearable
@keyup.enter="handleQuery"
/>
</el-form-item>
{% elif (column.html_type == "select" or column.html_type == "radio") and dictType %}
<el-form-item label="{{ comment }}" prop="{{ column.python_field }}">
<el-select v-model="queryParams.{{ column.python_field }}" placeholder="请选择{{ comment }}" clearable>
<el-option
v-for="dict in {{ dictType }}"
:key="dict.value"
:label="dict.label"
:value="dict.value"
/>
</el-select>
</el-form-item>
{% elif (column.html_type == "select" or column.html_type == "radio") and not dictType %}
<el-form-item label="{{ comment }}" prop="{{ column.python_field }}">
<el-select v-model="queryParams.{{ column.python_field }}" placeholder="请选择{{ comment }}" clearable>
<el-option label="请选择字典生成" value="" />
</el-select>
</el-form-item>
{% elif column.html_type == "datetime" and column.query_type != "BETWEEN" %}
<el-form-item label="{{ comment }}" prop="{{ column.python_field }}">
<el-date-picker clearable
v-model="queryParams.{{ column.python_field }}"
type="date"
value-format="YYYY-MM-DD"
placeholder="请选择{{ comment }}">
</el-date-picker>
</el-form-item>
{% elif column.html_type == "datetime" and column.query_type == "BETWEEN" %}
<el-form-item label="{{ comment }}" style="width: 308px">
<el-date-picker
v-model="daterange{{ AttrName }}"
value-format="YYYY-MM-DD"
type="daterange"
range-separator="-"
start-placeholder="开始日期"
end-placeholder="结束日期"
></el-date-picker>
</el-form-item>
{% endif %}
{% endif %}
{% endfor %}
<el-form-item>
<el-button type="primary" icon="Search" @click="handleQuery">搜索</el-button>
<el-button icon="Refresh" @click="resetQuery">重置</el-button>
</el-form-item>
</el-form>
<el-row :gutter="10" class="mb8">
<el-col :span="1.5">
<el-button
type="primary"
plain
icon="Plus"
@click="handleAdd"
v-hasPermi="['{{ moduleName }}:{{ businessName }}:add']"
>新增</el-button>
</el-col>
<el-col :span="1.5">
<el-button
type="success"
plain
icon="Edit"
:disabled="single"
@click="handleUpdate"
v-hasPermi="['{{ moduleName }}:{{ businessName }}:edit']"
>修改</el-button>
</el-col>
<el-col :span="1.5">
<el-button
type="danger"
plain
icon="Delete"
:disabled="multiple"
@click="handleDelete"
v-hasPermi="['{{ moduleName }}:{{ businessName }}:remove']"
>删除</el-button>
</el-col>
<el-col :span="1.5">
<el-button
type="warning"
plain
icon="Download"
@click="handleExport"
v-hasPermi="['{{ moduleName }}:{{ businessName }}:export']"
>导出</el-button>
</el-col>
<right-toolbar v-model:showSearch="showSearch" @queryTable="getList"></right-toolbar>
</el-row>
<el-table v-loading="loading" :data="{{ businessName }}List" @selection-change="handleSelectionChange">
<el-table-column type="selection" width="55" align="center" />
{% for column in columns %}
{% set pythonField = column.python_field %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.pk %}
<el-table-column label="{{ comment }}" align="center" prop="{{ pythonField }}" />
{% elif column.list and column.html_type == "datetime" %}
<el-table-column label="{{ comment }}" align="center" prop="{{ pythonField }}" width="180">
<template #default="scope">
<span>{% raw %}{{{% endraw %} parseTime(scope.row.{{ pythonField }}, '{y}-{m}-{d}') {% raw %}}}{% endraw %}</span>
</template>
</el-table-column>
{% elif column.list and column.html_type == "imageUpload" %}
<el-table-column label="{{ comment }}" align="center" prop="{{ pythonField }}" width="100">
<template #default="scope">
<image-preview :src="scope.row.{{ pythonField }}" :width="50" :height="50"/>
</template>
</el-table-column>
{% elif column.list and column.dict_type %}
<el-table-column label="{{ comment }}" align="center" prop="{{ pythonField }}">
<template #default="scope">
{% if column.html_type == "checkbox" %}
<dict-tag :options="{{ column.dict_type }}" :value="scope.row.{{ pythonField }} ? scope.row.{{ pythonField }}.split(',') : []"/>
{% else %}
<dict-tag :options="{{ column.dict_type }}" :value="scope.row.{{ pythonField }}"/>
{% endif %}
</template>
</el-table-column>
{% elif column.list and pythonField %}
<el-table-column label="{{ comment }}" align="center" prop="{{ pythonField }}" />
{% endif %}
{% endfor %}
<el-table-column label="操作" align="center" class-name="small-padding fixed-width">
<template #default="scope">
<el-button link type="primary" icon="Edit" @click="handleUpdate(scope.row)" v-hasPermi="['{{ moduleName }}:{{ businessName }}:edit']">修改</el-button>
<el-button link type="primary" icon="Delete" @click="handleDelete(scope.row)" v-hasPermi="['{{ moduleName }}:{{ businessName }}:remove']">删除</el-button>
</template>
</el-table-column>
</el-table>
<pagination
v-show="total>0"
:total="total"
v-model:page="queryParams.pageNum"
v-model:limit="queryParams.pageSize"
@pagination="getList"
/>
<!-- 添加或修改{{ functionName }}对话框 -->
<el-dialog :title="title" v-model="open" width="500px" append-to-body>
<el-form ref="{{ businessName }}Ref" :model="form" :rules="rules" label-width="80px">
{% for column in columns %}
{% set field = column.python_field %}
{% if column.insert and not column.pk %}
{% if column.usable_column or column.super_column %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% set dictType = column.dict_type %}
{% if column.html_type == "input" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-input v-model="form.{{ field }}" placeholder="请输入{{ comment }}" />
</el-form-item>
{% elif column.html_type == "imageUpload" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<image-upload v-model="form.{{ field }}"/>
</el-form-item>
{% elif column.html_type == "fileUpload" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<file-upload v-model="form.{{ field }}"/>
</el-form-item>
{% elif column.html_type == "editor" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<editor v-model="form.{{ field }}" :min-height="192"/>
</el-form-item>
{% elif column.html_type == "select" and dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-select v-model="form.{{ field }}" placeholder="请选择{{ comment }}">
<el-option
v-for="dict in {{ dictType }}"
:key="dict.value"
:label="dict.label"
{% if column.python_type == 'Integer' or column.python_type == 'Long' %}
:value="parseInt(dict.value)"
{% else %}
:value="dict.value"
{% endif %}
></el-option>
</el-select>
</el-form-item>
{% elif column.html_type == "select" and not dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-select v-model="form.{{ field }}" placeholder="请选择{{ comment }}">
<el-option label="请选择字典生成" value="" />
</el-select>
</el-form-item>
{% elif column.html_type == "checkbox" and dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-checkbox-group v-model="form.{{ field }}">
<el-checkbox
v-for="dict in {{ dictType }}"
:key="dict.value"
:label="dict.value">
{{ dict.label }}
</el-checkbox>
</el-checkbox-group>
</el-form-item>
{% elif column.html_type == "checkbox" and not dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-checkbox-group v-model="form.{{ field }}">
<el-checkbox>请选择字典生成</el-checkbox>
</el-checkbox-group>
</el-form-item>
{% elif column.html_type == "radio" and dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-radio-group v-model="form.{{ field }}">
<el-radio
v-for="dict in {{ dictType }}"
:key="dict.value"
{% if column.python_type == 'Integer' or column.python_type == 'Long' %}
:label="parseInt(dict.value)"
{% else %}
:label="dict.value"
{% endif %}>
{{ dict.label }}
</el-radio>
</el-radio-group>
</el-form-item>
{% elif column.html_type == "radio" and not dictType %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-radio-group v-model="form.{{ field }}">
<el-radio label="请选择字典生成" value="" />
</el-radio-group>
</el-form-item>
{% elif column.html_type == "datetime" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-date-picker clearable
v-model="form.{{ field }}"
type="date"
value-format="YYYY-MM-DD"
placeholder="请选择{{ comment }}">
</el-date-picker>
</el-form-item>
{% elif column.html_type == "textarea" %}
<el-form-item label="{{ comment }}" prop="{{ field }}">
<el-input v-model="form.{{ field }}" type="textarea" placeholder="请输入内容" />
</el-form-item>
{% endif %}
{% endif %}
{% endif %}
{% endfor %}
{% if table.sub %}
<el-divider content-position="center">{{ subTable.functionName }}信息</el-divider>
<el-row :gutter="10" class="mb8">
<el-col :span="1.5">
<el-button type="primary" icon="Plus" @click="handleAdd{{ subClassName }}">添加</el-button>
</el-col>
<el-col :span="1.5">
<el-button type="danger" icon="Delete" @click="handleDelete{{ subClassName }}">删除</el-button>
</el-col>
</el-row>
<el-table :data="{{ subclassName }}List" :row-class-name="row{{ subClassName }}Index" @selection-change="handle{{ subClassName }}SelectionChange" ref="{{ subclassName }}">
<el-table-column type="selection" width="50" align="center" />
<el-table-column label="序号" align="center" prop="index" width="50"/>
{% for column in subTable.columns %}
{% set pythonField = column.python_field %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.pk or pythonField == subTableFkclassName %}
{% elif column.list and column.html_type == 'input' %}
<el-table-column label="{{ comment }}" prop="{{ pythonField }}" width="150">
<template #default="scope">
<el-input v-model="scope.row.{{ pythonField }}" placeholder="请输入{{ comment }}" />
</template>
</el-table-column>
{% elif column.list and column.html_type == 'datetime' %}
<el-table-column label="{{ comment }}" prop="{{ pythonField }}" width="240">
<template #default="scope">
<el-date-picker clearable
v-model="scope.row.{{ pythonField }}"
type="date"
value-format="YYYY-MM-DD"
placeholder="请选择{{ comment }}">
</el-date-picker>
</template>
</el-table-column>
{% elif column.list and (column.html_type == 'select' or column.html_type == 'radio') and column.dict_type %}
<el-table-column label="{{ comment }}" prop="{{ pythonField }}" width="150">
<template #default="scope">
<el-select v-model="scope.row.{{ pythonField }}" placeholder="请选择{{ comment }}">
<el-option
v-for="dict in {{ column.dictType }}"
:key="dict.value"
:label="dict.label"
:value="dict.value"
></el-option>
</el-select>
</template>
</el-table-column>
{% elif column.list and (column.html_type == 'select' or column.html_type == 'radio') and not column.dict_type %}
<el-table-column label="{{ comment }}" prop="{{ pythonField }}" width="150">
<template #default="scope">
<el-select v-model="scope.row.{{ pythonField }}" placeholder="请选择{{ comment }}">
<el-option label="请选择字典生成" value="" />
</el-select>
</template>
</el-table-column>
{% endif %}
{% endfor %}
</el-table>
{% endif %}
</el-form>
<template #footer>
<div class="dialog-footer">
<el-button type="primary" @click="submitForm">确 定</el-button>
<el-button @click="cancel">取 消</el-button>
</div>
</template>
</el-dialog>
</div>
</template>
<script setup name="{{ BusinessName }}">
import { list{{ BusinessName }}, get{{ BusinessName }}, del{{ BusinessName }}, add{{ BusinessName }}, update{{ BusinessName }} } from "@/api/{{ moduleName }}/{{ businessName }}";
const { proxy } = getCurrentInstance();
{% if dicts != '' %}
{% set dictsNoSymbol = dicts.replace("'", "") %}
const { {{ dictsNoSymbol }} } = proxy.useDict({{ dicts }});
{% endif %}
const {{ businessName }}List = ref([]);
{% if table.sub %}
const {{ subclassName }}List = ref([]);
{% endif %}
const open = ref(false);
const loading = ref(true);
const showSearch = ref(true);
const ids = ref([]);
{% if table.sub %}
const checked{{ subClassName }} = ref([]);
{% endif %}
const single = ref(true);
const multiple = ref(true);
const total = ref(0);
const title = ref("");
{% for column in columns %}
{% if column.html_type == "datetime" and column.query_type == "BETWEEN" %}
{% set AttrName = column.python_field[0] | upper + column.python_field[1:] %}
const daterange{{ AttrName }} = ref([]);
{% endif %}
{% endfor %}
const data = reactive({
form: {},
queryParams: {
pageNum: 1,
pageSize: 10,
{% for column in columns %}
{% if column.query %}
{{ column.python_field }}: null,
{% endif %}
{% endfor %}
},
rules: {
{% for column in columns %}
{% if column.required %}
{% set parentheseIndex = column.column_comment.find("") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{{ column.python_field }}: [
{ required: true, message: "{{ comment }}不能为空", trigger: "{% if column.html_type == 'select' or column.html_type == 'radio' %}change{% else %}blur{% endif %}" }
],
{% endif %}
{% endfor %}
}
});
const { queryParams, form, rules } = toRefs(data);
/** 查询{{ functionName }}列表 */
function getList() {
loading.value = true;
{% for column in columns %}
{% if column.html_type == "datetime" and column.query_type == "BETWEEN" %}
queryParams.value.params = {};
{% endif %}
{% endfor %}
{% for column in columns %}
{% if column.html_type == "datetime" and column.query_type == "BETWEEN" %}
{% set AttrName = column.python_field[0] | upper + column.python_field[1:] %}
if (null != daterange{{ AttrName }} && '' != daterange{{ AttrName }}) {
queryParams.value.params["begin{{ AttrName }}"] = daterange{{ AttrName }}.value[0];
queryParams.value.params["end{{ AttrName }}"] = daterange{{ AttrName }}.value[1];
}
{% endif %}
{% endfor %}
list{{ BusinessName }}(queryParams.value).then(response => {
{{ businessName }}List.value = response.rows;
total.value = response.total;
loading.value = false;
});
}
/** 取消按钮 */
function cancel() {
open.value = false;
reset();
}
/** 表单重置 */
function reset() {
form.value = {
{% for column in columns %}
{% if column.html_type == "checkbox" %}
{{ column.python_field }}: [],
{% else %}
{{ column.python_field }}: null,
{% endif %}
{% endfor %}
};
{% if table.sub %}
{{ subclassName }}List.value = [];
{% endif %}
proxy.resetForm("{{ businessName }}Ref");
}
/** 搜索按钮操作 */
function handleQuery() {
queryParams.value.pageNum = 1;
getList();
}
/** 重置按钮操作 */
function resetQuery() {
{% for column in columns %}
{% if column.html_type == "datetime" and column.query_type == "BETWEEN" %}
{% set AttrName = column.python_field[0] | upper + column.python_field[1:] %}
daterange{{ AttrName }}.value = [];
{% endif %}
{% endfor %}
proxy.resetForm("queryRef");
handleQuery();
}
/** 多选框选中数据 */
function handleSelectionChange(selection) {
ids.value = selection.map(item => item.{{ pkColumn.python_field }});
single.value = selection.length != 1;
multiple.value = !selection.length;
}
/** 新增按钮操作 */
function handleAdd() {
reset();
open.value = true;
title.value = "添加{{ functionName }}";
}
/** 修改按钮操作 */
function handleUpdate(row) {
reset();
const _{{ pkColumn.python_field }} = row.{{ pkColumn.python_field }} || ids.value;
get{{ BusinessName }}(_{{ pkColumn.python_field }}).then(response => {
form.value = response.data;
{% for column in columns %}
{% if column.html_type == "checkbox" %}
form.value.{{ column.python_field }} = form.value.{{ column.python_field }}.split(",");
{% endif %}
{% endfor %}
{% if table.sub %}
{{ subclassName }}List.value = response.{{ subclassName }}List;
{% endif %}
open.value = true;
title.value = "修改{{ functionName }}";
});
}
/** 提交按钮 */
function submitForm() {
proxy.$refs["{{ businessName }}Ref"].validate(valid => {
if (valid) {
{% for column in columns %}
{% if column.html_type == "checkbox" %}
form.value.{{ column.python_field }} = form.value.{{ column.python_field }}.join(",");
{% endif %}
{% endfor %}
{% if table.sub %}
form.value.{{ subclassName }}List = {{ subclassName }}List.value;
{% endif %}
if (form.value.{{ pkColumn.python_field }} != null) {
update{{ BusinessName }}(form.value).then(response => {
proxy.$modal.msgSuccess("修改成功");
open.value = false;
getList();
});
} else {
add{{ BusinessName }}(form.value).then(response => {
proxy.$modal.msgSuccess("新增成功");
open.value = false;
getList();
});
}
}
});
}
/** 删除按钮操作 */
function handleDelete(row) {
const _{{ pkColumn.python_field }}s = row.{{ pkColumn.python_field }} || ids.value;
proxy.$modal.confirm('是否确认删除{{ functionName }}编号为"' + _{{ pkColumn.python_field }}s + '"的数据项?').then(function() {
return del{{ BusinessName }}(_{{ pkColumn.python_field }}s);
}).then(() => {
getList();
proxy.$modal.msgSuccess("删除成功");
}).catch(() => {});
}
{% if table.sub %}
/** {{ subTable.functionName }}序号 */
function row{{ subClassName }}Index({ row, rowIndex }) {
row.index = rowIndex + 1;
}
/** {{ subTable.functionName }}添加按钮操作 */
function handleAdd{{ subClassName }}() {
let obj = {};
{% for column in subTable.columns %}
{% if column.pk or column.python_field == subTableFkclassName %}
{% elif column.list and "" != column.python_field %}
obj.{{ column.python_field }} = "";
{% endif %}
{% endfor %}
{{ subclassName }}List.value.push(obj);
}
/** {{ subTable.functionName }}删除按钮操作 */
function handleDelete{{ subClassName }}() {
if (checked{{ subClassName }}.value.length == 0) {
proxy.$modal.msgError("请先选择要删除的{{ subTable.functionName }}数据");
} else {
const {{ subclassName }}s = {{ subclassName }}List.value;
const checked{{ subClassName }}s = checked{{ subClassName }}.value;
{{ subclassName }}List.value = {{ subclassName }}s.filter(function(item) {
return checked{{ subClassName }}s.indexOf(item.index) == -1
});
}
}
/** 复选框选中数据 */
function handle{{ subClassName }}SelectionChange(selection) {
checked{{ subClassName }}.value = selection.map(item => item.index)
}
{% endif %}
/** 导出按钮操作 */
function handleExport() {
proxy.download('{{ moduleName }}/{{ businessName }}/export', {
...queryParams.value
}, `{{ businessName }}_${new Date().getTime()}.xlsx`);
}
getList();
</script>

View File

@@ -22,6 +22,7 @@ from module_admin.controller.post_controler import postController
from module_admin.controller.role_controller import roleController
from module_admin.controller.server_controller import serverController
from module_admin.controller.user_controller import userController
from module_generator.controller.gen_controller import genController
from sub_applications.handle import handle_sub_applications
from utils.common_util import worship
from utils.log_util import logger
@@ -77,6 +78,7 @@ controller_list = [
{'router': serverController, 'tags': ['系统监控-菜单管理']},
{'router': cacheController, 'tags': ['系统监控-缓存监控']},
{'router': commonController, 'tags': ['通用模块']},
{'router': genController, 'tags': ['代码生成']},
]
for controller in controller_list:

View File

@@ -913,8 +913,8 @@ create table gen_table_column (
column_name varchar(200),
column_comment varchar(500),
column_type varchar(100),
java_type varchar(500),
java_field varchar(200),
python_type varchar(500),
python_field varchar(200),
is_pk char(1),
is_increment char(1),
is_required char(1),
@@ -937,8 +937,8 @@ comment on column gen_table_column.table_id is '归属表编号';
comment on column gen_table_column.column_name is '列名称';
comment on column gen_table_column.column_comment is '列描述';
comment on column gen_table_column.column_type is '列类型';
comment on column gen_table_column.java_type is 'JAVA类型';
comment on column gen_table_column.java_field is 'JAVA字段名';
comment on column gen_table_column.python_type is 'PYTHON类型';
comment on column gen_table_column.python_field is 'PYTHON字段名';
comment on column gen_table_column.is_pk is '是否主键1是';
comment on column gen_table_column.is_increment is '是否自增1是';
comment on column gen_table_column.is_required is '是否必填1是';

View File

@@ -691,8 +691,8 @@ create table gen_table_column (
column_name varchar(200) comment '列名称',
column_comment varchar(500) comment '列描述',
column_type varchar(100) comment '列类型',
java_type varchar(500) comment 'JAVA类型',
java_field varchar(200) comment 'JAVA字段名',
python_type varchar(500) comment 'PYTHON类型',
python_field varchar(200) comment 'PYTHON字段名',
is_pk char(1) comment '是否主键1是',
is_increment char(1) comment '是否自增1是',
is_required char(1) comment '是否必填1是',

View File

@@ -7,6 +7,7 @@ from openpyxl.styles import Alignment, PatternFill
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.datavalidation import DataValidation
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.collections import InstrumentedList
from typing import Any, Dict, List, Literal, Union
from config.database import Base
from config.env import CachePathConfig
@@ -58,6 +59,9 @@ class SqlalchemyUtil:
if isinstance(obj, Base):
base_dict = obj.__dict__.copy()
base_dict.pop('_sa_instance_state', None)
for name, value in base_dict.items():
if isinstance(value, InstrumentedList):
base_dict[name] = cls.serialize_result(value, 'snake_to_camel')
elif isinstance(obj, dict):
base_dict = obj.copy()
if transform_case == 'snake_to_camel':

View File

@@ -0,0 +1,152 @@
import re
from datetime import datetime
from typing import List
from config.constant import GenConstant
from config.env import GenConfig
from module_generator.entity.vo.gen_vo import GenTableColumnModel, GenTableModel
from utils.string_util import StringUtil
class GenUtils:
"""代码生成器工具类"""
@classmethod
def init_table(cls, gen_table: GenTableModel, oper_name: str) -> None:
"""初始化表信息"""
gen_table.class_name = cls.convert_class_name(gen_table.table_name)
gen_table.package_name = GenConfig.package_name
gen_table.module_name = cls.get_module_name(GenConfig.package_name)
gen_table.business_name = cls.get_business_name(gen_table.table_name)
gen_table.function_name = cls.replace_text(gen_table.table_comment)
gen_table.function_author = GenConfig.author
gen_table.create_by = oper_name
gen_table.create_time = datetime.now()
gen_table.update_by = oper_name
gen_table.update_time = datetime.now()
@classmethod
def init_column_field(cls, column: GenTableColumnModel, table: GenTableModel) -> None:
"""初始化列属性字段"""
data_type = cls.get_db_type(column.column_type)
column_name = column.column_name
column.table_id = table.table_id
column.create_by = table.create_by
# 设置Python字段名
column.python_field = cls.to_camel_case(column_name)
# 设置默认类型
column.python_type = StringUtil.get_mapping_value_by_key_ignore_case(GenConstant.MYSQL_TO_PYTHON_TYPE_MAPPING, data_type)
column.query_type = GenConstant.QUERY_EQ
if cls.arrays_contains(GenConstant.COLUMNTYPE_STR, data_type) or cls.arrays_contains(
GenConstant.COLUMNTYPE_TEXT, data_type
):
# 字符串长度超过500设置为文本域
column_length = cls.get_column_length(column.column_type)
html_type = (
GenConstant.HTML_TEXTAREA
if column_length >= 500 or cls.arrays_contains(GenConstant.COLUMNTYPE_TEXT, data_type)
else GenConstant.HTML_INPUT
)
column.html_type = html_type
elif cls.arrays_contains(GenConstant.COLUMNTYPE_TIME, data_type):
column.html_type = GenConstant.HTML_DATETIME
elif cls.arrays_contains(GenConstant.COLUMNTYPE_NUMBER, data_type):
column.html_type = GenConstant.HTML_INPUT
# 插入字段(默认所有字段都需要插入)
column.is_insert = GenConstant.REQUIRE
# 编辑字段
if not cls.arrays_contains(GenConstant.COLUMNNAME_NOT_EDIT, column_name) and not column.is_pk:
column.is_edit = GenConstant.REQUIRE
# 列表字段
if not cls.arrays_contains(GenConstant.COLUMNNAME_NOT_LIST, column_name) and not column.is_pk:
column.is_list = GenConstant.REQUIRE
# 查询字段
if not cls.arrays_contains(GenConstant.COLUMNNAME_NOT_QUERY, column_name) and not column.is_pk:
column.is_query = GenConstant.REQUIRE
# 查询字段类型
if column_name.lower().endswith('name'):
column.query_type = GenConstant.QUERY_LIKE
# 状态字段设置单选框
if column_name.lower().endswith('status'):
column.html_type = GenConstant.HTML_RADIO
# 类型&性别字段设置下拉框
elif column_name.lower().endswith('type') or column_name.lower().endswith('sex'):
column.html_type = GenConstant.HTML_SELECT
# 图片字段设置图片上传控件
elif column_name.lower().endswith('image'):
column.html_type = GenConstant.HTML_IMAGE_UPLOAD
# 文件字段设置文件上传控件
elif column_name.lower().endswith('file'):
column.html_type = GenConstant.HTML_FILE_UPLOAD
# 内容字段设置富文本控件
elif column_name.lower().endswith('content'):
column.html_type = GenConstant.HTML_EDITOR
@classmethod
def arrays_contains(cls, arr: List[str], target_value: str) -> bool:
"""校验数组是否包含指定值"""
return target_value in arr
@classmethod
def get_module_name(cls, package_name: str) -> str:
"""获取模块名"""
return package_name.split('.')[-1]
@classmethod
def get_business_name(cls, table_name: str) -> str:
"""获取业务名"""
return table_name.split('_')[-1]
@classmethod
def convert_class_name(cls, table_name: str) -> str:
"""表名转换成Python类名"""
auto_remove_pre = GenConfig.auto_remove_pre
table_prefix = GenConfig.table_prefix
if auto_remove_pre and table_prefix:
search_list = table_prefix.split(',')
table_name = cls.replace_first(table_name, search_list)
return StringUtil.convert_to_camel_case(table_name)
@classmethod
def replace_first(cls, replacement: str, search_list: List[str]) -> str:
"""批量替换前缀"""
for search_string in search_list:
if replacement.startswith(search_string):
return replacement.replace(search_string, '', 1)
return replacement
@classmethod
def replace_text(cls, text: str) -> str:
"""关键字替换"""
return re.sub(r'(?:表|若依)', '', text)
@classmethod
def get_db_type(cls, column_type: str) -> str:
"""获取数据库类型字段"""
if '(' in column_type:
return column_type.split('(')[0]
return column_type
@classmethod
def get_column_length(cls, column_type: str) -> int:
"""获取字段长度"""
if '(' in column_type:
length = len(column_type.split('(')[1].split(')')[0])
return length
return 0
@classmethod
def split_column_type(cls, column_type: str) -> List[str]:
"""拆分列类型"""
if '(' in column_type and ')' in column_type:
return column_type.split('(')[1].split(')')[0].split(',')
return []
@classmethod
def to_camel_case(cls, text: str) -> str:
"""将字符串转换为驼峰命名"""
parts = text.split('_')
return parts[0] + ''.join(word.capitalize() for word in parts[1:])

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import Dict, List
from config.constant import CommonConstant
@@ -36,6 +36,16 @@ class StringUtil:
"""
return string is None or len(string) == 0
@classmethod
def is_not_empty(cls, string: str) -> bool:
"""
校验字符串是否不是''和None
:param string: 需要校验的字符串
:return: 校验结果
"""
return not cls.is_empty(string)
@classmethod
def is_http(cls, link: str):
"""
@@ -49,7 +59,7 @@ class StringUtil:
@classmethod
def contains_ignore_case(cls, search_str: str, compare_str: str):
"""
查找指定字符串是否包含指定字符串同时忽略大小写
查找指定字符串是否包含指定字符串同时忽略大小写
:param search_str: 查找的字符串
:param compare_str: 比对的字符串
@@ -62,15 +72,40 @@ class StringUtil:
@classmethod
def contains_any_ignore_case(cls, search_str: str, compare_str_list: List[str]):
"""
查找指定字符串是否包含指定字符串列表中的任意一个字符串同时忽略大小写
查找指定字符串是否包含指定字符串列表中的任意一个字符串同时忽略大小写
:param search_str: 查找的字符串
:param compare_str_list: 比对的字符串列表
:return: 查找结果
"""
if search_str and compare_str_list:
for compare_str in compare_str_list:
return cls.contains_ignore_case(search_str, compare_str)
return any([cls.contains_ignore_case(search_str, compare_str) for compare_str in compare_str_list])
return False
@classmethod
def equals_ignore_case(cls, search_str: str, compare_str: str):
"""
比较两个字符串是否相等同时忽略大小写
:param search_str: 查找的字符串
:param compare_str: 比对的字符串
:return: 比较结果
"""
if search_str and compare_str:
return search_str.lower() == compare_str.lower()
return False
@classmethod
def equals_any_ignore_case(cls, search_str: str, compare_str_list: List[str]):
"""
比较指定字符串是否与指定字符串列表中的任意一个字符串相等同时忽略大小写
:param search_str: 查找的字符串
:param compare_str_list: 比对的字符串列表
:return: 比较结果
"""
if search_str and compare_str_list:
return any([cls.equals_ignore_case(search_str, compare_str) for compare_str in compare_str_list])
return False
@classmethod
@@ -98,3 +133,38 @@ class StringUtil:
if search_str and compare_str_list:
return any([cls.startswith_case(search_str, compare_str) for compare_str in compare_str_list])
return False
@classmethod
def convert_to_camel_case(cls, name: str) -> str:
"""
将下划线大写方式命名的字符串转换为驼峰式。如果转换前的下划线大写方式命名的字符串为空,则返回空字符串
:param name: 转换前的下划线大写方式命名的字符串
:return: 转换后的驼峰式命名的字符串
"""
if not name:
return ''
if '_' not in name:
return name[0].upper() + name[1:]
parts = name.split('_')
result = []
for part in parts:
if not part:
continue
result.append(part[0].upper() + part[1:].lower())
return ''.join(result)
@classmethod
def get_mapping_value_by_key_ignore_case(cls, mapping: Dict[str, str], key: str) -> str:
"""
根据忽略大小写的键获取字典中的对应的值
param mapping: 字典
param key: 字典的键
:return: 字典键对应的值
"""
for k, v in mapping.items():
if key.lower() == k.lower():
return v
return ''

View File

@@ -0,0 +1,296 @@
import json
import os
from datetime import datetime
from jinja2 import Environment, FileSystemLoader
from typing import Dict, List, Set
from config.constant import GenConstant
from module_generator.entity.vo.gen_vo import GenTableModel, GenTableColumnModel
from utils.common_util import CamelCaseUtil, SnakeCaseUtil
from utils.string_util import StringUtil
class TemplateInitializer:
"""
模板引擎初始化类
"""
@classmethod
def init_jinja2(cls):
"""
初始化 Jinja2 模板引擎
:return: Jinja2 环境对象
"""
try:
template_dir = os.path.join(os.getcwd(), 'module_generator', 'templates')
env = Environment(
loader=FileSystemLoader(template_dir),
keep_trailing_newline=True,
trim_blocks=True,
lstrip_blocks=True,
)
env.filters.update(
{
'camel_to_snake': SnakeCaseUtil.camel_to_snake,
'snake_to_camel': CamelCaseUtil.snake_to_camel,
'get_sqlalchemy_type': TemplateUtils.get_sqlalchemy_type,
}
)
return env
except Exception as e:
raise RuntimeError(f'初始化Jinja2模板引擎失败: {e}')
class TemplateUtils:
"""
模板工具类
"""
# 项目路径
FRONTEND_PROJECT_PATH = 'frontend'
BACKEND_PROJECT_PATH = 'backend'
DEFAULT_PARENT_MENU_ID = '3'
@classmethod
def prepare_context(cls, gen_table: GenTableModel):
"""
准备模板变量
:param gen_table: 生成表的配置信息
:return: 模板上下文字典
"""
class_name = gen_table.class_name
module_name = gen_table.module_name
business_name = gen_table.business_name
package_name = gen_table.package_name
tpl_category = gen_table.tpl_category
function_name = gen_table.function_name
context = {
'tplCategory': tpl_category,
'tableName': gen_table.table_name,
'functionName': function_name if StringUtil.is_not_empty(function_name) else '【请填写功能名称】',
'ClassName': class_name,
'className': class_name.lower(),
'moduleName': module_name,
'BusinessName': business_name.capitalize(),
'businessName': business_name,
'basePackage': cls.get_package_prefix(package_name),
'packageName': package_name,
'author': gen_table.function_author,
'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'pkColumn': gen_table.pk_column,
'importList': cls.get_import_list(gen_table),
'permissionPrefix': cls.get_permission_prefix(module_name, business_name),
'columns': gen_table.columns,
'table': gen_table,
'dicts': cls.get_dicts(gen_table),
}
# 设置菜单、树形结构、子表的上下文
cls.set_menu_context(context, gen_table)
if tpl_category == GenConstant.TPL_TREE:
cls.set_tree_context(context, gen_table)
if tpl_category == GenConstant.TPL_SUB:
cls.set_sub_context(context, gen_table)
return context
@classmethod
def set_menu_context(cls, context: Dict, gen_table: GenTableModel):
"""设置菜单上下文"""
options = gen_table.options
params_obj = json.loads(options)
context['parentMenuId'] = cls.get_parent_menu_id(params_obj)
@classmethod
def set_tree_context(cls, context: Dict, gen_table: GenTableModel):
"""设置树形结构上下文"""
options = gen_table.options
params_obj = json.loads(options)
context['treeCode'] = cls.get_tree_code(params_obj)
context['treeParentCode'] = cls.get_tree_parent_code(params_obj)
context['treeName'] = cls.get_tree_name(params_obj)
context['expandColumn'] = cls.get_expand_column(gen_table)
@classmethod
def set_sub_context(cls, context: Dict, gen_table: GenTableModel):
"""设置子表上下文"""
sub_table = gen_table.sub_table
sub_table_name = gen_table.sub_table_name
sub_table_fk_name = gen_table.sub_table_fk_name
sub_class_name = sub_table.class_name
sub_table_fk_class_name = StringUtil.convert_to_camel_case(sub_table_fk_name)
context['subTable'] = sub_table
context['subTableName'] = sub_table_name
context['subTableFkName'] = sub_table_fk_name
context['subTableFkClassName'] = sub_table_fk_class_name
context['subTableFkclassName'] = sub_table_fk_class_name.lower()
context['subClassName'] = sub_class_name
context['subclassName'] = sub_class_name.lower()
context['subImportList'] = cls.get_import_list(sub_table)
@classmethod
def get_template_list(cls, tpl_category, tpl_web_type):
"""获取模板列表"""
use_web_type = 'vue'
if tpl_web_type == 'element-plus':
use_web_type = 'vue/v3'
templates = [
'python/controller.py.jinja2',
'python/dao.py.jinja2',
'python/do.py.jinja2',
'python/service.py.jinja2',
'python/vo.py.jinja2',
'sql/sql.jinja2',
'js/api.js.jinja2',
]
if tpl_category == GenConstant.TPL_CRUD:
templates.append(f'{use_web_type}/index.vue.jinja2')
elif tpl_category == GenConstant.TPL_TREE:
templates.append(f'{use_web_type}/index-tree.vue.jinja2')
elif tpl_category == GenConstant.TPL_SUB:
templates.append(f'{use_web_type}/index.vue.jinja2')
# templates.append('python/sub-domain.python.jinja2')
return templates
@classmethod
def get_file_name(cls, template, gen_table: GenTableModel):
"""根据模板生成文件名"""
package_name = gen_table.package_name
module_name = gen_table.module_name
business_name = gen_table.business_name
vue_path = cls.FRONTEND_PROJECT_PATH
python_path = f"{cls.BACKEND_PROJECT_PATH}/{package_name.replace('.', '/')}"
if 'controller.py.jinja2' in template:
return f'{python_path}/controller/{business_name}_controller.py'
elif 'dao.py.jinja2' in template:
return f'{python_path}/dao/{business_name}_dao.py'
elif 'do.py.jinja2' in template:
return f'{python_path}/entity/do/{business_name}_do.py'
elif 'service.py.jinja2' in template:
return f'{python_path}/service/{business_name}_service.py'
elif 'vo.py.jinja2' in template:
return f'{python_path}/entity/vo/{business_name}_vo.py'
elif 'sql.jinja2' in template:
return f'{cls.BACKEND_PROJECT_PATH}/sql/{business_name}_menu.sql'
elif 'api.js.jinja2' in template:
return f'{vue_path}/api/{module_name}/{business_name}.js'
elif 'index.vue.jinja2' in template or 'index-tree.vue.j2' in template:
return f'{vue_path}/views/{module_name}/{business_name}/index.vue'
return ''
@classmethod
def get_package_prefix(cls, package_name: str):
"""获取包前缀"""
return package_name[: package_name.rfind('.')]
@classmethod
def get_import_list(cls, gen_table: GenTableModel):
"""获取导入包列表"""
columns = gen_table.columns or []
sub_gen_table = gen_table.sub_table
import_list = set()
if sub_gen_table is not None:
import_list.add('python.util.List')
for column in columns:
if not column.super_column and column.python_type in GenConstant.TYPE_DATE:
import_list.add(f'from datetime import {column.python_type}')
elif not column.super_column and column.python_type == GenConstant.TYPE_DECIMAL:
import_list.add('from decimal import Decimal')
return list(import_list)
@classmethod
def get_dicts(cls, gen_table: GenTableModel):
"""获取字典列表"""
columns = gen_table.columns or []
dicts = set()
cls.add_dicts(dicts, columns)
if gen_table.sub_table is not None:
cls.add_dicts(dicts, gen_table.sub_table.columns)
return ', '.join(dicts)
@classmethod
def add_dicts(cls, dicts: Set[str], columns: List[GenTableColumnModel]):
"""添加字典列表"""
for column in columns:
if (
column.super_column
and StringUtil.is_not_empty(column.dict_type)
and StringUtil.equals_any_ignore_case(
column.html_type, [GenConstant.HTML_SELECT, GenConstant.HTML_RADIO, GenConstant.HTML_CHECKBOX]
)
):
dicts.add(f"'{column.dict_type}'")
@classmethod
def get_permission_prefix(cls, module_name: str, business_name: str):
"""获取权限前缀"""
return f'{module_name}:{business_name}'
@classmethod
def get_parent_menu_id(cls, params_obj):
"""获取上级菜单ID"""
if params_obj and params_obj.get(GenConstant.PARENT_MENU_ID):
return params_obj.get(GenConstant.PARENT_MENU_ID)
return cls.DEFAULT_PARENT_MENU_ID
@classmethod
def get_tree_code(cls, params_obj: Dict):
"""获取树编码"""
if GenConstant.TREE_CODE in params_obj:
return cls.to_camel_case(params_obj.get(GenConstant.TREE_CODE))
return ''
@classmethod
def get_tree_parent_code(cls, params_obj: Dict):
"""获取树父编码"""
if GenConstant.TREE_PARENT_CODE in params_obj:
return cls.to_camel_case(params_obj.get(GenConstant.TREE_PARENT_CODE))
return ''
@classmethod
def get_tree_name(cls, params_obj: Dict):
"""获取树名称"""
if GenConstant.TREE_NAME in params_obj:
return cls.to_camel_case(params_obj.get(GenConstant.TREE_NAME))
return ''
@classmethod
def get_expand_column(cls, gen_table: GenTableModel):
"""获取展开列"""
options = gen_table.options
params_obj = json.loads(options)
tree_name = params_obj.get(GenConstant.TREE_NAME)
num = 0
for column in gen_table.columns or []:
if column.list:
num += 1
if column.column_name == tree_name:
break
return num
@classmethod
def to_camel_case(cls, text: str) -> str:
"""将字符串转换为驼峰命名"""
parts = text.split('_')
return parts[0] + ''.join(word.capitalize() for word in parts[1:])
@classmethod
def get_sqlalchemy_type(cls, column_type: str):
if '(' in column_type:
column_type_list = column_type.split('(')
sqlalchemy_type = (
StringUtil.get_mapping_value_by_key_ignore_case(
GenConstant.MYSQL_TO_SQLALCHEMY_TYPE_MAPPING, column_type_list[0]
)
+ '('
+ column_type_list[1]
)
else:
sqlalchemy_type = StringUtil.get_mapping_value_by_key_ignore_case(
GenConstant.MYSQL_TO_SQLALCHEMY_TYPE_MAPPING, column_type
)
return sqlalchemy_type