
How to stream runnables


  • Author: byoon

  • Peer Review: seofield, stsr1284

  • Proofread: Q0211

  • This is a part of LangChain Open Tutorial

Overview

Streaming is critical in making applications based on LLMs feel responsive to end-users.

Important LangChain primitives like chat models, output parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

  1. sync stream and async astream:

    • a default implementation of streaming that streams the final output from the chain.

  2. async astream_events and async astream_log:

    • these provide a way to stream both intermediate steps and final output from the chain.

Let's explore both approaches, and try to understand how to use them.

Table of Contents

  • Overview
  • Environment Setup
  • Using Stream
  • Chains
  • Streaming with Parsers
  • Working with Input Streams
  • Non-Streaming Components
  • Using Stream Events
  • Chat Model
  • Filtering Events
  • Handling Non-Streaming Components
  • Propagating Callbacks

References

  • LangChain Conceptual Guide > Streaming
  • LangChain Conceptual Guide > Runnable interface


Environment Setup

Set up the environment. You may refer to Environment Setup for more details.

[Note]

  • langchain-opentutorial is a package that provides a set of easy-to-use environment setup, useful functions, and utilities for tutorials.

  • You can check out the langchain-opentutorial package for more details.

%%capture --no-stderr
%pip install langchain-opentutorial
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langchain",
        "langchain_community",
        "langchain_openai",
        "langchain_anthropic",
    ],
    verbose=False,
    upgrade=False,
)
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "ANTHROPIC_API_KEY": "",
    }
)
Environment variables have been set successfully.
from dotenv import load_dotenv

load_dotenv(override=True)
False

Using Stream

All Runnable objects provide two stream methods:

  • Synchronous (sync): stream

  • Asynchronous (async): astream

These methods are designed to process the final output in small chunks, returning each chunk as soon as it is available.

How Streaming Works

Streaming is only possible when every step in the program processes input data one chunk at a time and yields the corresponding output chunk.

The complexity of processing can vary, such as:

  • Simple tasks: Emitting tokens generated by an LLM one by one.

  • Complex tasks: Streaming parts of a JSON result before the entire JSON is completed.

The best way to explore streaming is to focus on the most critical component in LLM-based apps: the LLMs themselves.

LLMs and Chat Models

LLMs and chat models are the primary bottlenecks in LLM-based applications.

LLMs can take several seconds to generate a complete response to a query. This is much slower than the 200-300ms threshold at which users perceive an application as responsive.

Solution: Use Streaming to Improve Responsiveness

To reduce waiting time for users, it's essential to show intermediate progress. This can be achieved by streaming the model’s output token by token and displaying it immediately to the user.

Let’s explore an example of streaming using a chat model. Choose one of the options below to get started!

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0)
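
The rest of this tutorial records outputs from the Anthropic model above, but any streaming-capable chat model works the same way. As a hedged alternative (assuming the OPENAI_API_KEY set during environment setup; the model name here is only illustrative), you could swap in an OpenAI chat model instead:

# Alternative option: an OpenAI chat model (assumes OPENAI_API_KEY is set).
# Note: the outputs shown later in this tutorial were produced with the Anthropic model above.
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)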

Let's start with the sync stream API

chunks = []
for chunk in model.stream("What color is a polar bear?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
|Polar bears actually| have transparent, hollow fur, not| white fur as commonly believed. Their skin| is black underneath. The fur appears| white because the transparent hair scatters an|d reflects visible light, creating the white appearance we| see. This helps them blend in with their| snowy Arctic environment. The hollow| nature of their fur also helps trap air for| insulation.||

Alternatively, if you're working in an async environment, you may consider using the async astream API

chunks = []
async for chunk in model.astream("What color is a polar bear?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
|Polar bears actually| have transparent, hollow fur, not white| fur as commonly believed. Their skin| is black underneath. The| fur appears white because the transparent hair| scatters and reflects visible light, creating| the white appearance we see.| This helps them blend in with their| snowy Arctic environment. The| hollow nature of their fur also helps trap| air for insulation.||

Let's inspect one of the chunks

chunks[0]
AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design -- one can simply add them up to get the state of the response so far

chunks[0] + chunks[1] + chunks[2]
AIMessageChunk(content='Polar bears actually have transparent, hollow fur, not white', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4] + chunks[5]
AIMessageChunk(content='Polar bears actually have transparent, hollow fur, not white fur as commonly believed. Their skin is black underneath. The fur appears white because the transparent hair', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})

Chains

Most LLM applications involve multiple steps, not just a simple call to a language model.

Here, we’ll create a simple chain using the LangChain Expression Language (LCEL). This chain will combine a prompt, a model, and a parser, and we’ll verify that streaming works.

Example: Using a Parser for Streaming

We’ll use the StrOutputParser to process the output from the model. This parser extracts the content field from an AIMessageChunk, which gives us each token generated by the model.

What is LCEL?

LCEL is a declarative way to define a "program" by linking together different components (called LangChain primitives).

Chains created with LCEL have some advantages:

  • Built-in streaming: They automatically support stream and astream, allowing the final output to be streamed in chunks.

  • Standard Runnable interface: Chains built using LCEL fully implement this interface, making them flexible and easy to use.

Streaming with Parsers

Even though we’re using a parser at the end of the chain, streaming still works.

This is because the parser processes each streaming chunk individually, instead of waiting for the full output. Many LCEL components also support this kind of chunk-by-chunk processing, which makes it easier to build applications that need streaming.

You can create custom functions that return generators to handle streaming data effectively.

Certain components, like prompt templates or chat models, don’t process individual chunks. Instead, they collect all data from previous steps before proceeding. This behavior can interrupt the streaming process, so keep it in mind when designing your chain.

LCEL gives you flexibility to decide how your chain operates (e.g., sync/async, batch/streaming). If LCEL doesn’t fit your needs, you can always use a standard, step-by-step approach:

  • Call invoke, batch, or stream on each component.

  • Assign the results to variables.

  • Use those variables in the next steps as needed.

This lets you switch between a declarative and an imperative style, depending on what works best for your project (a sketch of the imperative approach follows below).
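
The snippet below is a minimal illustration of that imperative approach; the variable names are ours, and it reuses only the model defined earlier. Each component is invoked directly and its result stored before moving on, and streaming stays available by calling .stream() on the model yourself.

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

step_prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
step_parser = StrOutputParser()

# Call each component yourself and keep the intermediate results in variables.
messages = step_prompt.invoke({"topic": "apple"})
response = model.invoke(messages)
joke = step_parser.invoke(response)
print(joke)

# Streaming is still possible imperatively: call .stream() on the model directly.
for chunk in model.stream(messages):
    print(chunk.content, end="|", flush=True)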

Now, let’s get started with creating a simple chain!

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "apple"}):
    print(chunk, end="|", flush=True)
|Here's an| apple joke for you:
    
    Why| did the apple go to the| doctor?
    Because it wasn|'t peeling well! 
    
    Here|'s another one:
    What kin|d of apple has a short temper?
    A| crab apple! |🍎||

Working with Input Streams

What if you wanted to stream JSON as it’s being generated?

If you use json.loads to parse partial JSON, it will fail because partial JSON is not valid JSON. At first, this might seem impossible to solve, and you might assume that streaming JSON isn’t doable.

It turns out there is a way!

  • The parser needs to process the input stream and try to "auto-complete" the partial JSON to make it valid.

  • This allows the JSON to be processed and streamed chunk by chunk.

To better understand how this works, we’ll explore an example of a parser that can handle partial JSON streams effectively.

from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)
{}
    {'countries': [{}]}
    {'countries': [{'name': 'France'}]}
    {'countries': [{'name': 'France', 'population': 67391582}]}
    {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
    {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}
    {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {}]}
    {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}
    {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}

Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.

Why Does This Break Streaming?

  • Any step in the chain that processes finalized inputs (rather than handling input streams) can disrupt the streaming process.

  • When this happens, both stream and astream functionalities stop working properly.

Later, we will discuss the astream_events API, which streams results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

from langchain_core.output_parsers import (
    JsonOutputParser,
)

# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return []

    countries = inputs.get("countries", [])

    if not isinstance(countries, list):
        return []

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)
['France', 'Spain', 'Korea']|

Generator Functions

Let’s fix the streaming issue by using a generator function. A generator function allows you to process input streams efficiently by using the yield keyword.

Why Use a Generator Function?

  • A generator function processes input streams piece by piece.

  • It’s a simple way to handle streaming data without waiting for the entire input to be completed.

A generator function (a function that uses yield) makes it easier to write code that works with streaming inputs.

from langchain_core.output_parsers import JsonOutputParser

async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)
France|Spain|Korea|

Because the code above relies on JSON auto-completion, you may see partial names of countries (e.g., Sp and then Spain), which is not what one would want for an extraction result!

We're focusing on streaming concepts, not necessarily the results of the chains.
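
One possible workaround, sketched below, is to emit a country name only once its entry also contains a population key, which suggests the name string has finished streaming. This assumes the model always writes population after name, so treat it as an illustration rather than a robust fix; the function name is ours, not part of the tutorial.

async def _extract_complete_country_names(input_stream):
    """Yield a country name only after its entry looks complete."""
    seen = set()
    async for partial in input_stream:
        if not isinstance(partial, dict):
            continue
        countries = partial.get("countries", [])
        if not isinstance(countries, list):
            continue
        for country in countries:
            if not isinstance(country, dict):
                continue
            name = country.get("name")
            # Only yield once `population` is present, i.e. the name is no longer partial.
            if name and "population" in country and name not in seen:
                seen.add(name)
                yield name

chain = model | JsonOutputParser() | _extract_complete_country_names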

Non-Streaming Components

Some built-in components like Retrievers do not offer any streaming. What happens if we try to stream them? 🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at langchain", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(id='a5d7c82a-e94f-4baa-8822-605c0b987dc7', metadata={}, page_content='harrison worked at langchain'),
      Document(id='6e7a681c-8208-4c2f-9015-4e64be453461', metadata={}, page_content='harrison likes spicy food')]]

Stream just yielded the final result from that component.

This is OK 🥹! Not all components have to implement streaming -- in some cases streaming is either unnecessary, difficult or just doesn't make sense.

An LCEL chain constructed with non-streaming components will still be able to stream in many cases, with streaming of partial output starting after the last non-streaming step in the chain.

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)
|According| to the context, Harrison worked at| Langchain.
    
    Here are |3 made up sentences about Langchain|:
    1. Langchain| is a bustling tech company locate|d in a modern office building with floor|-to-ceiling windows.
    2. The| company's collaborative workspace features comfortable loun|ges where employees often brainstorm innovative| ideas.
    3. Langchain's office| is known for its well-stocked coffee| bar and weekly team-building activities.||

Now that we've seen how stream and astream work, let's venture into the world of streaming events.

Using Stream Events

Stream Events is a beta API designed for handling event-based streaming in LangChain. Since it’s still in development, it may undergo changes based on user feedback.

  • This guide uses the V2 API, which requires langchain-core >= 0.2.

  • For the V1 API compatible with older versions of LangChain, see the LangChain documentation.

To ensure the astream_events API works correctly, follow these guidelines:

  1. Use async Across the Code:

    • Wherever possible, use async functions and tools throughout your code to maintain compatibility with the API.

  2. Propagate Callbacks:

    • If you’re defining custom functions or runnable objects, make sure to propagate callbacks properly so that events can be captured and processed.

  3. Force LLMs to Stream Tokens:

    • When using runnables without LCEL, call .astream() on LLMs instead of .ainvoke() to ensure tokens are streamed incrementally (see the sketch after this list).

  4. Test and Report Issues:

    • If anything doesn’t work as expected, provide feedback to improve the API.
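
To make guideline 3 concrete, here is a minimal sketch (the function name is ours, not the tutorial's): inside a custom async generator, call .astream() on the model rather than .ainvoke(), so token-level chunks keep flowing and surface as streaming events.

async def answer_in_uppercase(question: str):
    """Stream the model's answer, uppercased, one chunk at a time."""
    # .astream() (not .ainvoke()) keeps token-level chunks flowing.
    async for chunk in model.astream(question):
        yield chunk.content.upper()

async for piece in answer_in_uppercase("hello"):
    print(piece, end="|", flush=True)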

Event Reference

When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events rather than for start events.

import langchain_core

langchain_core.__version__
'0.3.28'

Chat Model

Let's start off by looking at the events produced by a chat model.

What's with the version="v2" parameter? 😾

  • The version="v2" parameter is there because this is a beta API. Since we’re still making changes to it, this parameter helps avoid breaking your code in the future.

💡 Note: v2 is only supported in langchain-core >= 0.2.0.

events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)

Let's take a look at a few of the start events and a few of the end events.

events[:3]
[{'event': 'on_chat_model_start',
      'data': {'input': 'hello'},
      'name': 'ChatAnthropic',
      'tags': [],
      'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'parent_ids': []},
     {'event': 'on_chat_model_stream',
      'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 8, 'output_tokens': 1, 'total_tokens': 9, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
      'parent_ids': []},
     {'event': 'on_chat_model_stream',
      'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'data': {'chunk': AIMessageChunk(content='Hi', additional_kwargs={}, response_metadata={}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077')},
      'parent_ids': []}]
events[-2:]
[{'event': 'on_chat_model_stream',
      'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},
      'parent_ids': []},
     {'event': 'on_chat_model_end',
      'data': {'output': AIMessageChunk(content='Hi! How can I help you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 8, 'output_tokens': 13, 'total_tokens': 21, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
      'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'parent_ids': []}]

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

chain = (
    model | JsonOutputParser()
) 

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and korea and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    )
]

If you examine the first few events, you'll notice that there are 3 different start events rather than 2.

The three start events correspond to:

  1. The chain (model + parser)

  2. The model

  3. The parser

events[:3]
[{'event': 'on_chain_start',
      'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
      'name': 'RunnableSequence',
      'tags': [],
      'run_id': '4b40eb58-73e1-4b7a-9b8f-3caa7530dce2',
      'metadata': {},
      'parent_ids': []},
     {'event': 'on_chat_model_start',
      'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},
      'name': 'ChatAnthropic',
      'tags': ['seq:step:1'],
      'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']},
     {'event': 'on_chat_model_stream',
      'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
      'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',
      'name': 'ChatAnthropic',
      'tags': ['seq:step:1'],
      'metadata': {'ls_provider': 'anthropic',
       'ls_model_name': 'claude-3-5-sonnet-latest',
       'ls_model_type': 'chat',
       'ls_temperature': 0.0,
       'ls_max_tokens': 1024},
      'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']}]

What do you think you'd see if you looked at the last 3 events? What about the middle? Let's use this API to output the stream events from the model and the parser, ignoring start events, end events, and events from the chain.

Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool, isn't it? 🦜

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
    Chat model chunk: '{'
    Parser chunk: {}
    Chat model chunk: '\n  "countries":'
    Chat model chunk: ' [\n    {\n      "name": "'
    Parser chunk: {'countries': [{'name': ''}]}
    Chat model chunk: 'France",\n      "'
    Parser chunk: {'countries': [{'name': 'France'}]}
    Chat model chunk: 'population": 67391582\n    },'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}]}
    Chat model chunk: '\n    {\n      "name": "Spain",'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
    Chat model chunk: '\n      "population": 47615'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}
    Chat model chunk: '034\n    },\n    '
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}]}
    Chat model chunk: '{\n      "name": "Korea",'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}
    Chat model chunk: '\n      "population": 51744'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744}]}
    ...

Filtering Events

Because this API produces so many events, it is useful to be able to filter on events.

You can filter by component name, component tags, or component type.

By Name

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'metadata': {}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    {'event': 'on_parser_end', 'data': {'output': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}}, 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}
    ...

By Type

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n  "countries":', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [\n    {\n      "name": "', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='France",\n      "population": 67391', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='582\n    },', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n    {\n      "', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name": "Spain",\n      "population":', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' 47615034\n    },\n    ', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\n      "name": "Korea",', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}
    ...

By Tags

Tags in a runnable are inherited by all its child components.

This means that if you assign tags to a parent component, those tags will automatically apply to its children as well.

For example:

  • If a parent runnable has the tag ["filter"], all its child components will also inherit this tag.

  • This can affect behavior if you're using tags to filter or track specific components.

What You Should Keep in Mind:

  • Filtering Tags: If you're using tags to filter components, make sure this automatic inheritance is what you want.

  • Overlapping Tags: Be cautious if child components already have their own tags, as inherited tags may lead to unintended behavior.

  • Intentional Tagging: If you don't want tags to apply to certain child components, you may need to adjust the tagging logic manually.

By understanding how tags work, you can ensure they align with your desired functionality!

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'metadata': {}, 'parent_ids': []}
    {'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'metadata': {}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n  "countries": [\n    {', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    {'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': []}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n      "name": "France",\n      "', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}
    ...

Handling Non-Streaming Components

Some components don’t work well with input streams and may interrupt the streaming of the final output when using astream.

However, when using astream_events, you can still get streaming events from intermediate steps that support streaming! This allows partial progress to be tracked, even if some components are not compatible with streaming.

# A function that does not support streaming.
# It operates on the finalized inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
) 

As expected, the astream API doesn't work correctly because _extract_country_names doesn't operate on streams.

async for chunk in chain.astream(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)
['France', 'Spain', 'Korea']

Now, let's confirm that with astream_events we're still seeing streaming output from the model and the parser.

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and korea and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
    Chat model chunk: '{'
    Parser chunk: {}
    Chat model chunk: '\n  "countries": [\n    {'
    Parser chunk: {'countries': [{}]}
    Chat model chunk: '\n      "name": "France",\n      "'
    Parser chunk: {'countries': [{'name': 'France'}]}
    Chat model chunk: 'population": 67391582\n    },'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}]}
    Chat model chunk: '\n    {\n      "name": "Spain",'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
    Chat model chunk: '\n      "population": 47615'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}
    Chat model chunk: '034\n    },\n    {\n      "name'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {}]}
    Chat model chunk: '": "Korea",\n      "population": '
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}
    Chat model chunk: '51744876\n    }\n  '
    Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}
    Chat model chunk: ']\n}'
    Chat model chunk: ''
    ...

Propagating Callbacks

When invoking runnables inside your tools, you must manually propagate callbacks to the runnable. Without this, no stream events will be generated.

If you're using RunnableLambdas or the @chain decorator, callbacks are automatically propagated in the background.

This example shows how callbacks are not propagated in a custom tool (bad_tool), resulting in incomplete event tracking and missing streaming events (on_chain_stream).

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool

def reverse_word(word: str):
    return word[::-1]

reverse_word = RunnableLambda(reverse_word)

@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v2"):
    print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'f179cc39-a3d2-4fb3-83f7-7574dfecf2f2', 'metadata': {}, 'parent_ids': []}
    {'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '75693727-ebbc-4cd3-9142-700e8e54a6b3', 'metadata': {}, 'parent_ids': ['f179cc39-a3d2-4fb3-83f7-7574dfecf2f2']}
    {'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '75693727-ebbc-4cd3-9142-700e8e54a6b3', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['f179cc39-a3d2-4fb3-83f7-7574dfecf2f2']}
    {'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'f179cc39-a3d2-4fb3-83f7-7574dfecf2f2', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}
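
For contrast, here is a sketch of a tool that does propagate callbacks, following the pattern recommended in the LangChain docs (the tool name is ours): the tool accepts a callbacks argument and forwards it in the config of the inner invoke call, so the inner runnable's events are linked and emitted.

from langchain_core.callbacks import Callbacks
from langchain_core.tools import tool

@tool
def correct_tool(word: str, callbacks: Callbacks) -> str:
    """A tool that correctly propagates callbacks to the inner runnable."""
    return reverse_word.invoke(word, {"callbacks": callbacks})

async for event in correct_tool.astream_events("hello", version="v2"):
    print(event)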

If you're invoking runnables from within a RunnableLambda or the @chain decorator, callbacks are handled automatically.

This means:

  • You don’t need to manually pass callbacks between runnables.

  • Events like on_chain_start, on_chain_end, and even streaming events (on_chain_stream) are seamlessly emitted and linked.

from langchain_core.runnables import RunnableLambda

async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2

reverse_and_double = RunnableLambda(reverse_and_double)

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'metadata': {}, 'parent_ids': []}
    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '883ad65d-cebb-41ef-8ce8-718c4a310521', 'metadata': {}, 'parent_ids': ['bba3e8c4-7e4c-4daa-8c45-03f45a30913d']}
    {'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '883ad65d-cebb-41ef-8ce8-718c4a310521', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['bba3e8c4-7e4c-4daa-8c45-03f45a30913d']}
    {'event': 'on_chain_stream', 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
    {'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}

And with the @chain decorator:

from langchain_core.runnables import chain

@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'metadata': {}, 'parent_ids': []}
    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': 'be79cf18-29e6-4de7-acf6-45f51cd101e3', 'metadata': {}, 'parent_ids': ['bc7ff368-3ac9-48be-a18a-882441d1c065']}
    {'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': 'be79cf18-29e6-4de7-acf6-45f51cd101e3', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['bc7ff368-3ac9-48be-a18a-882441d1c065']}
    {'event': 'on_chain_stream', 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
    {'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}
