从Flask路由进行Python异步调用 - python

我想在每次执行Flask路由时执行一个异步函数。为什么abar函数从不执行?

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()

我还尝试将阻塞调用放在单独的线程中。但是它仍然没有调用abar函数。

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)

python大神给出的解决方案

您可以将一些异步功能集成到Flask应用中,而不必完全将其转换为异步。

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

这将阻止Flask响应,直到异步函数返回为止,但是它仍然允许您做一些聪明的事情。我已经使用此模式使用aiohttp并行执行许多外部请求,然后在完成后,我回到传统的烧瓶中进行数据处理和模板渲染。

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

Python-熊猫描述了抛出错误:无法散列的类型“ dict” - python

更新:我正在使用“ Socrata开源API”中的一些示例代码。我在代码中注意到以下注释:# First 2000 results, returned as JSON from API / converted to Python # list of dictionaries by sodapy. 我不熟悉JSON。我已经下载了一个数据集,并创建了一个包含大量…

子条件的python条件覆盖 - python

我试图找到一个python代码覆盖率工具,该工具可以衡量语句中是否包含子表达式:例如,我想看看下面的示例是否涵盖了condition1 / condition2 / condtion3?if condition1 or condition2 or condition3: x = true_value python大神给出的解决方案 对此的唯一合理答案是:当前…

USB设备发行 - python

我目前正在使用PyUSB。由于我不熟悉USB,所以我不知道如何执行以下操作。我已经从Python PyUSB成功连接到我的USB设备硬件。在代码中,我需要重置USB设备硬件。通过向硬件发送命令来完成。现在,在硬件重置后,我想从Python PyUSB释放当前的USB设备。然后,我想在重置后将其重新连接到USB设备硬件。请让我知道,如何释放USB设备连接和接口…

在Flask中测试文件上传 - python

我在Flask集成测试中使用Flask-Testing。我有一个表单,该表单具有我要为其编写测试的徽标的文件上传,但是我不断收到错误消息:TypeError: 'str' does not support the buffer interface。我正在使用Python3。我找到的最接近的答案是this,但是它对我不起作用。这是我的许多尝…