芹菜(Celery):过多排队时,防止添加更多任务 - python

我有一个Flask REST API,该API利用Celery运行异步请求。

想法是async=1查询参数指示应异步处理请求(立即返回客户端以后将使用的任务ID)。

同时,当等待处理的事务过多时,我想避免接受新任务。

下面的代码可以工作,但是accepting_new_tasks()大约需要2秒,这太慢了。

Celery中是否有一个配置(或其他东西)可以限制等待任务的数量?还是获得等待任务数量更快的方法?

import math

from celery import Celery
from flask import abort, Flask, jsonify, request


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    waiting_tasks = 0
    for reserved in nodes_reserved.values():
        waiting_tasks += len(reserved)

    return waiting_tasks < math.ceil(workers / 3)

参考方案

最后,我通过查询https://stackoverflow.com/a/27074594/4183498指出的RabbitMQ管理API来解决了这个问题。

import math

from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


def get_workers_count():
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    return workers


WORKERS_COUNT = get_workers_count()


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):WORKERS_COUNT
    auth = HTTPBasicAuth("guest", "guest")
    response = get(
        "http://localhost:15672/api/queues/my_vhost/celery",
         auth=auth
    )
    waiting_tasks = response.json()["messages"]
    return waiting_tasks < math.ceil(WORKERS_COUNT / 3)

Python:在不更改段落顺序的情况下在文件的每个段落中反向单词? - python

我想通过反转text_in.txt文件中的单词来生成text_out.txt文件,如下所示:text_in.txt具有两段,如下所示:Hello world, I am Here. I am eighteen years old. text_out.txt应该是这样的:Here. am I world, Hello old. years eighteen a…

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

如何在python中将从PDF提取的文本格式化为json - python

我已经使用pyPDF2提取了一些文本格式的发票PDF。我想将此文本文件转换为仅包含重要关键字和令牌的json文件。输出应该是这样的:#PurchaseOrder {"doctype":"PO", "orderingcompany":"Demo Company", "su…

查找字符串中的行数 - python

我正在创建一个python电影播放器​​/制作器,我想在多行字符串中找到行数。我想知道是否有任何内置函数或可以编写代码的函数来做到这一点:x = """ line1 line2 """ getLines(x) python大神给出的解决方案 如果换行符是'\n',则nlines …

我怎样才能从字典的键中算出对象? - python

我有这本字典:dict={"asset":[("S3","A1"),"S2",("E4","E5"),("E1","S1"),"A6","A8"], "…