44 lines
1.2 KiB
Python
44 lines
1.2 KiB
Python
from __future__ import annotations
|
||
|
||
from celery import Celery
|
||
|
||
from app.core.config import get_settings
|
||
|
||
|
||
def _env_prefix(app_env: str) -> str:
|
||
"""
|
||
根据环境生成前缀:
|
||
- dev -> dev
|
||
- prod -> pro
|
||
|
||
说明:Redis ACL 限制使用 `dev:*` / `pro:*`。
|
||
"""
|
||
|
||
return "dev" if app_env == "dev" else "pro"
|
||
|
||
|
||
settings = get_settings()
prefix = _env_prefix(settings.app_env)

# Key point: use the Redis transport's global key prefix so that every broker
# key stays inside the namespace allowed by the Redis ACL.
broker_transport_options = {"global_keyprefix": f"{prefix}:"}

celery_app = Celery(
    "mindfulness",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
)

# Results are ignored whenever no result backend is configured (keeps Redis
# usage low). If results must be stored, enable that explicitly in the
# business code and set a TTL.
celery_app.conf.update(
    # Applied through conf (not as a constructor keyword) so the key prefix
    # takes effect regardless of whether the installed Celery version forwards
    # extra constructor keyword arguments into its configuration.
    broker_transport_options=broker_transport_options,
    task_ignore_result=not settings.celery_result_backend,
    task_default_queue=f"{prefix}:celery",
    task_default_exchange=f"{prefix}:celery",
    task_default_routing_key=f"{prefix}:celery",
)

# Auto-discover tasks (shared_task definitions under app/tasks).
# NOTE(review): by default autodiscover_tasks looks for a `tasks` submodule
# inside each listed package -- confirm app/tasks/__init__.py imports the
# task modules so they actually get registered.
celery_app.autodiscover_tasks(["app.tasks"])
|
||
|