from __future__ import annotations from celery import Celery from app.core.config import get_settings def _env_prefix(app_env: str) -> str: """ 根据环境生成前缀: - dev -> dev - prod -> pro 说明:Redis ACL 限制使用 `dev:*` / `pro:*`。 """ return "dev" if app_env == "dev" else "pro" settings = get_settings() prefix = _env_prefix(settings.app_env) # 关键:使用 Redis transport 的全局 key 前缀,确保所有 broker key 都在 ACL 允许范围内 broker_transport_options = {"global_keyprefix": f"{prefix}:"} celery_app = Celery( "mindfulness", broker=settings.celery_broker_url, backend=settings.celery_result_backend, broker_transport_options=broker_transport_options, ) # 默认不存结果(降低 Redis 占用)。如需结果存储,可在业务中显式开启并设置 TTL。 celery_app.conf.update( task_ignore_result=True if not settings.celery_result_backend else False, task_default_queue=f"{prefix}:celery", task_default_exchange=f"{prefix}:celery", task_default_routing_key=f"{prefix}:celery", ) # 自动发现任务(app/tasks 下的 shared_task) celery_app.autodiscover_tasks(["app.tasks"])