我有一个Flask
REST API,该API利用Celery
运行异步请求。
想法是async=1
查询参数指示应异步处理请求(立即返回客户端以后将使用的任务ID)。
同时,当等待处理的事务过多时,我想避免接受新任务。
下面的代码可以工作,但是accepting_new_tasks()
大约需要2秒,这太慢了。
Celery中是否有一个配置(或其他东西)可以限制等待任务的数量?还是获得等待任务数量更快的方法?
import math
from celery import Celery
from flask import abort, Flask, jsonify, request
flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")
@flask_app.route("/")
def home():
async_ = request.args.get("async")
settings = request.args.get("settings")
if async_:
if not accepting_new_tasks(celery_app):
return abort(503)
task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
return jsonify({"taskId": task.id})
return jsonify({})
def accepting_new_tasks(celery_app):
inspector = celery_app.control.inspect()
nodes_stats = inspector.stats()
nodes_reserved = inspector.reserved()
workers = 0
for stats in nodes_stats.values():
workers += stats["pool"]["max-concurrency"]
waiting_tasks = 0
for reserved in nodes_reserved.values():
waiting_tasks += len(reserved)
return waiting_tasks < math.ceil(workers / 3)
参考方案
最后,我通过查询https://stackoverflow.com/a/27074594/4183498指出的RabbitMQ管理API来解决了这个问题。
import math
from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth
flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")
def get_workers_count():
inspector = celery_app.control.inspect()
nodes_stats = inspector.stats()
nodes_reserved = inspector.reserved()
workers = 0
for stats in nodes_stats.values():
workers += stats["pool"]["max-concurrency"]
return workers
WORKERS_COUNT = get_workers_count()
@flask_app.route("/")
def home():
async_ = request.args.get("async")
settings = request.args.get("settings")
if async_:
if not accepting_new_tasks(celery_app):
return abort(503)
task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
return jsonify({"taskId": task.id})
return jsonify({})
def accepting_new_tasks(celery_app):WORKERS_COUNT
auth = HTTPBasicAuth("guest", "guest")
response = get(
"http://localhost:15672/api/queues/my_vhost/celery",
auth=auth
)
waiting_tasks = response.json()["messages"]
return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
Python:在不更改段落顺序的情况下在文件的每个段落中反向单词? - python我想通过反转text_in.txt文件中的单词来生成text_out.txt文件,如下所示:text_in.txt具有两段,如下所示:Hello world, I am Here. I am eighteen years old. text_out.txt应该是这样的:Here. am I world, Hello old. years eighteen a…
用大写字母拆分字符串,但忽略AAA Python Regex - python我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…
如何在python中将从PDF提取的文本格式化为json - python我已经使用pyPDF2提取了一些文本格式的发票PDF。我想将此文本文件转换为仅包含重要关键字和令牌的json文件。输出应该是这样的:#PurchaseOrder {"doctype":"PO", "orderingcompany":"Demo Company", "su…
查找字符串中的行数 - python我正在创建一个python电影播放器/制作器,我想在多行字符串中找到行数。我想知道是否有任何内置函数或可以编写代码的函数来做到这一点:x = """ line1 line2 """ getLines(x) python大神给出的解决方案 如果换行符是'\n',则nlines …
我怎样才能从字典的键中算出对象? - python我有这本字典:dict={"asset":[("S3","A1"),"S2",("E4","E5"),("E1","S1"),"A6","A8"], "…