Skip to main content

Streaming With LangChain

流媒体在让基于LLMs的应用对终端用户产生响应感方面至关重要。

重要的LangChain原语,如LLMs、解析器、提示、检索器和代理,实现了LangChain Runnable接口。

这个界面提供了两种通用的流内容方法:

  1. 同步: 流: 异步: 流式: 默认实现流式处理,从链条中流式传输最终输出。
  2. 异步 事件流 和异步 日志流 :它们提供了一种从链中流动中间步骤和最终输出的方法。

让我们看看这两种方法,并尝试理解如何使用它们。 🥷

Using Stream

所有的Runnable对象都实现了一个名为stream的同步方法,以及一个名为astream的异步变体。

这些方法被设计为以块的形式流式传输最终输出,一旦可用就产生每个块。

流媒体只有在程序中的所有步骤都知道如何处理输入流的情况下才能实现;即,一次处理一个输入块,并产生相应的输出块。

这种处理的复杂性可能会有所不同,从像发射由LLM生成的令牌这样简单的任务,到比如在整个JSON完整之前,流式传输JSON结果的部分,这样更具挑战性的任务。

开始探索流媒体的最佳地点是LLM应用程序中最重要的组成部分 - LLM本身!

LLMs and Chat Models

大型语言模型及其聊天变体是基于LLM的应用程序中的主要瓶颈。🙊

大型语言模型可能需要几秒钟来生成对查询的完整响应。这比应用程序对最终用户响应感觉灵敏的约200-300毫秒的阈值要慢得多。

提高应用程序响应性的关键策略是显示中间进度;即,逐个输出来自模型的标记。

我们将展示使用Anthropic的聊天模型进行流式传输的示例。要使用该模型, 您需要安装langchain-anthropic包。您可以使用以下命令执行此操作:

pip install -qU langchain-anthropic
# Showing the example using anthropic, but you can use
# your favorite chat model!
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic()

chunks = []
async for chunk in model.astream("hello. tell me something about yourself"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
 Hello|!| My| name| is| Claude|.| I|'m| an| AI| assistant| created| by| An|throp|ic| to| be| helpful|,| harmless|,| and| honest|.||

让我们检查其中一个块。

chunks[0]
AIMessageChunk(content=' Hello')

我们得到了一个称为 AIMessageChunk 的东西。这个块表示 AIMessage 的一部分。

消息块是按设计添加的 - 可以简单地将它们加在一起以获取到目前为止的响应状态!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content=' Hello! My name is')

Chains

几乎所有LLM应用都涉及比简单调用语言模型更多的步骤。

让我们使用LCEL(LangChain Expression Language)构建一个简单的链,结合提示、模型和解析器,并验证流式工作。

我们将使用 StrOutputParser 来解析模型输出。这是一个简单的解析器,从 AIMessageChunk 中提取 content 字段,给我们返回模型生成的 token。

tip

LCEL 是一种声明性的方式,通过链接不同的 LangChain 原语来指定一个“程序”。使用 LCEL 创建的链将受益于对流和 astream 的自动实现,允许最终输出的流式传输。实际上,使用 LCEL 创建的链实现了整个标准的 Runnable 接口。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
 Here|'s| a| silly| joke| about| a| par|rot|:|

What| kind| of| teacher| gives| good| advice|?| An| ap|-|parent| (|app|arent|)| one|!||

你可能注意到以上,解析器实际上没有阻止模型的流式输出,而是逐个处理每个块。许多LCEL基元也支持这种转换风格的传递流,这在构建应用程序时非常方便。

某些可运行对象,如prompt模板和chat模型,无法处理单个块,而是会累积所有先前步骤。 这将中断流程过程。可以设计自定义函数以返回生成器。

note

如果上述功能与您构建的内容无关,您可以不使用 LangChain 表达式语言来使用 LangChain,而是可以依靠标准的命令式编程方法,通过在每个组件上分别调用 invoke、batch 或 stream,将结果赋给变量,然后根据需要在下游使用。

如果这符合您的需求,那对我们来说没问题👌!

Working with Input Streams

如果您希望在生成过程中从输出中流式传输 JSON,会怎么样?

如果你要依赖json.loads来解析部分的json,解析将会失败,因为这部分的json不是有效的json。

你可能完全不知道该怎么做,并声称无法流式传输 JSON。

嗯,事实证明这是可以做到的 - 解析器需要在**输入流**上操作,并尝试“自动完成”部分 JSON 到有效状态。

让我们看看这样一个解析器的运行情况,以理解这意味着什么。

from langchain_core.output_parsers import JsonOutputParser

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, flush=True)
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 6739}]}
{'countries': [{'name': 'France', 'population': 673915}]}
{'countries': [{'name': 'France', 'population': 67391582}]}
{'countries': [{'name': 'France', 'population': 67391582}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Sp'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 4675}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 467547}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 12}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 12647}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 1264764}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 126476461}]}

现在,让我们来中断流式传输。我们将使用之前的示例,并在最后附加一个提取函数,以从最终的JSON中提取国家名称。

danger

任何在链中操作已完成输入而不是输入流的步骤都可能通过stream或astream中断流功能。

tip

稍后,我们将讨论astream_events API,该API可以从中间步骤流传结果。即使链中包含仅在最终输入上操作的步骤,此API也会从中间步骤流传结果。

from langchain_core.output_parsers import (
JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, end="|", flush=True)
['France', 'Spain', 'Japan']|

Generator Functions

让我们使用可以在输入流上操作的生成器函数来修复流。

tip

生成器函数(使用 yield 的函数)允许编写操作输入流的代码。

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()

async for input in input_stream:
if not isinstance(input, dict):
continue

if "countries" not in input:
continue

countries = input["countries"]

if not isinstance(countries, list):
continue

for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, end="|", flush=True)
France|Sp|Spain|Japan|
note

因为上面的代码依赖于 JSON 的自动完成功能,您可能会看到一些国家的部分名字(例如 Sp 和 Spain),这并不是我们想要的提取结果!

我们专注于流媒体概念,并非链条的结果。

Non-streaming components

一些内置组件,比如Retrievers,并不提供任何流式传输。如果我们尝试对它们进行流式传输会发生什么呢?🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]

该流仅产生了该组件的最终结果。

这是可以的 🥹!并非所有组件都必须实现流式处理 - 在某些情况下,流式处理是不必要的、困难的或者根本毫无意义。

tip

用非流式组件构建的LCEL链,在许多情况下仍然可以进行流式传输,部分输出开始在链中最后一个非流式步骤之后。

retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
 Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||

现在我们已经看到了流stream和异步stream astream是如何工作的,让我们踏入流事件的世界。🏞️

Using Stream Events

事件流是一个测试版API。这个API可能会根据反馈意见进行一些更改。

note

引入在 Langchain-Core 中。 0.1.14。

import langchain_core

langchain_core.__version__
'0.1.18'

为了让 astream_events API 正常工作:

  • 在代码中尽可能使用 async(例如,异步工具等)。
  • 如果定义自定义函数/可运行对象,请传播回调。
  • 使用 runnables 时,如果不使用 LCEL,请确保在 LLM 上调用 .astream(),而不是 .ainvoke,以强制 LLM 流式传输令牌。
  • 请告诉我们如果有任何意外情况! :)

Event Reference

下面是一个参考表格,显示了各种可由不同Runnable对象发出的一些事件。

note

当流式处理实施正确时,只有在输入流完全被消耗之后,才会知道可运行的输入。这意味着输入通常只会包含“结束”事件,而不是“开始”事件。

event name chunk input output
on_chat_model_start [ model name ] {“messages”: [ [ SystemMessage, HumanMessage ] ] }
on_chat_model_stream [ model name ] AIMessageChunk(content=“hello”)
on_chat_model_end [ model name ] {“messages”: [ [ SystemMessage, HumanMessage ] ] } {“generations”: [ ] , “llm_output”: None, …}
on_llm_start [ model name ] {‘input’: ‘hello’}
on_llm_stream [ model name ] ‘Hello’
on_llm_end [ model name ] ‘Hello human!’
on_chain_start format_docs
on_chain_stream format_docs “hello world!, goodbye world!”
on_chain_end format_docs [ Document(…) ] “hello world!, goodbye world!”
on_tool_start some_tool {“x”: 1, “y”: “2”}
on_tool_stream some_tool {“x”: 1, “y”: “2”}
on_tool_end some_tool {“x”: 1, “y”: “2”}
on_retriever_start [ retriever name ] {“query”: “hello”}
on_retriever_chunk [ retriever name ] {documents: [ ] }
on_retriever_end [ retriever name ] {“query”: “hello”} {documents: [ ] }
on_prompt_start [ template_name ] {“question”: “hello”}
on_prompt_end [ template_name ] {“question”: “hello”} ChatPromptValue(messages: [ SystemMessage, … ] )

Chat Model

让我们首先来看一下由聊天模型产生的事件。

events = []
async for event in model.astream_events("hello", version="v1"):
events.append(event)
/home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:86: LangChainBetaWarning: This API is in beta and may change in the future.
warn_beta(
note

嘿,API 中的那个搞笑的版本参数“v1”是什么意思? 😾

这是一个测试版API,并且我们几乎肯定会对其进行一些更改。

这个版本参数将帮助我们最小化对你的代码的破坏性更改。

简而言之,我们现在让您感到烦恼,这样以后就不会再让您感到烦恼了。

让我们看一下一些开始事件和一些结束事件。

events[:3]
[{'event': 'on_chat_model_start',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {},
'data': {'input': 'hello'}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content=' Hello')}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='!')}}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='')}},
{'event': 'on_chat_model_end',
'name': 'ChatAnthropic',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'data': {'output': AIMessageChunk(content=' Hello!')}}]

Chain

让我们重新查看解析流式 JSON 的示例链以探索流式事件 API。

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
event
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
)
]

如果您检查最初几个事件,您会注意到有3个不同的开始事件,而不是2个开始事件。

三个起始事件对应以下内容:

  1. 链条(模型 + 解析器)
  2. 这个模型
  3. 解析器
events[:3]
[{'event': 'on_chain_start',
'run_id': 'b1074bff-2a17-458b-9e7b-625211710df4',
'name': 'RunnableSequence',
'tags': [],
'metadata': {},
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}},
{'event': 'on_chat_model_start',
'name': 'ChatAnthropic',
'run_id': '6072be59-1f43-4f1c-9470-3b92e8406a99',
'tags': ['seq:step:1'],
'metadata': {},
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}},
{'event': 'on_parser_start',
'name': 'JsonOutputParser',
'run_id': 'bf978194-0eda-4494-ad15-3a5bfe69cd59',
'tags': ['seq:step:2'],
'metadata': {},
'data': {}}]

如果您查看最后3个事件,您认为会看到什么?中间的呢?

让我们使用该API来获取模型和解析器的流事件输出。我们忽略起始事件、结束事件和链条中的事件。

num_events = 0

async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
...

由于模型和解析器都支持流式处理,我们可以实时看到两个组件的流事件!有点酷,不是吗? 🦜

Filtering Events

由于此 API 生成了许多事件,因此能够筛选事件是很有用的。

您可以按组件名称、组件标签或组件类型进行筛选。

By Name

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_names=["my_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_parser_start', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': ''}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 6739}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 673915}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {}]}}}
...

By Type

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_types=["chat_model"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chat_model_start', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' Here')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' is')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' JSON')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' with')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' requested')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' countries')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' and')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' their')}}
...

By Tags

caution

标签会被给定可运行组件的子组件继承。

如果您正在使用标签进行筛选,请确保这正是您要的结果。

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_tags=["my_chain"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chain_start', 'run_id': '190875f3-3fb7-49ad-9b6e-f49da22f3e49', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}}
{'event': 'on_chat_model_start', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_parser_start', 'name': 'JsonOutputParser', 'run_id': '3b5e4ca1-40fe-4a02-9a19-ba2a43a6115c', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' Here')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' is')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' JSON')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' with')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' requested')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' countries')}}
...

Non-streaming components

一些组件无法流畅工作的原因是它们无法在输入流上运行。

当使用 astream 时,这些组件可能会中断最终输出的流,但 astream_events 仍会从支持流式处理的中间步骤产生流事件!

# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = (
model | JsonOutputParser() | _extract_country_names
) # This parser only works with OpenAI right now

正如预期的那样,ASTREAM API不能正常工作,因为_EXTRACT_COUNTRY_NAMES不能对流操作。

async for chunk in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
):
print(chunk, flush=True)
['France', 'Spain', 'Japan']

现在,让我们确认使用“astream_events”我们仍然可以看到模型和解析器的流式输出。

num_events = 0

async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
Chat model chunk: '\n '
Chat model chunk: ' "'
...

Propagating Callbacks

caution

如果您在工具中使用了调用运行任务的操作,则需要将回调传递给该任务;否则,不会生成任何流事件。

note

当使用 RunnableLambdas 或 @chain 装饰器时,回调将在幕后自动传播。

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
"""Custom tool that doesn't propagate callbacks."""
return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v1"):
print(event)
{'event': 'on_tool_start', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_tool_stream', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'tags': [], 'metadata': {}, 'name': 'bad_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'bad_tool', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}

这里有一个重新实现的版本,确实正确传播回调。您会注意到现在我们也从 reverse_word 可运行程序中获取事件。

@tool
def correct_tool(word: str, callbacks):
"""A tool that correctly propagates callbacks."""
return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello", version="v1"):
print(event)
{'event': 'on_tool_start', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': 'c4882303-8867-4dff-b031-7d9499b39dda', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': 'c4882303-8867-4dff-b031-7d9499b39dda', 'tags': [], 'metadata': {}, 'data': {'input': 'hello', 'output': 'olleh'}}
{'event': 'on_tool_stream', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'tags': [], 'metadata': {}, 'name': 'correct_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'correct_tool', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}

如果您在Runnable Lambdas或@chains中调用runnables,则回调将会自动代表您传递。

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
print(event)
{'event': 'on_chain_start', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': '335fe781-8944-4464-8d2e-81f61d1f85f5', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': '335fe781-8944-4464-8d2e-81f61d1f85f5', 'tags': [], 'metadata': {}, 'data': {'input': '1234', 'output': '4321'}}
{'event': 'on_chain_stream', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}

并且使用 @chain 装饰器:

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
print(event)
{'event': 'on_chain_start', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': 'e7cddab2-9b95-4e80-abaf-4b2429117835', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': 'e7cddab2-9b95-4e80-abaf-4b2429117835', 'tags': [], 'metadata': {}, 'data': {'input': '1234', 'output': '4321'}}
{'event': 'on_chain_stream', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}

Help us out by providing feedback on this documentation page: