当前位置: 首页 > news >正文

一步步理解 Python 异步生成器(AsyncGenerator)——从入门到实践

一步步理解 Python 异步生成器(AsyncGenerator)——从入门到实践

      • 前言
      • 1. 什么是生成器?
      • 2. 什么是异步生成器?
      • 3. 代码示例:异步生成器的实际应用
        • 完整代码
        • 代码详解
        • 运行结果
      • 4. 关键点回顾
      • 结语

前言

在 Python 中,生成器(Generator)是一种非常强大的工具,能够在函数执行的过程中多次返回值,而不需要一次性返回所有结果。异步生成器(AsyncGenerator)则是结合了生成器和异步编程(asyncio)的概念,能够在异步环境中按需生成数据。这篇文章将通过一个简单的例子,帮助初学者理解异步生成器的工作原理,以及 yield 关键字在异步生成器中的作用。

1. 什么是生成器?

首先,我们来回顾一下生成器的基本概念。一个生成器函数类似于一个普通函数,不同的是它使用了 yield 关键字而不是 return。每当生成器的 yield 被执行时,函数会暂停执行,并返回一个值。下次调用这个生成器时,函数会从上次暂停的地方继续执行。

来看一个简单的例子:

def simple_generator():yield 1yield 2yield 3gen = simple_generator()for value in gen:print(value)

运行结果:

1
2
3

在这个例子中,每次循环时,生成器都会返回一个新的值,直到所有 yield 都被执行完毕。

2. 什么是异步生成器?

异步生成器结合了异步编程和生成器的优点。它可以在异步环境中使用,允许我们在等待某些操作(如网络请求、文件操作)完成时,按需返回数据。

使用 async def 定义异步生成器,使用 yield 来返回数据,同时使用 await 来执行异步操作。

3. 代码示例:异步生成器的实际应用

接下来,我们通过一个示例代码来深入理解异步生成器的使用方式。这段代码模拟了一个异步搜索的过程,其中包括了上下文生成、回调函数处理、以及异步任务的执行。

完整代码
import asyncio
from typing import AsyncGenerator, List, Tuple# 模拟的上下文生成器类
class ContextBuilder:def build_context(self, conversation_history=None, **kwargs) -> Tuple[List[str], str]:context_chunks = ["chunk1", "chunk2", "chunk3"]context_records = "Initial Context Record"return context_chunks, context_records# 回调函数模拟类
class Callback:def on_map_response_start(self, context_chunks):print("Callback: Starting map response with context chunks:", context_chunks)def on_map_response_end(self, map_responses):print("Callback: Finished map response with results:", map_responses)class AsyncSearch:def __init__(self):self.context_builder = ContextBuilder()self.context_builder_params = {}self.callbacks = [Callback()]self.map_llm_params = {}self.reduce_llm_params = {}async def _map_response_single_batch(self, context_data: str, query: str, **kwargs) -> str:# 模拟异步处理单个批次数据await asyncio.sleep(1)return f"Processed {context_data} with query '{query}'"async def _stream_reduce_response(self, map_responses: List[str], query: str, **kwargs) -> AsyncGenerator[str, None]:# 模拟异步流式减少响应for response in map_responses:await asyncio.sleep(1)yield f"Reduced result: {response}"async def astream_search(self, query: str) -> AsyncGenerator[str, None]:"""Stream the global search response."""context_chunks, context_records = self.context_builder.build_context()# 如果有回调函数,执行开始回调if self.callbacks:for callback in self.callbacks:callback.on_map_response_start(context_chunks)# 异步处理每个上下文块map_responses = await asyncio.gather(*[self._map_response_single_batch(context_data=data, query=query, **self.map_llm_params)for data in context_chunks])# 如果有回调函数,执行结束回调if self.callbacks:for callback in self.callbacks:callback.on_map_response_end(map_responses)# 先返回上下文记录yield context_records# 返回减少后的响应async for response in self._stream_reduce_response(map_responses=map_responses,query=query,**self.reduce_llm_params,):yield response# 测试代码
async def main():search = AsyncSearch()async for result in search.astream_search("example query"):print("Received:", result)# 执行异步主函数
asyncio.run(main())
代码详解
  1. ContextBuilder 类:这个类负责生成上下文数据,包括 context_chunkscontext_records。在实际应用中,这些数据可能是从数据库或其他服务中获取的。

  2. Callback 类:模拟回调函数,在执行前后打印信息。这展示了如何在复杂操作中使用回调函数。

  3. AsyncSearch 类:核心类,包含了异步生成器的实现。

    • _map_response_single_batch 方法:这是一个模拟的异步函数,假装在处理某个数据块。实际应用中,这里可能是一些耗时的计算或网络请求。
    • _stream_reduce_response 方法:这是另一个异步生成器,用于逐步处理和返回数据。
    • astream_search 方法:这是我们要重点关注的异步生成器。它首先调用了 ContextBuilder,然后执行了一些回调函数,并异步处理数据块。最后,它先返回了 context_records,然后返回了异步处理的结果。
  4. main 函数:测试代码,通过 async for 循环来获取 astream_search 生成器的返回值,并打印出来。

运行结果

运行上面的代码,你会看到类似以下的输出:

Callback: Starting map response with context chunks: ['chunk1', 'chunk2', 'chunk3']
Callback: Finished map response with results: ['Processed chunk1 with query \'example query\'', 'Processed chunk2 with query \'example query\'', 'Processed chunk3 with query \'example query\'']
Received: Initial Context Record
Received: Reduced result: Processed chunk1 with query 'example query'
Received: Reduced result: Processed chunk2 with query 'example query'
Received: Reduced result: Processed chunk3 with query 'example query'

4. 关键点回顾

  • 异步生成器的定义:通过 async defyield 实现,可以在异步任务中按需生成数据。
  • yield 之后的代码继续执行:在生成器中,yield 会暂停函数的执行,但下次继续调用时,代码会从上次暂停的地方继续执行。
  • 结合异步编程:异步生成器允许我们在处理复杂任务(如网络请求、数据库查询)时,流式返回部分结果,优化资源的利用。

结语

这篇文章通过一个简单的示例,向大家介绍了 Python 中的异步生成器,以及如何在实际应用中使用它们。通过理解 yield 和异步编程的结合,大家可以更好地编写高效、可扩展的异步代码。

如果你觉得这篇文章对你有帮助,欢迎点赞、收藏和分享!如果有任何问题,也欢迎在评论区留言讨论。


http://www.mrgr.cn/news/13681.html

相关文章:

  • CMake Error at CMakeLists.txt (find_package)幕后真凶
  • Git 常用命令总结
  • zsh: command not found: ohpm - mac安装ohpm工具 - 鸿蒙开发
  • Aiseesoft Data Recovery for Mac:专业级数据恢复解决方案
  • Semantic Kernel/C#:一种通用的Function Calling方法,文末附经测试可用的大模型
  • Nextjs(App Router) 开发记录
  • C++ | Leetcode C++题解之第382题链表随机节点
  • 农夫山泉:不止一瓶水
  • list的使用及其相关知识点
  • 解除 Excel 表格的文档保护全攻略
  • Java-InputStream转换成MultipartFile工具类
  • 多云复杂性正在危及组织的数字化进程
  • 【Leetcode 2099 】 找到和最大的长度为 K 的子序列 —— 索引与排序
  • PyTorch深度学习模型训练流程:(二、回归)
  • 2024 CygenixCTF repwn 部分wp
  • Java面试宝典-java基础04
  • 软件设计师全套备考系列文章15 -- 数据库:规范化、控制功能、大数据
  • 【Go函数详解】三、匿名函数和闭包
  • 供应链采购管理系统中常见的专有名词解释【自用】
  • 【SpringBoot + Vue 尚庭公寓实战】标签管理接口优化(四)