Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions ruoyi-fastapi-backend/config/get_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import importlib
import json
from asyncio import iscoroutinefunction
from datetime import datetime, timedelta
from typing import Optional, Union
from typing import Any, Callable, Optional, Union

from apscheduler.events import EVENT_ALL, SchedulerEvent
from apscheduler.executors.asyncio import AsyncIOExecutor
Expand Down Expand Up @@ -152,6 +153,18 @@ async def close_system_scheduler(cls) -> None:
scheduler.shutdown()
logger.info('✅️ 关闭定时任务成功')

@classmethod
def _import_function(cls, func_path: str) -> Callable[..., Any]:
"""
动态导入函数

:param func_path: 函数字符串,如module_task.scheduler_test.job
:return: 导入的函数对象
"""
module_path, func_name = func_path.rsplit('.', 1)
module = importlib.import_module(module_path)
return getattr(module, func_name)

@classmethod
def get_scheduler_job(cls, job_id: Union[str, int]) -> Job:
"""
Expand All @@ -172,12 +185,12 @@ def add_scheduler_job(cls, job_info: JobModel) -> None:
:param job_info: 任务对象信息
:return:
"""
job_func = eval(job_info.invoke_target)
job_func = cls._import_function(job_info.invoke_target)
job_executor = job_info.job_executor
if iscoroutinefunction(job_func):
job_executor = 'default'
scheduler.add_job(
func=eval(job_info.invoke_target),
func=job_func,
trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
args=job_info.job_args.split(',') if job_info.job_args else None,
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
Expand All @@ -198,15 +211,15 @@ def execute_scheduler_job_once(cls, job_info: JobModel) -> None:
:param job_info: 任务对象信息
:return:
"""
job_func = eval(job_info.invoke_target)
job_func = cls._import_function(job_info.invoke_target)
job_executor = job_info.job_executor
if iscoroutinefunction(job_func):
job_executor = 'default'
job_trigger = DateTrigger()
if job_info.status == '0':
job_trigger = OrTrigger(triggers=[DateTrigger(), MyCronTrigger.from_crontab(job_info.cron_expression)])
scheduler.add_job(
func=eval(job_info.invoke_target),
func=job_func,
trigger=job_trigger,
args=job_info.job_args.split(',') if job_info.job_args else None,
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
Expand Down