refactor: refactor database ORM to async #I9MRQS
@@ -1,19 +1,23 @@
-from sqlalchemy import create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.asyncio import create_async_engine
+from sqlalchemy.ext.asyncio import async_sessionmaker
+from sqlalchemy.ext.asyncio import AsyncAttrs
+from sqlalchemy.orm import DeclarativeBase
 from urllib.parse import quote_plus
 from config.env import DataBaseConfig
 
-SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@" \
+ASYNC_SQLALCHEMY_DATABASE_URL = f"mysql+asyncmy://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@" \
                           f"{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}"
 
-engine = create_engine(
-    SQLALCHEMY_DATABASE_URL,
+async_engine = create_async_engine(
+    ASYNC_SQLALCHEMY_DATABASE_URL,
     echo=DataBaseConfig.db_echo,
     max_overflow=DataBaseConfig.db_max_overflow,
     pool_size=DataBaseConfig.db_pool_size,
     pool_recycle=DataBaseConfig.db_pool_recycle,
     pool_timeout=DataBaseConfig.db_pool_timeout
 )
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-Base = declarative_base()
+AsyncSessionLocal = async_sessionmaker(autocommit=False, autoflush=False, bind=async_engine)
 
 
+class Base(AsyncAttrs, DeclarativeBase):
+    pass
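A quick sketch of how the new async pieces are exercised end to end (assumes the asyncmy driver is installed and DataBaseConfig points at a reachable MySQL instance; not part of this commit):

```python
import asyncio
from sqlalchemy import text

from config.database import AsyncSessionLocal, async_engine, Base


async def main():
    # DDL runs via run_sync because metadata.create_all is a synchronous API.
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # async_sessionmaker sessions are async context managers; queries are awaited.
    async with AsyncSessionLocal() as session:
        result = await session.execute(text("SELECT 1"))
        print(result.scalar())

    # Dispose of the connection pool before the event loop shuts down.
    await async_engine.dispose()


asyncio.run(main())
```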
@@ -2,16 +2,13 @@ from config.database import *
 from utils.log_util import logger
 
 
-def get_db_pro():
+async def get_db():
     """
     The current connection is closed once each request has been handled; different requests use different connections
     :return:
     """
-    current_db = SessionLocal()
-    try:
+    async with AsyncSessionLocal() as current_db:
         yield current_db
-    finally:
-        current_db.close()
 
 
 async def init_create_table():
@@ -20,8 +17,6 @@ async def init_create_table():
     :return:
     """
     logger.info("Initializing database connection...")
-    Base.metadata.create_all(bind=engine)
+    async with async_engine.begin() as conn:
+        await conn.run_sync(Base.metadata.create_all)
     logger.info("Database connection successful")
-
-
-get_db = get_db_pro
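For context, a sketch of how the rewritten async generator dependency is typically consumed from FastAPI (the app, route, and import path below are illustrative assumptions, not part of this commit):

```python
from fastapi import Depends, FastAPI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Hypothetical import path for this sketch; the diff does not show the module name.
from config.get_db import get_db, init_create_table

app = FastAPI()


@app.on_event("startup")
async def create_tables():
    # Mirrors init_create_table above: run DDL once when the app boots.
    await init_create_table()


@app.get("/ping-db")
async def ping_db(db: AsyncSession = Depends(get_db)):
    # FastAPI drives the async generator: AsyncSessionLocal opens a session
    # per request and the `async with` block closes it after the response.
    result = await db.execute(text("SELECT 1"))
    return {"db": result.scalar()}
```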
@@ -3,7 +3,7 @@ from redis.exceptions import AuthenticationError, TimeoutError, RedisError
 from module_admin.service.dict_service import DictDataService
 from module_admin.service.config_service import ConfigService
 from config.env import RedisConfig
-from config.database import SessionLocal
+from config.database import AsyncSessionLocal
 from utils.log_util import logger
 
 
@@ -59,10 +59,8 @@ class RedisUtil:
         :param redis: redis object
         :return:
         """
-        session = SessionLocal()
-        await DictDataService.init_cache_sys_dict_services(session, redis)
-
-        session.close()
+        async with AsyncSessionLocal() as session:
+            await DictDataService.init_cache_sys_dict_services(session, redis)
 
     @classmethod
     async def init_sys_config(cls, redis):
@@ -71,7 +69,5 @@ class RedisUtil:
         :param redis: redis object
         :return:
         """
-        session = SessionLocal()
-        await ConfigService.init_cache_sys_config_services(session, redis)
-
-        session.close()
+        async with AsyncSessionLocal() as session:
+            await ConfigService.init_cache_sys_config_services(session, redis)
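The `async with AsyncSessionLocal() as session:` form is roughly equivalent to the try/finally below; note the old code leaked the session whenever the awaited call raised, since `session.close()` was never reached (a sketch, not part of the diff):

```python
# Rough expansion of what the async context manager guarantees.
session = AsyncSessionLocal()
try:
    await DictDataService.init_cache_sys_dict_services(session, redis)
finally:
    # Runs even if the cache initialization raises; AsyncSession.close()
    # is a coroutine and must be awaited.
    await session.close()
```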
@@ -1,3 +1,4 @@
+import json
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.jobstores.memory import MemoryJobStore
@@ -5,12 +6,13 @@ from apscheduler.jobstores.redis import RedisJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 from apscheduler.triggers.cron import CronTrigger
 from apscheduler.events import EVENT_ALL
-import json
+from sqlalchemy.engine import create_engine
+from sqlalchemy.orm import sessionmaker
 from datetime import datetime, timedelta
-from config.database import engine, SQLALCHEMY_DATABASE_URL, SessionLocal
-from config.env import RedisConfig
+from config.database import quote_plus, AsyncSessionLocal
+from config.env import DataBaseConfig, RedisConfig
 from module_admin.service.job_log_service import JobLogService, JobLogModel
-from module_admin.dao.job_dao import Session, JobDao
+from module_admin.dao.job_dao import JobDao
 from utils.log_util import logger
 import module_task
 
@@ -65,6 +67,17 @@ class MyCronTrigger(CronTrigger):
             diff += 1
 
 
+SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@" \
+                          f"{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}"
+engine = create_engine(
+    SQLALCHEMY_DATABASE_URL,
+    echo=DataBaseConfig.db_echo,
+    max_overflow=DataBaseConfig.db_max_overflow,
+    pool_size=DataBaseConfig.db_pool_size,
+    pool_recycle=DataBaseConfig.db_pool_recycle,
+    pool_timeout=DataBaseConfig.db_pool_timeout
+)
+SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 job_stores = {
     'default': MemoryJobStore(),
     'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine),
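The commit re-creates a local synchronous engine and SessionLocal here rather than reusing the async ones, presumably because APScheduler 3.x job stores and event listeners are synchronous and cannot await. A minimal sketch of that constraint (the URL is a placeholder):

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine

# SQLAlchemyJobStore issues blocking queries from scheduler threads,
# so it needs a sync engine (pymysql), not the asyncmy one.
engine = create_engine("mysql+pymysql://user:password@localhost:3306/app")  # placeholder URL

scheduler = BackgroundScheduler(jobstores={"default": SQLAlchemyJobStore(engine=engine)})
scheduler.start()
```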
@@ -96,20 +109,20 @@ class SchedulerUtil:
     """
 
     @classmethod
-    async def init_system_scheduler(cls, query_db: Session = SessionLocal()):
+    async def init_system_scheduler(cls):
         """
         Initialize scheduled jobs at application startup
         :return:
         """
         logger.info("Starting scheduled jobs...")
         scheduler.start()
-        job_list = JobDao.get_job_list_for_scheduler(query_db)
-        for item in job_list:
-            query_job = cls.get_scheduler_job(job_id=str(item.job_id))
-            if query_job:
-                cls.remove_scheduler_job(job_id=str(item.job_id))
-            cls.add_scheduler_job(item)
-        query_db.close()
+        async with AsyncSessionLocal() as session:
+            job_list = await JobDao.get_job_list_for_scheduler(session)
+            for item in job_list:
+                query_job = cls.get_scheduler_job(job_id=str(item.job_id))
+                if query_job:
+                    cls.remove_scheduler_job(job_id=str(item.job_id))
+                cls.add_scheduler_job(item)
         scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
         logger.info("System initial scheduled jobs loaded successfully")
 
@@ -225,7 +238,8 @@ class SchedulerUtil:
             jobTrigger=job_trigger,
             jobMessage=job_message,
             status=status,
-            exceptionInfo=exception_info
+            exceptionInfo=exception_info,
+            createTime=datetime.now()
         )
         session = SessionLocal()
         JobLogService.add_job_log_services(session, job_log)
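The sync sessionmaker also supports context-manager scoping (SQLAlchemy 1.4+), so the listener's session can be bounded the same way the async code now is; a hedged sketch, not part of this commit:

```python
# Equivalent scoping for the synchronous session used in the event listener.
with SessionLocal() as session:
    JobLogService.add_job_log_services(session, job_log)
```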