自动发现芹菜任务 - python

我有以下项目树(出于测试目的),并且我试图了解Celery如何加载任务。

app
├── __init__.py
├── app.py
├── celery.py
└── my_tasks
    ├── __init__.py
    └── tasks.py

celery.py包含以下用于创建Celery实例的代码:

from celery import Celery

app = Celery("app", backend="rpc://", broker="redis://localhost:6379/0")
app.autodiscover_tasks()

tasks.py创建一个任务:

from app.celery import app
from time import sleep


@app.task
def run_tests_for_hash():
   # task code here

app.py包含一个带有一个端点的FastAPI应用程序,用于创建任务

from fastapi import FastAPI
from app.call_flow_tasks.tasks import run_tests_for_hash

app = FastAPI(title="Testing Studio runners")

@app.post("/create_task")
def create_task():

    results = run_tests_for_hash.delay()

    return {"task_id": results.id, "status": results.status}

但是尝试从命令行运行Celery worker时出现错误:

$celery -A app worker
Traceback (most recent call last):
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/__main__.py", line 16, in main
    _main()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 322, in main
    cmd.execute_from_commandline(argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 495, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 487, in handle_argv
    return self.execute(command, argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 419, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 223, in run_from_argv
    return self(*args, **options)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 253, in __call__
    ret = self.run(*args, **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 258, in run
    **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/worker/worker.py", line 96, in __init__
    self.app.loader.init_worker()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 114, in init_worker
    self.import_default_modules()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 108, in import_default_modules
    raise response
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/utils/dispatch/signal.py", line 288, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 695, in _autodiscover_tasks
    return self._autodiscover_tasks_from_fixups(related_name)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 705, in _autodiscover_tasks_from_fixups
    pkg for fixup in self._fixups
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 706, in <listcomp>
    for pkg in fixup.autodiscover_tasks()
AttributeError: 'NoneType' object has no attribute 'autodiscover_tasks'

命令在与app python包相同的级别上执行。如果删除autodiscover,它将正常工作,并且正常加载了工作程序。关于芹菜如何自动发现任务以及如何从不同模块加载任务的任何帮助?

参考方案

您可以结合两种自动发现:

app.autodiscover_tasks()  # Find tasks using celery.fixups (i.e. Django apps via INSTALLED_APPS).
app.autodiscover_tasks(  # Add other tasks not included in the apps.
    [
        'project.other.tasks',
        'another_project.another.app',
    ]
)

芹菜只有一个BUILTIN_FIXUPS = {'celery.fixups.django:fixup'},请看它是code to understand。

    def autodiscover_tasks(self):
        from django.apps import apps
        return [config.name for config in apps.get_app_configs()]

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

如何在python中将从PDF提取的文本格式化为json - python

我已经使用pyPDF2提取了一些文本格式的发票PDF。我想将此文本文件转换为仅包含重要关键字和令牌的json文件。输出应该是这样的:#PurchaseOrder {"doctype":"PO", "orderingcompany":"Demo Company", "su…

即使import语句在先前的代码中工作,Python模块在import语句中也有属性错误 - python

我有一个项目突然停止正常运行。我不知道为什么,因为我事先没有对其进行任何更改。构建它时出现以下错误:Traceback (most recent call last): File ".\engine.py", line 7, in <module> from controllers.game_panel_controller …

Python:将两列组合在一起,找到第三列的总和 - python

python真的很新,需要我完成的问题需要一些帮助。我需要根据用户对月份(MM)和年份(YYYY)的输入来找到每个时间段(月/年)的平均收入。我的输入如下:year_value = int(input("Year (YYYY): ")) month_value = int(input("Month (MM): ")) …

查找字符串中的行数 - python

我正在创建一个python电影播放器​​/制作器,我想在多行字符串中找到行数。我想知道是否有任何内置函数或可以编写代码的函数来做到这一点:x = """ line1 line2 """ getLines(x) python大神给出的解决方案 如果换行符是'\n',则nlines …