How to parallelise blocking code with asyncio
我知道 StackOverflow 中的 asyncio 功能很多,但尽管这里回答了很多问题,但我仍然不明白如何做一些简单的事情,比如并行化 2 个执行阻塞代码的任务。
例如,这很好用:
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import asyncio
async def slow_thing():
async def try_alpha():
async def try_bravo():
async def main():
loop = asyncio.get_event_loop() |
输出正是我正在寻找的:
1
2 3 4 5 6 7 |
Alpha start
Bravo start *2 second wait* Alpha stop Bravo stop Alpha Bravo |
但是,如果我将 await syncio.sleep(2) 换成 time.sleep(2),输出就好像我的代码没有任何异步:
1
2 3 4 5 6 7 8 |
Alpha start
*2 second wait* Alpha stop Bravo start *2 second wait* Bravo stop Alpha Bravo |
问题是,在我的真实示例中,我无法控制那些缓慢的代码,因此我无法将其更改为使用协程。在某些情况下,它只是 requests.get() 的一堆用途,而在其他情况下,我正在使用 kodijson 库,它做了很多我无法访问的事情。
所以我想知道 asyncio 是否是正确的工具。当您尝试与 .gather() 并行化时,是否可以在异步代码中使用阻塞代码?
还请注意,我(不幸地)在这方面坚持使用 Python 3.6。我正在编写一个 Mycroft 扩展程序,而这正是他们目前所坚持的环境。
- 对于阻塞调用,使用 loop.run_in_executor
- 另请注意,如果您没有一堆代码可以利用 asyncio 之上的异步 I/O,它可能不是您的应用程序的正确选择,您应该只使用 threading /multiprocessing/concurrent.futures 直接。
- 我用 loop.run_in_executor(None, time.sleep, 2) 替换了 sleep 行,它有点工作。它在应该等待的地方等待,但也会打印出错误:RuntimeError: Event loop is closed
- 但是您使用 concurrent.futures 的建议看起来可能对我有用,谢谢!
- 看看这个答案:根据任务的性质,您有多种选择。但是如果你想要 asyncio 而不是重写,那么,是的,run_in_executor 是一种方法。
- 这个答案显示了如何直接使用 concurrent.futures 来完成这种任务。
在我在这里以评论的形式获得帮助后,我能够使用 concurrent.futures:
汇总一个解决方案
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import concurrent.futures
import time
def slow_1(s):
def slow_2(s):
def slow_3(s):
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: |
哪些输出:
1
2 3 4 5 6 |
2: y
3: z 1: x 1: ok 2: ok 3: ok |
我应该注意到,官方文档并不清楚你可以通过在未来调用 .result() 从函数中获取返回值,或者你需要遍历 futures 值来获取说结果。 .wait() 按返回顺序返回 done 和 not_done 值的元组,因此循环遍历 done 的值对我来说破坏了很多事情。如果你像我一样,只想一次做 3 件缓慢的事情并从这三件事情中得到结果,那么这段代码可能适合你。
只有当有东西等待时,协程才能”并行”做一些事情。例如,在您上面的代码中,它与 asyncio.sleep 一起工作的原因是您可以在其上等待它。您只能等待为此目的而设计的特定功能。这就是标准 time.sleep 不起作用的原因,因为您不能使用关键字 await 。 requests 库也是如此。
幸运的是,您可以使用美妙的 aiohttp 库:https://docs.aiohttp.org
这将为您提供同时发出多个请求所需的确切信息。
- 这个答案是正确的,但是 OP 明确表示他们不控制”慢”代码,即它不能仅仅从 requests 移植到 aiohttp。正如评论中所指出的,这可以使用 run_in_executor 来解决,但也可以直接使用 concurrent.futures。
来源:https://www.codenong.com/62140797/