我正在使用烧瓶,设置了一个简单的应用程序,并试图学习如何有效利用芹菜来工作,并在查询时显示结果。
在最基本的示例中,我通过task.delay(args)
创建单个任务。然后,该对象让我提取该作业的ID,稍后可以通过单击其他端点进行查询。简单。
我的目标是模仿这一点,尽管要利用群体。阅读文档后,我发现组原语是惰性的,因此必须先实际调用它才能保存它。
我确定我的问题来自缺乏了解,但基本上是:
如果我的目标是能够通过Flask Pipeline异步并并行运行后台任务组,那么在以下约束条件下,我如何为该组检索.join()
结果
请求Endpoint1(可能返回一个ID)
请求Endpotin2,传递ID以返回工作完成的结果
这是正确的方法吗?还是我应该思考的心态有所不同?
伪代码:
# From my apps init
celery_instance = Celery("module.modulename", backend = app.config['CELERY_RESULT_BACKEND'], broker = app.config['CELERY_BROKER_URL'])
celery_instance.conf.update(app.config)
<snip>
from celery import group
from app import celery_instance
@app.route("/status/domain/<id>", methods=['GET'])
def query(id):
# Works for single job, not job group
result = celery_instance.AsyncResult(id)
...
@app.route("/query/domain/<domain>", methods=['GET'])
def query_by_domain(domain):
...
job = group([task1.delay(domain), task2.delay(domain)])
return redirect(url_for('app.query', id=job.id), code=302)
参考方案
您需要通过GroupResult.save()
方法将组结果保存为:
@app.route("/query/domain/<domain>", methods=['GET'])
def query_by_domain(domain):
...
job = group([task1.delay(domain), task2.delay(domain)])
<b>job.save()</b>
return redirect(url_for('app.query', id=job.id), code=302)
然后,您可以通过GroupResult
方法获取GroupResult.restore()
。
@app.route("/status/domain/<id>", methods=['GET'])
def query(id):
# Works for single job, not job group
<b>result = celery_instance.GroupResult.restore(group_id)</b>
...
R'relaimpo'软件包的Python端口 - python我需要计算Lindeman-Merenda-Gold(LMG)分数,以进行回归分析。我发现R语言的relaimpo包下有该文件。不幸的是,我对R没有任何经验。我检查了互联网,但找不到。这个程序包有python端口吗?如果不存在,是否可以通过python使用该包? python参考方案 最近,我遇到了pingouin库。
Python:传递记录器是个好主意吗? - python我的Web服务器的API日志如下:started started succeeded failed 那是同时收到的两个请求。很难说哪一个成功或失败。为了彼此分离请求,我为每个请求创建了一个随机数,并将其用作记录器的名称logger = logging.getLogger(random_number) 日志变成[111] started [222] start…
Python-Excel导出 - python我有以下代码:import pandas as pd import requests from bs4 import BeautifulSoup res = requests.get("https://www.bankier.pl/gielda/notowania/akcje") soup = BeautifulSoup(res.cont…
Python sqlite3数据库已锁定 - python我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…
Python:如何根据另一列元素明智地查找一列中的空单元格计数? - pythondf = pd.DataFrame({'user': ['Bob', 'Jane', 'Alice','Jane', 'Alice','Bob', 'Alice'], 'income…