# Set environment variables
from langchain_opentutorial import set_env
set_env(
{
"OPENAI_API_KEY": "",
"ANTHROPIC_API_KEY": "",
}
)
Environment variables have been set successfully.
from dotenv import load_dotenv
load_dotenv(override=True)
False
Using Stream
All Runnable objects provide two stream methods:
Synchronous (sync): stream
Asynchronous (async): astream
These methods are designed to process the final output in small chunks, returning each chunk as soon as it is available.
How Streaming Works
Streaming is only possible when every step in the program processes input data one chunk at a time and yields the corresponding output chunk.
The complexity of processing can vary, such as:
Simple tasks: Emitting tokens generated by an LLM one by one.
Complex tasks: Streaming parts of a JSON result before the entire JSON is completed.
The best way to explore streaming is to focus on the most critical component in LLM-based apps: the LLMs themselves.
LLMs and Chat Models
LLMs and chat models are the primary bottlenecks in LLM-based applications.
LLMs can take several seconds to generate a complete response to a query.
This is much slower than the 200-300ms threshold at which users perceive an application as responsive.
Solution: Use Streaming to Improve Responsiveness
To reduce waiting time for users, it's essential to show intermediate progress.
This can be achieved by streaming the model’s output token by token and displaying it immediately to the user.
Let’s explore an example of streaming using a chat model. Choose one of the options below to get started!
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0)
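Alternatively, any streaming-capable chat model can be swapped in. Below is a hedged sketch using an OpenAI chat model (the model name is illustrative); note that the sample outputs in the rest of this tutorial were produced with the Anthropic model above.
from langchain_openai import ChatOpenAI
# Hedged alternative: any chat model that supports streaming works the same way.
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)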
Let's start with the sync stream API
chunks = []
for chunk in model.stream("What color is a polar bear?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
|Polar bears actually| have transparent, hollow fur, not| white fur as commonly believed. Their skin| is black underneath. The fur appears| white because the transparent hair scatters an|d reflects visible light, creating the white appearance we| see. This helps them blend in with their| snowy Arctic environment. The hollow| nature of their fur also helps trap air for| insulation.||
Alternatively, if you're working in an async environment, you may consider using the async astream API
chunks = []
async for chunk in model.astream("What color is a polar bear?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
|Polar bears actually| have transparent, hollow fur, not white| fur as commonly believed. Their skin| is black underneath. The| fur appears white because the transparent hair| scatters and reflects visible light, creating| the white appearance we see.| This helps them blend in with their| snowy Arctic environment. The| hollow nature of their fur also helps trap| air for insulation.||
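Each item yielded by stream / astream is an AIMessageChunk. Message chunks are additive by design, so adding the first few together reconstructs the partial response so far. The exact inspection cell isn't shown here; a minimal sketch that would produce an output like the one below is:
# Hedged sketch: AIMessageChunk supports "+", so adding chunks merges them.
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4] + chunks[5]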
AIMessageChunk(content='Polar bears actually have transparent, hollow fur, not white fur as commonly believed. Their skin is black underneath. The fur appears white because the transparent hair', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})
Chains
Most LLM applications involve multiple steps, not just a simple call to a language model.
Here, we’ll create a simple chain using the LangChain Expression Language (LCEL).
This chain will combine a prompt, a model, and a parser, and we’ll verify that streaming works.
Example: Using a Parser for Streaming
We'll use the StrOutputParser to process the output from the model.
This parser extracts the content field from an AIMessageChunk, which gives us each token generated by the model.
What is LCEL?
LCEL is a declarative way to define a "program" by linking together different components (called LangChain primitives).
Chains created with LCEL have some advantages:
Built-in streaming: They automatically support stream and astream, allowing the final output to be streamed in chunks.
Standard Runnable interface: Chains built using LCEL fully implement this interface, making them flexible and easy to use.
Streaming with Parsers
Even though we’re using a parser at the end of the chain, streaming still works.
This is because the parser processes each streaming chunk individually, instead of waiting for the full output.
Many LCEL components also support this kind of chunk-by-chunk processing, which makes it easier to build applications that need streaming.
You can create custom functions that return generators to handle streaming data effectively.
Certain components, like prompt templates or chat models, can't process individual input chunks.
Instead, they collect all the data from previous steps before proceeding.
This behavior can interrupt the streaming process, so keep it in mind when designing your chain.
LCEL gives you flexibility to decide how your chain operates (e.g., sync/async, batch/streaming).
If LCEL doesn’t fit your needs, you can always use a standard, step-by-step approach:
Call invoke, batch, or stream on each component.
Assign the results to variables.
Use those variables in the next steps as needed.
This lets you switch between a declarative or imperative style depending on what works best for your project.
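For example, here is a minimal sketch of that step-by-step style, reusing the model defined above (the prompt and topic are illustrative):
from langchain_core.prompts import ChatPromptTemplate

# Hedged sketch of the imperative style: each step is called explicitly.
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")

# Step 1: format the prompt into messages.
messages = prompt.invoke({"topic": "apple"})

# Step 2: stream the model on those messages and print tokens as they arrive.
for chunk in model.stream(messages):
    print(chunk.content, end="|", flush=True)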
Now, let’s get started with creating a simple chain!
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
async for chunk in chain.astream({"topic": "apple"}):
    print(chunk, end="|", flush=True)
|Here's an| apple joke for you:
Why| did the apple go to the| doctor?
Because it wasn|'t peeling well!
Here|'s another one:
What kin|d of apple has a short temper?
A| crab apple! |🍎||
Working with Input Streams
What if you wanted to stream JSON as it’s being generated?
If you use json.loads to parse partial JSON, it will fail because partial JSON is not valid JSON.
At first, this might seem impossible to solve, and you might assume that streaming JSON isn’t doable.
It turns out there is a way!
The parser needs to process the input stream and try to "auto-complete" the partial JSON to make it valid.
This allows the JSON to be processed and streamed chunk by chunk.
To better understand how this works, we’ll explore an example of a parser that can handle partial JSON streams effectively.
from langchain_core.output_parsers import JsonOutputParser
chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, flush=True)
Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.
Why Does This Break Streaming?
Any step in the chain that processes finalized inputs (rather than handling input streams) can disrupt the streaming process.
When this happens, both stream and astream functionalities stop working properly.
Later, we will discuss the astream_events API which streams results from intermediate steps.
This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.
from langchain_core.output_parsers import (
JsonOutputParser,
)
# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return []
    countries = inputs.get("countries", [])
    if not isinstance(countries, list):
        return []
    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names
chain = model | JsonOutputParser() | _extract_country_names
async for text in chain.astream(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, end="|", flush=True)
['France', 'Spain', 'Korea']|
Generator Functions
Let’s fix the streaming issue by using a generator function. A generator function allows you to process input streams efficiently by using the yield keyword.
Why Use a Generator Function?
A generator function processes input streams piece by piece.
It’s a simple way to handle streaming data without waiting for the entire input to be completed.
A generator function (a function that uses yield) makes it easier to write code that works with streaming inputs.
from langchain_core.output_parsers import JsonOutputParser
async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()
async for input in input_stream:
if not isinstance(input, dict):
continue
if "countries" not in input:
continue
countries = input["countries"]
if not isinstance(countries, list):
continue
for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)
chain = model | JsonOutputParser() | _extract_country_names_streaming
async for text in chain.astream(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(text, end="|", flush=True)
France|Spain|Korea|
Because the code above relies on JSON auto-completion, you may see partial country names (e.g., Sp and Spain), which is not what one would want for an extraction result!
We're focusing on streaming concepts, not necessarily the results of the chains.
Non-Streaming Components
Some built-in components like Retrievers do not offer any streaming. What happens if we try to stream them? 🤨
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
vectorstore = FAISS.from_texts(
["harrison worked at langchain", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(id='a5d7c82a-e94f-4baa-8822-605c0b987dc7', metadata={}, page_content='harrison worked at langchain'),
Document(id='6e7a681c-8208-4c2f-9015-4e64be453461', metadata={}, page_content='harrison likes spicy food')]]
Stream just yielded the final result from that component.
This is OK 🥹! Not all components have to implement streaming -- in some cases streaming is either unnecessary, difficult or just doesn't make sense.
An LCEL chain constructed with non-streaming components will still be able to stream in many cases, with streaming of partial output starting after the last non-streaming step in the chain.
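The retrieval_chain used below isn't defined in this snippet; here is a minimal sketch of how it might be wired up, assuming the retriever, prompt, and model created above:
# Hedged sketch: the exact construction of retrieval_chain is assumed here.
# The retriever fills {context}; the user question passes straight through.
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)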
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
|According| to the context, Harrison worked at| Langchain.
Here are |3 made up sentences about Langchain|:
1. Langchain| is a bustling tech company locate|d in a modern office building with floor|-to-ceiling windows.
2. The| company's collaborative workspace features comfortable loun|ges where employees often brainstorm innovative| ideas.
3. Langchain's office| is known for its well-stocked coffee| bar and weekly team-building activities.||
Now that we've seen how stream and astream work, let's venture into the world of streaming events.
Using Stream Events
Stream Events is a beta API designed for handling event-based streaming in LangChain. Since it’s still in development, it may undergo changes based on user feedback.
This guide uses the V2 API, which requires langchain-core >= 0.2.
For the V1 API, which is compatible with older versions of LangChain, see the LangChain documentation.
To ensure the astream_events API works correctly, follow these guidelines:
Use async Across the Code:
Wherever possible, use async functions and tools throughout your code to maintain compatibility with the API.
Propagate Callbacks:
If you’re defining custom functions or runnable objects, make sure to propagate callbacks properly so that events can be captured and processed.
Force LLMs to Stream Tokens:
When using runnables without LCEL, call .astream() on LLMs instead of .ainvoke to ensure tokens are streamed incrementally.
Test and Report Issues:
If anything doesn’t work as expected, provide feedback to improve the API.
Event Reference
When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed.
This means that inputs will often be included only for end events rather than for start events.
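For orientation, the event types you will see most often in this guide include (a non-exhaustive list; the full event reference is in the LangChain documentation):
on_chat_model_start / on_chat_model_stream / on_chat_model_end: emitted by chat models
on_parser_start / on_parser_stream / on_parser_end: emitted by output parsers
on_chain_start / on_chain_stream / on_chain_end: emitted by chains and other runnables (e.g., RunnableSequence)
on_tool_start / on_tool_end: emitted by tools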
import langchain_core
langchain_core.__version__
'0.3.28'
Chat Model
Let's start off by looking at the events produced by a chat model.
What's with the version="v2" parameter? 😾
The version="v2" parameter is there because this is a beta API. Since we’re still making changes to it, this parameter helps avoid breaking your code in the future.
💡 Note: v2 is only supported in langchain-core >= 0.2.0.
events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)
Let's take a look at a few of the start events and a few of the end events.
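The inspection cell isn't shown here; a minimal sketch would be:
# Hedged sketch: peek at the first two events (typically the start event and
# the first streamed chunk) and the last two (the final chunk and the end event).
print(events[:2])
print(events[-2:])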
Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.
chain = (
model | JsonOutputParser()
)
events = [
event
async for event in chain.astream_events(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
version="v2",
)
]
If you examine the first few events, you'll notice that there are 3 different start events rather than 2.
The three start events correspond to:
The chain (model + parser)
The model
The parser
events[:3]
[{'event': 'on_chain_start',
'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
'name': 'RunnableSequence',
'tags': [],
'run_id': '4b40eb58-73e1-4b7a-9b8f-3caa7530dce2',
'metadata': {},
'parent_ids': []},
{'event': 'on_chat_model_start',
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-5-sonnet-latest',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']},
{'event': 'on_chat_model_stream',
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-5-sonnet-latest',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']}]
What do you think you'd see if you looked at the last 3 events? What about the middle?
Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.
Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool, isn't it? 🦜
num_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Because this API produces so many events, it is useful to be able to filter on events.
You can filter by either component name, component tags or component type.
By Name
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)
max_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
version="v2",
include_names=["my_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
By Type
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
By Tags
Tags in a runnable are inherited by all its child components.
This means that if you assign tags to a parent component, those tags will automatically apply to its children as well.
For example:
If a parent runnable has the tag ["filter"], all its child components will also inherit this tag.
This can affect behavior if you're using tags to filter or track specific components.
What You Should Keep in Mind:
Filtering Tags: If you're using tags to filter components, make sure this automatic inheritance is what you want.
Overlapping Tags: Be cautious if child components already have their own tags, as inherited tags may lead to unintended behavior.
Intentional Tagging: If you don't want tags to apply to certain child components, you may need to adjust the tagging logic manually.
By understanding how tags work, you can ensure they align with your desired functionality!
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'metadata': {}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries": [\n {', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
{'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "name": "France",\n "', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
...
Handling Non-Streaming Components
Some components don’t work well with input streams and may interrupt the streaming of the final output when using astream.
However, when using astream_events, you can still get streaming events from intermediate steps that support streaming!
This allows partial progress to be tracked, even if some components are not compatible with streaming.
# Function that does not support streaming.
# It operates on the finalized inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""
    if "countries" not in inputs:
        return ""
    countries = inputs["countries"]
    if not isinstance(countries, list):
        return ""
    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names
chain = (
model | JsonOutputParser() | _extract_country_names
)
As expected, the astream API doesn't work correctly because _extract_country_names doesn't operate on streams.
async for chunk in chain.astream(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(chunk, flush=True)
['France', 'Spain', 'Korea']
Now, let's confirm that with astream_events we're still seeing streaming output from the model and the parser.
num_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and korea and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Propagating Callbacks
When invoking runnables inside your tools, you must manually propagate callbacks to the runnable. Without this, no stream events will be generated.
If you're using RunnableLambdas or the @chain decorator, callbacks are automatically propagated in the background.
This example shows how callbacks are not propagated in a custom tool (bad_tool), resulting in incomplete event tracking and missing streaming events (on_chain_stream).
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool
def reverse_word(word: str):
    return word[::-1]

reverse_word = RunnableLambda(reverse_word)

@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)

async for event in bad_tool.astream_events("hello", version="v2"):
    print(event)
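For contrast, here is a hedged sketch of a tool that does propagate callbacks, so events from the inner reverse_word runnable are surfaced as well:
@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    # Passing the callbacks through lets astream_events see the inner runnable.
    return reverse_word.invoke(word, {"callbacks": callbacks})

async for event in correct_tool.astream_events("hello", version="v2"):
    print(event)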